summaryrefslogtreecommitdiff
path: root/src/ingestor.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/ingestor.cpp')
-rw-r--r--src/ingestor.cpp39
1 files changed, 21 insertions, 18 deletions
diff --git a/src/ingestor.cpp b/src/ingestor.cpp
index da39c59..988fd4d 100644
--- a/src/ingestor.cpp
+++ b/src/ingestor.cpp
@@ -75,8 +75,8 @@ namespace WebStat {
}
Ingestor::Ingestor(const utsname & host, DB::ConnectionPoolPtr dbpl, IngestorSettings settings) :
- settings {std::move(settings)}, dbpool {std::move(dbpl)}, hostnameId {crc32(host.nodename)},
- curl {curl_multi_init()}
+ settings {std::move(settings)}, dbpool {std::move(dbpl)}, ingestParkedLines {&Ingestor::jobIngestParkedLines},
+ purgeOldLogs {&Ingestor::jobPurgeOldLogs}, hostnameId {crc32(host.nodename)}, curl {curl_multi_init()}
{
auto dbconn = dbpool->get();
auto ins = dbconn->modify(SQL::HOST_UPSERT, SQL::HOST_UPSERT_OPTS);
@@ -220,24 +220,25 @@ namespace WebStat {
void
Ingestor::runJobsIdle()
{
- const auto now = JobLastRunTime::clock::now();
- auto runJobAsNeeded = [this, now](auto job, JobLastRunTime & lastRun, auto freq) {
- try {
- if (lastRun + freq < now) {
- (this->*job)();
- lastRun = now;
- }
- }
- catch (const std::exception &) {
- // Error, retry in half the frequency
- lastRun = now - (freq / 2);
+ auto runJobAsNeeded = [this, now = Job::LastRunTime::clock::now()](Job & job, auto freq) {
+ if (!job.currentRun && job.lastRun + freq < now) {
+ job.currentRun.emplace([this, now, freq, &job]() {
+ try {
+ (this->*job.impl)();
+ job.lastRun = now;
+ }
+ catch (const std::exception &) {
+ // Error, retry in half the frequency
+ job.lastRun = now - (freq / 2);
+ }
+ });
}
};
- runJobAsNeeded(&Ingestor::jobIngestParkedLines, lastRunIngestParkedLines, settings.freqIngestParkedLines);
- runJobAsNeeded(&Ingestor::jobPurgeOldLogs, lastRunPurgeOldLogs, settings.freqPurgeOldLogs);
+ runJobAsNeeded(ingestParkedLines, settings.freqIngestParkedLines);
+ runJobAsNeeded(purgeOldLogs, settings.freqPurgeOldLogs);
}
- void
+ unsigned int
Ingestor::jobIngestParkedLines()
{
for (auto pathIter = std::filesystem::directory_iterator {settings.fallbackDir};
@@ -246,6 +247,8 @@ namespace WebStat {
jobIngestParkedLine(pathIter);
}
}
+ // TODO return actual count
+ return 0;
}
void
@@ -278,12 +281,12 @@ namespace WebStat {
Ingestor::jobPurgeOldLogs()
{
auto dbconn = dbpool->get();
- const auto stopAt = JobLastRunTime::clock::now() + settings.purgeDeleteMaxTime;
+ const auto stopAt = Job::LastRunTime::clock::now() + settings.purgeDeleteMaxTime;
const auto purge = dbconn->modify(SQL::ACCESS_LOG_PURGE_OLD, SQL::ACCESS_LOG_PURGE_OLD_OPTS);
purge->bindParam(0, settings.purgeDeleteMax);
purge->bindParam(1, std::format("{} days", settings.purgeDaysToKeep));
unsigned int purgedTotal {};
- while (stopAt > JobLastRunTime::clock::now()) {
+ while (stopAt > Job::LastRunTime::clock::now()) {
const auto purged = purge->execute();
purgedTotal += purged;
if (purged < settings.purgeDeleteMax) {