summaryrefslogtreecommitdiff
path: root/src/ingestor.cpp
diff options
context:
space:
mode:
authorDan Goodliffe <dan.goodliffe@octal.co.uk>2026-03-17 11:48:34 +0000
committerDan Goodliffe <dan.goodliffe@octal.co.uk>2026-03-17 11:48:34 +0000
commitd2ecb7f49c3f3b60e3d1f297fd033071a02cfa9a (patch)
tree90c34529461f86f2b6f59ba44f9d910214ed8630 /src/ingestor.cpp
parent0e3c2e8dd462a7d56f8b61e15c8cf10681898a1f (diff)
downloadwebstat-d2ecb7f49c3f3b60e3d1f297fd033071a02cfa9a.tar.bz2
webstat-d2ecb7f49c3f3b60e3d1f297fd033071a02cfa9a.tar.xz
webstat-d2ecb7f49c3f3b60e3d1f297fd033071a02cfa9a.zip
Execute jobs even when processing incoming logs
Jobs run on background threads now, so we can happily run them even when we're busy.
Diffstat (limited to 'src/ingestor.cpp')
-rw-r--r--src/ingestor.cpp13
1 files changed, 7 insertions, 6 deletions
diff --git a/src/ingestor.cpp b/src/ingestor.cpp
index 988fd4d..950641b 100644
--- a/src/ingestor.cpp
+++ b/src/ingestor.cpp
@@ -135,8 +135,9 @@ namespace WebStat {
{
curl_waitfd logIn {.fd = fileno(input), .events = CURL_WAIT_POLLIN, .revents = 0};
- for (int interesting = 0;
- curl_multi_poll(curl.get(), &logIn, 1, settings.idleJobsAfter, &interesting) == CURLM_OK;) {
+ const auto curlTimeOut = static_cast<int>(
+ std::chrono::duration_cast<std::chrono::milliseconds>(settings.checkJobsAfter).count());
+ while (curl_multi_poll(curl.get(), &logIn, 1, curlTimeOut, nullptr) == CURLM_OK) {
if (logIn.revents) {
if (auto line = scn::scan<std::string>(input, "{:[^\n]}\n")) {
linesRead++;
@@ -146,8 +147,8 @@ namespace WebStat {
break;
}
}
- else if (!interesting) {
- runJobsIdle();
+ if (expiredThenSet(lastCheckedJobs, settings.checkJobsAfter)) {
+ runJobsAsNeeded();
}
if (!curlOperations.empty()) {
handleCurlOperations();
@@ -218,10 +219,10 @@ namespace WebStat {
}
void
- Ingestor::runJobsIdle()
+ Ingestor::runJobsAsNeeded()
{
auto runJobAsNeeded = [this, now = Job::LastRunTime::clock::now()](Job & job, auto freq) {
- if (!job.currentRun && job.lastRun + freq < now) {
+ if (!job.currentRun && expired(job.lastRun, freq, now)) {
job.currentRun.emplace([this, now, freq, &job]() {
try {
(this->*job.impl)();