diff options
| author | Dan Goodliffe <dan.goodliffe@octal.co.uk> | 2026-03-17 11:11:14 +0000 |
|---|---|---|
| committer | Dan Goodliffe <dan.goodliffe@octal.co.uk> | 2026-03-17 11:11:14 +0000 |
| commit | 0e3c2e8dd462a7d56f8b61e15c8cf10681898a1f (patch) | |
| tree | cf1b47683fcbe33d495872d72627f3bccc40cc91 /src | |
| parent | 04acfa679fd846ac829ded5562093b3766c85154 (diff) | |
| download | webstat-0e3c2e8dd462a7d56f8b61e15c8cf10681898a1f.tar.bz2 webstat-0e3c2e8dd462a7d56f8b61e15c8cf10681898a1f.tar.xz webstat-0e3c2e8dd462a7d56f8b61e15c8cf10681898a1f.zip | |
Run jobs on a background thread
Diffstat (limited to 'src')
| -rw-r--r-- | src/ingestor.cpp | 39 | ||||
| -rw-r--r-- | src/ingestor.hpp | 18 |
2 files changed, 35 insertions, 22 deletions
diff --git a/src/ingestor.cpp b/src/ingestor.cpp index da39c59..988fd4d 100644 --- a/src/ingestor.cpp +++ b/src/ingestor.cpp @@ -75,8 +75,8 @@ namespace WebStat { } Ingestor::Ingestor(const utsname & host, DB::ConnectionPoolPtr dbpl, IngestorSettings settings) : - settings {std::move(settings)}, dbpool {std::move(dbpl)}, hostnameId {crc32(host.nodename)}, - curl {curl_multi_init()} + settings {std::move(settings)}, dbpool {std::move(dbpl)}, ingestParkedLines {&Ingestor::jobIngestParkedLines}, + purgeOldLogs {&Ingestor::jobPurgeOldLogs}, hostnameId {crc32(host.nodename)}, curl {curl_multi_init()} { auto dbconn = dbpool->get(); auto ins = dbconn->modify(SQL::HOST_UPSERT, SQL::HOST_UPSERT_OPTS); @@ -220,24 +220,25 @@ namespace WebStat { void Ingestor::runJobsIdle() { - const auto now = JobLastRunTime::clock::now(); - auto runJobAsNeeded = [this, now](auto job, JobLastRunTime & lastRun, auto freq) { - try { - if (lastRun + freq < now) { - (this->*job)(); - lastRun = now; - } - } - catch (const std::exception &) { - // Error, retry in half the frequency - lastRun = now - (freq / 2); + auto runJobAsNeeded = [this, now = Job::LastRunTime::clock::now()](Job & job, auto freq) { + if (!job.currentRun && job.lastRun + freq < now) { + job.currentRun.emplace([this, now, freq, &job]() { + try { + (this->*job.impl)(); + job.lastRun = now; + } + catch (const std::exception &) { + // Error, retry in half the frequency + job.lastRun = now - (freq / 2); + } + }); } }; - runJobAsNeeded(&Ingestor::jobIngestParkedLines, lastRunIngestParkedLines, settings.freqIngestParkedLines); - runJobAsNeeded(&Ingestor::jobPurgeOldLogs, lastRunPurgeOldLogs, settings.freqPurgeOldLogs); + runJobAsNeeded(ingestParkedLines, settings.freqIngestParkedLines); + runJobAsNeeded(purgeOldLogs, settings.freqPurgeOldLogs); } - void + unsigned int Ingestor::jobIngestParkedLines() { for (auto pathIter = std::filesystem::directory_iterator {settings.fallbackDir}; @@ -246,6 +247,8 @@ namespace WebStat { jobIngestParkedLine(pathIter); } } + // TODO return actual count + return 0; } void @@ -278,12 +281,12 @@ namespace WebStat { Ingestor::jobPurgeOldLogs() { auto dbconn = dbpool->get(); - const auto stopAt = JobLastRunTime::clock::now() + settings.purgeDeleteMaxTime; + const auto stopAt = Job::LastRunTime::clock::now() + settings.purgeDeleteMaxTime; const auto purge = dbconn->modify(SQL::ACCESS_LOG_PURGE_OLD, SQL::ACCESS_LOG_PURGE_OLD_OPTS); purge->bindParam(0, settings.purgeDeleteMax); purge->bindParam(1, std::format("{} days", settings.purgeDaysToKeep)); unsigned int purgedTotal {}; - while (stopAt > JobLastRunTime::clock::now()) { + while (stopAt > Job::LastRunTime::clock::now()) { const auto purged = purge->execute(); purgedTotal += purged; if (purged < settings.purgeDeleteMax) { diff --git a/src/ingestor.hpp b/src/ingestor.hpp index 67a7a15..72417d9 100644 --- a/src/ingestor.hpp +++ b/src/ingestor.hpp @@ -54,7 +54,7 @@ namespace WebStat { void parkLogLine(std::string_view); void runJobsIdle(); - void jobIngestParkedLines(); + unsigned int jobIngestParkedLines(); unsigned int jobPurgeOldLogs(); template<typename... T> void storeLogLine(DB::Connection *, const std::tuple<T...> &) const; @@ -70,9 +70,19 @@ namespace WebStat { size_t linesParked = 0; mutable std::flat_set<Crc32Value> existingEntities; - using JobLastRunTime = std::chrono::system_clock::time_point; - JobLastRunTime lastRunIngestParkedLines; - JobLastRunTime lastRunPurgeOldLogs; + struct Job { + using LastRunTime = std::chrono::system_clock::time_point; + using Impl = unsigned int (Ingestor::*)(); + + explicit Job(Impl impl) : impl(impl) { } + + const Impl impl; + LastRunTime lastRun {LastRunTime::clock::now()}; + std::optional<std::thread> currentRun {std::nullopt}; + }; + + Job ingestParkedLines; + Job purgeOldLogs; private: static constexpr size_t MAX_NEW_ENTITIES = 6; |
