diff options
| author | Dan Goodliffe <dan.goodliffe@octal.co.uk> | 2026-05-01 11:12:39 +0100 |
|---|---|---|
| committer | Dan Goodliffe <dan.goodliffe@octal.co.uk> | 2026-05-01 11:12:39 +0100 |
| commit | 917487a6de80bf5e81eebcdbbf48bcf9763262fa (patch) | |
| tree | 55ea9a6ba7c8992968d6b24976d307ee56fb5591 /src/ingestor.cpp | |
| parent | 1c929e46d8e7b14a74f80a21b5f30c9abbee410f (diff) | |
| download | webstat-917487a6de80bf5e81eebcdbbf48bcf9763262fa.tar.bz2 webstat-917487a6de80bf5e81eebcdbbf48bcf9763262fa.tar.xz webstat-917487a6de80bf5e81eebcdbbf48bcf9763262fa.zip | |
Explicitly wait for and finalize any running jobs on exit
Diffstat (limited to 'src/ingestor.cpp')
| -rw-r--r-- | src/ingestor.cpp | 40 |
1 files changed, 30 insertions, 10 deletions
diff --git a/src/ingestor.cpp b/src/ingestor.cpp index 04216f8..73ee239 100644 --- a/src/ingestor.cpp +++ b/src/ingestor.cpp @@ -242,6 +242,7 @@ namespace WebStat { handleCurlOperations(); } } + finishAllJobs(); tryIngestQueuedLogLines(); std::ignore = parkQueuedLogLines(); while (!curlOperations.empty() && curl_multi_poll(curl.get(), nullptr, 0, INT_MAX, nullptr) == CURLM_OK) { @@ -364,21 +365,27 @@ namespace WebStat { } void + Ingestor::finalizeJob(Job & job, const std::chrono::minutes freq, const Job::LastRunTime::clock::time_point now) + { + try { + job.currentRun->get(); + job.lastRun = now; + } + catch (const std::exception & excp) { + log(LOG_ERR, "Job run failed: %s", excp.what()); + // Error, retry in half the frequency + job.lastRun = now - (freq / 2); + } + job.currentRun.reset(); + } + + void Ingestor::runJobsAsNeeded() { auto runJobAsNeeded = [this, now = Job::LastRunTime::clock::now()](Job & job, auto freq) { if (job.currentRun) { if (job.currentRun->valid()) { - try { - job.currentRun->get(); - job.lastRun = now; - } - catch (const std::exception & excp) { - log(LOG_ERR, "Job run failed: %s", excp.what()); - // Error, retry in half the frequency - job.lastRun = now - (freq / 2); - } - job.currentRun.reset(); + finalizeJob(job, freq, now); } } else if (expired(job.lastRun, freq, now)) { @@ -389,6 +396,19 @@ namespace WebStat { runJobAsNeeded(purgeOldLogs, settings.freqPurgeOldLogs); } + void + Ingestor::finishAllJobs() + { + auto finishJob = [this, now = Job::LastRunTime::clock::now()](Job & job) { + if (job.currentRun) { + job.currentRun->wait(); + finalizeJob(job, {}, now); + } + }; + finishJob(ingestParkedLines); + finishJob(purgeOldLogs); + } + unsigned int Ingestor::jobIngestParkedLines() { |
