diff options
Diffstat (limited to 'src/ingestor.cpp')
| -rw-r--r-- | src/ingestor.cpp | 39 |
1 files changed, 21 insertions, 18 deletions
diff --git a/src/ingestor.cpp b/src/ingestor.cpp index da39c59..988fd4d 100644 --- a/src/ingestor.cpp +++ b/src/ingestor.cpp @@ -75,8 +75,8 @@ namespace WebStat { } Ingestor::Ingestor(const utsname & host, DB::ConnectionPoolPtr dbpl, IngestorSettings settings) : - settings {std::move(settings)}, dbpool {std::move(dbpl)}, hostnameId {crc32(host.nodename)}, - curl {curl_multi_init()} + settings {std::move(settings)}, dbpool {std::move(dbpl)}, ingestParkedLines {&Ingestor::jobIngestParkedLines}, + purgeOldLogs {&Ingestor::jobPurgeOldLogs}, hostnameId {crc32(host.nodename)}, curl {curl_multi_init()} { auto dbconn = dbpool->get(); auto ins = dbconn->modify(SQL::HOST_UPSERT, SQL::HOST_UPSERT_OPTS); @@ -220,24 +220,25 @@ namespace WebStat { void Ingestor::runJobsIdle() { - const auto now = JobLastRunTime::clock::now(); - auto runJobAsNeeded = [this, now](auto job, JobLastRunTime & lastRun, auto freq) { - try { - if (lastRun + freq < now) { - (this->*job)(); - lastRun = now; - } - } - catch (const std::exception &) { - // Error, retry in half the frequency - lastRun = now - (freq / 2); + auto runJobAsNeeded = [this, now = Job::LastRunTime::clock::now()](Job & job, auto freq) { + if (!job.currentRun && job.lastRun + freq < now) { + job.currentRun.emplace([this, now, freq, &job]() { + try { + (this->*job.impl)(); + job.lastRun = now; + } + catch (const std::exception &) { + // Error, retry in half the frequency + job.lastRun = now - (freq / 2); + } + }); } }; - runJobAsNeeded(&Ingestor::jobIngestParkedLines, lastRunIngestParkedLines, settings.freqIngestParkedLines); - runJobAsNeeded(&Ingestor::jobPurgeOldLogs, lastRunPurgeOldLogs, settings.freqPurgeOldLogs); + runJobAsNeeded(ingestParkedLines, settings.freqIngestParkedLines); + runJobAsNeeded(purgeOldLogs, settings.freqPurgeOldLogs); } - void + unsigned int Ingestor::jobIngestParkedLines() { for (auto pathIter = std::filesystem::directory_iterator {settings.fallbackDir}; @@ -246,6 +247,8 @@ namespace WebStat { jobIngestParkedLine(pathIter); } } + // TODO return actual count + return 0; } void @@ -278,12 +281,12 @@ namespace WebStat { Ingestor::jobPurgeOldLogs() { auto dbconn = dbpool->get(); - const auto stopAt = JobLastRunTime::clock::now() + settings.purgeDeleteMaxTime; + const auto stopAt = Job::LastRunTime::clock::now() + settings.purgeDeleteMaxTime; const auto purge = dbconn->modify(SQL::ACCESS_LOG_PURGE_OLD, SQL::ACCESS_LOG_PURGE_OLD_OPTS); purge->bindParam(0, settings.purgeDeleteMax); purge->bindParam(1, std::format("{} days", settings.purgeDaysToKeep)); unsigned int purgedTotal {}; - while (stopAt > JobLastRunTime::clock::now()) { + while (stopAt > Job::LastRunTime::clock::now()) { const auto purged = purge->execute(); purgedTotal += purged; if (purged < settings.purgeDeleteMax) { |
