summaryrefslogtreecommitdiff
path: root/src/ingestor.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/ingestor.cpp')
-rw-r--r--src/ingestor.cpp40
1 files changed, 30 insertions, 10 deletions
diff --git a/src/ingestor.cpp b/src/ingestor.cpp
index 04216f8..73ee239 100644
--- a/src/ingestor.cpp
+++ b/src/ingestor.cpp
@@ -242,6 +242,7 @@ namespace WebStat {
handleCurlOperations();
}
}
+ finishAllJobs();
tryIngestQueuedLogLines();
std::ignore = parkQueuedLogLines();
while (!curlOperations.empty() && curl_multi_poll(curl.get(), nullptr, 0, INT_MAX, nullptr) == CURLM_OK) {
@@ -364,21 +365,27 @@ namespace WebStat {
}
void
+ Ingestor::finalizeJob(Job & job, const std::chrono::minutes freq, const Job::LastRunTime::clock::time_point now)
+ {
+ try {
+ job.currentRun->get();
+ job.lastRun = now;
+ }
+ catch (const std::exception & excp) {
+ log(LOG_ERR, "Job run failed: %s", excp.what());
+ // Error, retry in half the frequency
+ job.lastRun = now - (freq / 2);
+ }
+ job.currentRun.reset();
+ }
+
+ void
Ingestor::runJobsAsNeeded()
{
auto runJobAsNeeded = [this, now = Job::LastRunTime::clock::now()](Job & job, auto freq) {
if (job.currentRun) {
if (job.currentRun->valid()) {
- try {
- job.currentRun->get();
- job.lastRun = now;
- }
- catch (const std::exception & excp) {
- log(LOG_ERR, "Job run failed: %s", excp.what());
- // Error, retry in half the frequency
- job.lastRun = now - (freq / 2);
- }
- job.currentRun.reset();
+ finalizeJob(job, freq, now);
}
}
else if (expired(job.lastRun, freq, now)) {
@@ -389,6 +396,19 @@ namespace WebStat {
runJobAsNeeded(purgeOldLogs, settings.freqPurgeOldLogs);
}
+ void
+ Ingestor::finishAllJobs()
+ {
+ auto finishJob = [this, now = Job::LastRunTime::clock::now()](Job & job) {
+ if (job.currentRun) {
+ job.currentRun->wait();
+ finalizeJob(job, {}, now);
+ }
+ };
+ finishJob(ingestParkedLines);
+ finishJob(purgeOldLogs);
+ }
+
unsigned int
Ingestor::jobIngestParkedLines()
{