diff options
| author | Dan Goodliffe <dan.goodliffe@octal.co.uk> | 2026-03-17 11:48:34 +0000 |
|---|---|---|
| committer | Dan Goodliffe <dan.goodliffe@octal.co.uk> | 2026-03-17 11:48:34 +0000 |
| commit | d2ecb7f49c3f3b60e3d1f297fd033071a02cfa9a (patch) | |
| tree | 90c34529461f86f2b6f59ba44f9d910214ed8630 /src/ingestor.cpp | |
| parent | 0e3c2e8dd462a7d56f8b61e15c8cf10681898a1f (diff) | |
| download | webstat-d2ecb7f49c3f3b60e3d1f297fd033071a02cfa9a.tar.bz2 webstat-d2ecb7f49c3f3b60e3d1f297fd033071a02cfa9a.tar.xz webstat-d2ecb7f49c3f3b60e3d1f297fd033071a02cfa9a.zip | |
Execute jobs even when processing incoming logs
Jobs run on background threads now, so we can happily run them even when
we're busy.
Diffstat (limited to 'src/ingestor.cpp')
| -rw-r--r-- | src/ingestor.cpp | 13 |
1 files changed, 7 insertions, 6 deletions
diff --git a/src/ingestor.cpp b/src/ingestor.cpp index 988fd4d..950641b 100644 --- a/src/ingestor.cpp +++ b/src/ingestor.cpp @@ -135,8 +135,9 @@ namespace WebStat { { curl_waitfd logIn {.fd = fileno(input), .events = CURL_WAIT_POLLIN, .revents = 0}; - for (int interesting = 0; - curl_multi_poll(curl.get(), &logIn, 1, settings.idleJobsAfter, &interesting) == CURLM_OK;) { + const auto curlTimeOut = static_cast<int>( + std::chrono::duration_cast<std::chrono::milliseconds>(settings.checkJobsAfter).count()); + while (curl_multi_poll(curl.get(), &logIn, 1, curlTimeOut, nullptr) == CURLM_OK) { if (logIn.revents) { if (auto line = scn::scan<std::string>(input, "{:[^\n]}\n")) { linesRead++; @@ -146,8 +147,8 @@ namespace WebStat { break; } } - else if (!interesting) { - runJobsIdle(); + if (expiredThenSet(lastCheckedJobs, settings.checkJobsAfter)) { + runJobsAsNeeded(); } if (!curlOperations.empty()) { handleCurlOperations(); @@ -218,10 +219,10 @@ namespace WebStat { } void - Ingestor::runJobsIdle() + Ingestor::runJobsAsNeeded() { auto runJobAsNeeded = [this, now = Job::LastRunTime::clock::now()](Job & job, auto freq) { - if (!job.currentRun && job.lastRun + freq < now) { + if (!job.currentRun && expired(job.lastRun, freq, now)) { job.currentRun.emplace([this, now, freq, &job]() { try { (this->*job.impl)(); |
