diff options
| author | Dan Goodliffe <dan@randomdan.homeip.net> | 2026-05-02 18:58:21 +0100 |
|---|---|---|
| committer | Dan Goodliffe <dan@randomdan.homeip.net> | 2026-05-02 18:58:21 +0100 |
| commit | 4675ab65ea5e807e0d457845a0ca84edcf1262c9 (patch) | |
| tree | c8e0adae9cfefe6643f03b797283b3012e07b2d3 | |
| parent | 5c0206f48dc7f90009629d0a74bdc1dd6b4f67ea (diff) | |
| download | webstat-4675ab65ea5e807e0d457845a0ca84edcf1262c9.tar.bz2 webstat-4675ab65ea5e807e0d457845a0ca84edcf1262c9.tar.xz webstat-4675ab65ea5e807e0d457845a0ca84edcf1262c9.zip | |
Ingest log lines in a background thread
This prevents halting reading input during data insertion.
| -rw-r--r-- | src/ingestor.cpp | 60 | ||||
| -rw-r--r-- | src/ingestor.hpp | 36 | ||||
| -rw-r--r-- | test/test-ingest.cpp | 13 |
3 files changed, 69 insertions, 40 deletions
diff --git a/src/ingestor.cpp b/src/ingestor.cpp index e2c315c..4af2f2d 100644 --- a/src/ingestor.cpp +++ b/src/ingestor.cpp @@ -117,6 +117,7 @@ namespace WebStat { Ingestor::Ingestor(const utsname & host, DB::ConnectionPoolPtr dbpl, IngestorSettings givenSettings) : settings {std::move(givenSettings)}, dbpool {std::move(dbpl)}, ingestParkedLines {&Ingestor::jobReadParkedLines}, purgeOldLogs {&Ingestor::jobPurgeOldLogs}, + storeQueueLines {&Ingestor::jobStoreQueuedLines}, hostnameId {insert(dbpool->get(), SQL::HOST_UPSERT, SQL::HOST_UPSERT_OPTS, host.nodename, host.sysname, host.release, host.version, host.machine, host.domainname)}, curl {curl_multi_init()}, mainThread {std::this_thread::get_id()} @@ -225,7 +226,7 @@ namespace WebStat { stats.linesRead++; queuedLines.emplace_back(std::move(line->value())); if (queuedLines.size() >= settings.maxBatchSize) { - tryIngestQueuedLogLines(); + beginIngestQueuedLogLines(); } } else { @@ -233,7 +234,7 @@ namespace WebStat { } } else { - tryIngestQueuedLogLines(); + beginIngestQueuedLogLines(); } if (expiredThenSet(lastCheckedJobs, settings.checkJobsAfter)) { runJobsAsNeeded(); @@ -243,21 +244,37 @@ namespace WebStat { } } finishAllJobs(); - tryIngestQueuedLogLines(); - std::ignore = parkQueuedLogLines(); + std::invoke(storeQueueLines.impl, this)(); + std::ignore = parkLogLines(queuedLines); + std::ignore = parkLogLines(processingLines); while (!curlOperations.empty() && curl_multi_poll(curl.get(), nullptr, 0, INT_MAX, nullptr) == CURLM_OK) { handleCurlOperations(); } logStats(); } - void - Ingestor::tryIngestQueuedLogLines() + std::pair<std::future<Ingestor::Job::Result> &, bool> + Ingestor::beginIngestQueuedLogLines() + { + if (storeQueueLines.currentRun) { + if (!storeQueueLines.currentRun->valid()) { + return {*storeQueueLines.currentRun, false}; + } + finalizeJob(storeQueueLines, {}, Job::LastRunTime::clock::now()); + } + if (processingLines.empty()) { + std::swap(queuedLines, processingLines); + } + return {storeQueueLines.currentRun.emplace(std::async(storeQueueLines.impl, this)), true}; + } + + Ingestor::Job::Result + Ingestor::jobStoreQueuedLines() { - auto storedEnd = queuedLines.begin(); + auto storedEnd = processingLines.begin(); try { - for (auto batch : - queuedLines | std::views::chunk(settings.maxBatchSize) | std::views::take(settings.maxBatches)) { + for (auto batch : processingLines | std::views::chunk(settings.maxBatchSize) + | std::views::take(settings.maxBatches)) { ingestLogLines(dbpool->get().get(), batch); storedEnd = batch.end(); } @@ -266,7 +283,11 @@ namespace WebStat { log(LOG_ERR, "Unhandled exception: %s, clearing known entity list", excp.what()); existingEntities.clear(); } - queuedLines.erase(queuedLines.begin(), storedEnd); + auto count = std::distance(processingLines.begin(), storedEnd); + processingLines.erase(processingLines.begin(), storedEnd); + return [count]() { + return count; + }; } template<typename... T> @@ -335,20 +356,20 @@ namespace WebStat { } std::expected<std::filesystem::path, int> - Ingestor::parkQueuedLogLines() + Ingestor::parkLogLines(LineBatch & lines) { - if (queuedLines.empty()) { + if (lines.empty()) { return std::unexpected(0); } - const std::filesystem::path path {settings.fallbackDir - / std::format("parked-{:s}.short", bytesToHexRange(makeHash(queuedLines.front())))}; + const std::filesystem::path path { + settings.fallbackDir / std::format("parked-{:s}.short", bytesToHexRange(makeHash(lines.front())))}; if (auto parked = FilePtr(fopen(path.c_str(), "w"))) { - fprintf(parked.get(), "%zu\n", queuedLines.size()); - for (const auto & line : queuedLines) { + fprintf(parked.get(), "%zu\n", lines.size()); + for (const auto & line : lines) { fprintf(parked.get(), "%.*s\n", static_cast<int>(line.length()), line.data()); } if (fflush(parked.get()) == 0) { - queuedLines.clear(); + lines.clear(); auto finalPath = std::filesystem::path {path}.replace_extension(".log"); parked.reset(); if (rename(path.c_str(), finalPath.c_str()) == 0) { @@ -357,8 +378,8 @@ namespace WebStat { } } const int err = errno; - log(LOG_ERR, "Failed to park %zu queued lines:", queuedLines.size()); - for (const auto & line : queuedLines) { + log(LOG_ERR, "Failed to park %zu queued lines:", lines.size()); + for (const auto & line : lines) { log(LOG_ERR, "\t%.*s", static_cast<int>(line.length()), line.data()); } return std::unexpected(err); @@ -407,6 +428,7 @@ namespace WebStat { }; finishJob(ingestParkedLines); finishJob(purgeOldLogs); + finishJob(storeQueueLines); } Ingestor::Job::Result diff --git a/src/ingestor.hpp b/src/ingestor.hpp index d989ecd..64b3357 100644 --- a/src/ingestor.hpp +++ b/src/ingestor.hpp @@ -41,6 +41,19 @@ namespace WebStat { public: using LineBatch = std::vector<std::string>; using LinesView = std::span<const std::string>; + + struct Job { + using LastRunTime = std::chrono::system_clock::time_point; + using Result = std::function<unsigned int()>; + using Impl = Result (Ingestor::*)(); + + explicit Job(Impl jobImpl) : impl(jobImpl) { } + + const Impl impl; + LastRunTime lastRun {LastRunTime::clock::now()}; + std::optional<std::future<Result>> currentRun; + }; + Ingestor(const utsname &, IngestorSettings); Ingestor(const utsname &, DB::ConnectionPoolPtr, IngestorSettings); @@ -55,25 +68,14 @@ namespace WebStat { [[nodiscard]] static ScanResult scanLogLine(std::string_view); void ingestLog(std::FILE *); - void tryIngestQueuedLogLines(); + std::pair<std::future<Job::Result> &, bool> beginIngestQueuedLogLines(); void ingestLogLines(DB::Connection *, LinesView lines); - std::expected<std::filesystem::path, int> parkQueuedLogLines(); + std::expected<std::filesystem::path, int> parkLogLines(LineBatch &); void runJobsAsNeeded(); - struct Job { - using LastRunTime = std::chrono::system_clock::time_point; - using Result = std::function<unsigned int()>; - using Impl = Result (Ingestor::*)(); - - explicit Job(Impl jobImpl) : impl(jobImpl) { } - - const Impl impl; - LastRunTime lastRun {LastRunTime::clock::now()}; - std::optional<std::future<Result>> currentRun; - }; - Job::Result jobReadParkedLines(); Job::Result jobPurgeOldLogs(); + Job::Result jobStoreQueuedLines(); template<typename... T> void storeLogLine(DB::Connection *, const std::tuple<T...> &) const; @@ -89,18 +91,21 @@ namespace WebStat { }; protected: + void finishAllJobs(); + static Ingestor * currentIngestor; DB::ConnectionPoolPtr dbpool; mutable Stats stats {}; std::flat_map<EntityHash, EntityId> existingEntities; - LineBatch queuedLines; + LineBatch queuedLines, processingLines; bool terminated = false; Job::LastRunTime lastCheckedJobs {Job::LastRunTime::clock::now()}; Job ingestParkedLines; Job purgeOldLogs; + Job storeQueueLines; private: template<typename... T> static std::vector<Entity *> entities(std::tuple<T...> &); @@ -112,7 +117,6 @@ namespace WebStat { void logStats() const; void clearStats(); void finalizeJob(Job &, minutes freq, Job::LastRunTime::clock::time_point now); - void finishAllJobs(); LineBatch jobReadParkedLines(const std::filesystem::path &); LineBatch jobReadParkedLines(FILE *, size_t count); diff --git a/test/test-ingest.cpp b/test/test-ingest.cpp index 6ed50ab..60338a2 100644 --- a/test/test-ingest.cpp +++ b/test/test-ingest.cpp @@ -297,7 +297,7 @@ BOOST_AUTO_TEST_CASE(ParkLogLine) { queuedLines.emplace_back(LOGLINE1); queuedLines.emplace_back(LOGLINE2); - const auto path = parkQueuedLogLines(); + const auto path = parkLogLines(queuedLines); BOOST_REQUIRE(path); BOOST_TEST_INFO(*path); BOOST_REQUIRE(std::filesystem::exists(*path)); @@ -317,7 +317,7 @@ BOOST_AUTO_TEST_CASE(IngestParked, *boost::unit_test::depends_on("I/ParkLogLine" { queuedLines.emplace_back(LOGLINE1); queuedLines.emplace_back(LOGLINE2); - BOOST_REQUIRE(parkQueuedLogLines()); + BOOST_REQUIRE(parkLogLines(queuedLines)); BOOST_CHECK(!std::filesystem::is_empty(settings.fallbackDir)); BOOST_REQUIRE(queuedLines.empty()); const auto result = jobReadParkedLines(); @@ -340,7 +340,7 @@ BOOST_AUTO_TEST_CASE(IngestParkedJob, const auto now = Job::LastRunTime::clock::now(); ingestParkedLines.lastRun = now - 1s; queuedLines.emplace_back(LOGLINE1); - const auto path = parkQueuedLogLines(); + const auto path = parkLogLines(queuedLines); BOOST_REQUIRE(path); BOOST_REQUIRE(queuedLines.empty()); BOOST_REQUIRE(std::filesystem::exists(*path)); @@ -374,7 +374,7 @@ BOOST_AUTO_TEST_CASE(JobErrorRescheduler, *boost::unit_test::depends_on("I/Inges const auto now = Job::LastRunTime::clock::now(); ingestParkedLines.lastRun = now - settings.freqIngestParkedLines - 1s; queuedLines.emplace_back(LOGLINE1); - const auto path = parkQueuedLogLines(); + const auto path = parkLogLines(queuedLines); BOOST_REQUIRE(path); std::filesystem::permissions(*path, std::filesystem::perms::owner_write); runJobsAsNeeded(); @@ -405,7 +405,10 @@ BOOST_AUTO_TEST_CASE(FetchMockUserAgentDetail) BOOST_AUTO_TEST_CASE(DiscardUnparsable) { queuedLines.emplace_back("does not parse"); - BOOST_REQUIRE_NO_THROW(tryIngestQueuedLogLines()); + auto job = beginIngestQueuedLogLines(); + BOOST_CHECK_EQUAL(&job.first, &*storeQueueLines.currentRun); + BOOST_CHECK(job.second); + finishAllJobs(); auto dbconn = dbpool->get(); auto select = dbconn->select("SELECT id::bigint, value FROM entities WHERE type = 'unparsable_line'"); constexpr std::array<std::tuple<EntityId, std::string_view>, 1> EXPECTED {{ |
