diff options
| -rw-r--r-- | src/ingestor.cpp | 12 | ||||
| -rw-r--r-- | src/ingestor.hpp | 3 | ||||
| -rw-r--r-- | test/test-ingest.cpp | 8 |
3 files changed, 16 insertions, 7 deletions
diff --git a/src/ingestor.cpp b/src/ingestor.cpp index ba89991..692ddd2 100644 --- a/src/ingestor.cpp +++ b/src/ingestor.cpp @@ -223,17 +223,21 @@ namespace WebStat { Ingestor::runJobsAsNeeded() { auto runJobAsNeeded = [this, now = Job::LastRunTime::clock::now()](Job & job, auto freq) { - if (!job.currentRun && expired(job.lastRun, freq, now)) { - job.currentRun.emplace([this, now, freq, &job]() { + if (job.currentRun) { + if (job.currentRun->valid()) { try { - (this->*job.impl)(); + job.currentRun->get(); job.lastRun = now; } catch (const std::exception &) { // Error, retry in half the frequency job.lastRun = now - (freq / 2); } - }); + job.currentRun.reset(); + } + } + else if (expired(job.lastRun, freq, now)) { + job.currentRun.emplace(std::async(job.impl, this)); } }; runJobAsNeeded(ingestParkedLines, settings.freqIngestParkedLines); diff --git a/src/ingestor.hpp b/src/ingestor.hpp index c5628d6..0bf2297 100644 --- a/src/ingestor.hpp +++ b/src/ingestor.hpp @@ -8,6 +8,7 @@ #include <connection_fwd.h> #include <cstdio> #include <flat_set> +#include <future> #include <scn/scan.h> #include <span> #include <sys/utsname.h> @@ -78,7 +79,7 @@ namespace WebStat { const Impl impl; LastRunTime lastRun {LastRunTime::clock::now()}; - std::optional<std::thread> currentRun {std::nullopt}; + std::optional<std::future<unsigned int>> currentRun; }; Job::LastRunTime lastCheckedJobs {Job::LastRunTime::clock::now()}; diff --git a/test/test-ingest.cpp b/test/test-ingest.cpp index 558444d..41b3690 100644 --- a/test/test-ingest.cpp +++ b/test/test-ingest.cpp @@ -311,7 +311,9 @@ BOOST_AUTO_TEST_CASE(IngestParkedJob, ingestParkedLines.lastRun = now - settings.freqIngestParkedLines - 1s; runJobsAsNeeded(); BOOST_REQUIRE(ingestParkedLines.currentRun); - ingestParkedLines.currentRun->join(); + ingestParkedLines.currentRun->wait(); + runJobsAsNeeded(); + BOOST_REQUIRE(!ingestParkedLines.currentRun); BOOST_CHECK_EQUAL(linesParsed, 1); BOOST_CHECK_EQUAL(linesDiscarded, 0); BOOST_CHECK_GE(ingestParkedLines.lastRun, now); @@ -326,7 +328,9 @@ BOOST_AUTO_TEST_CASE(JobErrorRescheduler, *boost::unit_test::depends_on("I/Inges std::filesystem::permissions(settings.fallbackDir / LOGLINE1_PARKED, std::filesystem::perms::owner_write); runJobsAsNeeded(); BOOST_REQUIRE(ingestParkedLines.currentRun); - ingestParkedLines.currentRun->join(); + ingestParkedLines.currentRun->wait(); + runJobsAsNeeded(); + BOOST_REQUIRE(!ingestParkedLines.currentRun); BOOST_CHECK(std::filesystem::exists(settings.fallbackDir / LOGLINE1_PARKED)); BOOST_CHECK_GE(ingestParkedLines.lastRun, now - (settings.freqIngestParkedLines / 2) - 1s); BOOST_CHECK_LE(ingestParkedLines.lastRun, now - (settings.freqIngestParkedLines / 2) + 1s); |
