diff options
| author | Dan Goodliffe <dan.goodliffe@octal.co.uk> | 2026-03-17 11:11:14 +0000 |
|---|---|---|
| committer | Dan Goodliffe <dan.goodliffe@octal.co.uk> | 2026-03-17 11:11:14 +0000 |
| commit | 0e3c2e8dd462a7d56f8b61e15c8cf10681898a1f (patch) | |
| tree | cf1b47683fcbe33d495872d72627f3bccc40cc91 | |
| parent | 04acfa679fd846ac829ded5562093b3766c85154 (diff) | |
| download | webstat-0e3c2e8dd462a7d56f8b61e15c8cf10681898a1f.tar.bz2 webstat-0e3c2e8dd462a7d56f8b61e15c8cf10681898a1f.tar.xz webstat-0e3c2e8dd462a7d56f8b61e15c8cf10681898a1f.zip | |
Run jobs on a background thread
| -rw-r--r-- | src/ingestor.cpp | 39 | ||||
| -rw-r--r-- | src/ingestor.hpp | 18 | ||||
| -rw-r--r-- | test/test-ingest.cpp | 38 |
3 files changed, 61 insertions, 34 deletions
diff --git a/src/ingestor.cpp b/src/ingestor.cpp index da39c59..988fd4d 100644 --- a/src/ingestor.cpp +++ b/src/ingestor.cpp @@ -75,8 +75,8 @@ namespace WebStat { } Ingestor::Ingestor(const utsname & host, DB::ConnectionPoolPtr dbpl, IngestorSettings settings) : - settings {std::move(settings)}, dbpool {std::move(dbpl)}, hostnameId {crc32(host.nodename)}, - curl {curl_multi_init()} + settings {std::move(settings)}, dbpool {std::move(dbpl)}, ingestParkedLines {&Ingestor::jobIngestParkedLines}, + purgeOldLogs {&Ingestor::jobPurgeOldLogs}, hostnameId {crc32(host.nodename)}, curl {curl_multi_init()} { auto dbconn = dbpool->get(); auto ins = dbconn->modify(SQL::HOST_UPSERT, SQL::HOST_UPSERT_OPTS); @@ -220,24 +220,25 @@ namespace WebStat { void Ingestor::runJobsIdle() { - const auto now = JobLastRunTime::clock::now(); - auto runJobAsNeeded = [this, now](auto job, JobLastRunTime & lastRun, auto freq) { - try { - if (lastRun + freq < now) { - (this->*job)(); - lastRun = now; - } - } - catch (const std::exception &) { - // Error, retry in half the frequency - lastRun = now - (freq / 2); + auto runJobAsNeeded = [this, now = Job::LastRunTime::clock::now()](Job & job, auto freq) { + if (!job.currentRun && job.lastRun + freq < now) { + job.currentRun.emplace([this, now, freq, &job]() { + try { + (this->*job.impl)(); + job.lastRun = now; + } + catch (const std::exception &) { + // Error, retry in half the frequency + job.lastRun = now - (freq / 2); + } + }); } }; - runJobAsNeeded(&Ingestor::jobIngestParkedLines, lastRunIngestParkedLines, settings.freqIngestParkedLines); - runJobAsNeeded(&Ingestor::jobPurgeOldLogs, lastRunPurgeOldLogs, settings.freqPurgeOldLogs); + runJobAsNeeded(ingestParkedLines, settings.freqIngestParkedLines); + runJobAsNeeded(purgeOldLogs, settings.freqPurgeOldLogs); } - void + unsigned int Ingestor::jobIngestParkedLines() { for (auto pathIter = std::filesystem::directory_iterator {settings.fallbackDir}; @@ -246,6 +247,8 @@ namespace WebStat { jobIngestParkedLine(pathIter); } } + // TODO return actual count + return 0; } void @@ -278,12 +281,12 @@ namespace WebStat { Ingestor::jobPurgeOldLogs() { auto dbconn = dbpool->get(); - const auto stopAt = JobLastRunTime::clock::now() + settings.purgeDeleteMaxTime; + const auto stopAt = Job::LastRunTime::clock::now() + settings.purgeDeleteMaxTime; const auto purge = dbconn->modify(SQL::ACCESS_LOG_PURGE_OLD, SQL::ACCESS_LOG_PURGE_OLD_OPTS); purge->bindParam(0, settings.purgeDeleteMax); purge->bindParam(1, std::format("{} days", settings.purgeDaysToKeep)); unsigned int purgedTotal {}; - while (stopAt > JobLastRunTime::clock::now()) { + while (stopAt > Job::LastRunTime::clock::now()) { const auto purged = purge->execute(); purgedTotal += purged; if (purged < settings.purgeDeleteMax) { diff --git a/src/ingestor.hpp b/src/ingestor.hpp index 67a7a15..72417d9 100644 --- a/src/ingestor.hpp +++ b/src/ingestor.hpp @@ -54,7 +54,7 @@ namespace WebStat { void parkLogLine(std::string_view); void runJobsIdle(); - void jobIngestParkedLines(); + unsigned int jobIngestParkedLines(); unsigned int jobPurgeOldLogs(); template<typename... T> void storeLogLine(DB::Connection *, const std::tuple<T...> &) const; @@ -70,9 +70,19 @@ namespace WebStat { size_t linesParked = 0; mutable std::flat_set<Crc32Value> existingEntities; - using JobLastRunTime = std::chrono::system_clock::time_point; - JobLastRunTime lastRunIngestParkedLines; - JobLastRunTime lastRunPurgeOldLogs; + struct Job { + using LastRunTime = std::chrono::system_clock::time_point; + using Impl = unsigned int (Ingestor::*)(); + + explicit Job(Impl impl) : impl(impl) { } + + const Impl impl; + LastRunTime lastRun {LastRunTime::clock::now()}; + std::optional<std::thread> currentRun {std::nullopt}; + }; + + Job ingestParkedLines; + Job purgeOldLogs; private: static constexpr size_t MAX_NEW_ENTITIES = 6; diff --git a/test/test-ingest.cpp b/test/test-ingest.cpp index d523aab..3effae4 100644 --- a/test/test-ingest.cpp +++ b/test/test-ingest.cpp @@ -282,40 +282,54 @@ BOOST_AUTO_TEST_CASE(IngestParked, *boost::unit_test::depends_on("I/ParkLogLine" BOOST_CHECK(!std::filesystem::exists(settings.fallbackDir / LOGLINE1_PARKED)); } -BOOST_AUTO_TEST_CASE(IngestParkedJob, *boost::unit_test::depends_on("I/IngestParked")) +BOOST_AUTO_TEST_CASE(DefaultLaunchNoJobs) { - const auto now = JobLastRunTime::clock::now(); - lastRunIngestParkedLines = now - 1s; + runJobsIdle(); + BOOST_REQUIRE(!ingestParkedLines.currentRun); + BOOST_REQUIRE(!purgeOldLogs.currentRun); +} + +BOOST_AUTO_TEST_CASE(IngestParkedJob, + *boost::unit_test::depends_on("I/IngestParked") * boost::unit_test::depends_on("I/DefaultLaunchNoJobs")) +{ + const auto now = Job::LastRunTime::clock::now(); + ingestParkedLines.lastRun = now - 1s; parkLogLine(LOGLINE1); runJobsIdle(); + BOOST_REQUIRE(!ingestParkedLines.currentRun); BOOST_REQUIRE_EQUAL(linesParked, 1); BOOST_REQUIRE_EQUAL(linesParsed, 0); - BOOST_CHECK_EQUAL(lastRunIngestParkedLines, now - 1s); + BOOST_CHECK_EQUAL(ingestParkedLines.lastRun, now - 1s); - lastRunIngestParkedLines = now - settings.freqIngestParkedLines + 2s; + ingestParkedLines.lastRun = now - settings.freqIngestParkedLines + 2s; + BOOST_REQUIRE(!ingestParkedLines.currentRun); BOOST_REQUIRE_EQUAL(linesParked, 1); BOOST_REQUIRE_EQUAL(linesParsed, 0); - BOOST_CHECK_EQUAL(lastRunIngestParkedLines, now - settings.freqIngestParkedLines + 2s); + BOOST_CHECK_EQUAL(ingestParkedLines.lastRun, now - settings.freqIngestParkedLines + 2s); - lastRunIngestParkedLines = now - settings.freqIngestParkedLines - 1s; + ingestParkedLines.lastRun = now - settings.freqIngestParkedLines - 1s; runJobsIdle(); + BOOST_REQUIRE(ingestParkedLines.currentRun); + ingestParkedLines.currentRun->join(); BOOST_CHECK_EQUAL(linesParsed, 1); BOOST_CHECK_EQUAL(linesDiscarded, 0); - BOOST_CHECK_GE(lastRunIngestParkedLines, now); + BOOST_CHECK_GE(ingestParkedLines.lastRun, now); BOOST_CHECK(!std::filesystem::exists(settings.fallbackDir / LOGLINE1_PARKED)); } BOOST_AUTO_TEST_CASE(JobErrorRescheduler, *boost::unit_test::depends_on("I/IngestParkedJob")) { - const auto now = JobLastRunTime::clock::now(); - lastRunIngestParkedLines = now - settings.freqIngestParkedLines - 1s; + const auto now = Job::LastRunTime::clock::now(); + ingestParkedLines.lastRun = now - settings.freqIngestParkedLines - 1s; parkLogLine(LOGLINE1); std::filesystem::permissions(settings.fallbackDir / LOGLINE1_PARKED, std::filesystem::perms::owner_write); runJobsIdle(); + BOOST_REQUIRE(ingestParkedLines.currentRun); + ingestParkedLines.currentRun->join(); BOOST_CHECK(std::filesystem::exists(settings.fallbackDir / LOGLINE1_PARKED)); - BOOST_CHECK_GE(lastRunIngestParkedLines, now - (settings.freqIngestParkedLines / 2) - 1s); - BOOST_CHECK_LE(lastRunIngestParkedLines, now - (settings.freqIngestParkedLines / 2) + 1s); + BOOST_CHECK_GE(ingestParkedLines.lastRun, now - (settings.freqIngestParkedLines / 2) - 1s); + BOOST_CHECK_LE(ingestParkedLines.lastRun, now - (settings.freqIngestParkedLines / 2) + 1s); } BOOST_AUTO_TEST_CASE(FetchMockUserAgentDetail) |
