From ae0ee6bbfb4dcd1f112876d0e7e5b3bcabdfb002 Mon Sep 17 00:00:00 2001 From: Dan Goodliffe Date: Tue, 17 Mar 2026 17:33:14 +0000 Subject: Use std::future over std::thread for background jobs Easier checking if a job has completed [successfully] and reseting state for the next time. --- src/ingestor.cpp | 12 ++++++++---- src/ingestor.hpp | 3 ++- test/test-ingest.cpp | 8 ++++++-- 3 files changed, 16 insertions(+), 7 deletions(-) diff --git a/src/ingestor.cpp b/src/ingestor.cpp index ba89991..692ddd2 100644 --- a/src/ingestor.cpp +++ b/src/ingestor.cpp @@ -223,17 +223,21 @@ namespace WebStat { Ingestor::runJobsAsNeeded() { auto runJobAsNeeded = [this, now = Job::LastRunTime::clock::now()](Job & job, auto freq) { - if (!job.currentRun && expired(job.lastRun, freq, now)) { - job.currentRun.emplace([this, now, freq, &job]() { + if (job.currentRun) { + if (job.currentRun->valid()) { try { - (this->*job.impl)(); + job.currentRun->get(); job.lastRun = now; } catch (const std::exception &) { // Error, retry in half the frequency job.lastRun = now - (freq / 2); } - }); + job.currentRun.reset(); + } + } + else if (expired(job.lastRun, freq, now)) { + job.currentRun.emplace(std::async(job.impl, this)); } }; runJobAsNeeded(ingestParkedLines, settings.freqIngestParkedLines); diff --git a/src/ingestor.hpp b/src/ingestor.hpp index c5628d6..0bf2297 100644 --- a/src/ingestor.hpp +++ b/src/ingestor.hpp @@ -8,6 +8,7 @@ #include #include #include +#include #include #include #include @@ -78,7 +79,7 @@ namespace WebStat { const Impl impl; LastRunTime lastRun {LastRunTime::clock::now()}; - std::optional currentRun {std::nullopt}; + std::optional> currentRun; }; Job::LastRunTime lastCheckedJobs {Job::LastRunTime::clock::now()}; diff --git a/test/test-ingest.cpp b/test/test-ingest.cpp index 558444d..41b3690 100644 --- a/test/test-ingest.cpp +++ b/test/test-ingest.cpp @@ -311,7 +311,9 @@ BOOST_AUTO_TEST_CASE(IngestParkedJob, ingestParkedLines.lastRun = now - settings.freqIngestParkedLines - 1s; runJobsAsNeeded(); BOOST_REQUIRE(ingestParkedLines.currentRun); - ingestParkedLines.currentRun->join(); + ingestParkedLines.currentRun->wait(); + runJobsAsNeeded(); + BOOST_REQUIRE(!ingestParkedLines.currentRun); BOOST_CHECK_EQUAL(linesParsed, 1); BOOST_CHECK_EQUAL(linesDiscarded, 0); BOOST_CHECK_GE(ingestParkedLines.lastRun, now); @@ -326,7 +328,9 @@ BOOST_AUTO_TEST_CASE(JobErrorRescheduler, *boost::unit_test::depends_on("I/Inges std::filesystem::permissions(settings.fallbackDir / LOGLINE1_PARKED, std::filesystem::perms::owner_write); runJobsAsNeeded(); BOOST_REQUIRE(ingestParkedLines.currentRun); - ingestParkedLines.currentRun->join(); + ingestParkedLines.currentRun->wait(); + runJobsAsNeeded(); + BOOST_REQUIRE(!ingestParkedLines.currentRun); BOOST_CHECK(std::filesystem::exists(settings.fallbackDir / LOGLINE1_PARKED)); BOOST_CHECK_GE(ingestParkedLines.lastRun, now - (settings.freqIngestParkedLines / 2) - 1s); BOOST_CHECK_LE(ingestParkedLines.lastRun, now - (settings.freqIngestParkedLines / 2) + 1s); -- cgit v1.3