diff options
| -rw-r--r-- | src/ingestor.cpp | 13 | ||||
| -rw-r--r-- | src/ingestor.hpp | 5 | ||||
| -rw-r--r-- | src/util.hpp | 20 | ||||
| -rw-r--r-- | src/webstat_logger_main.cpp | 4 | ||||
| -rw-r--r-- | test/test-ingest.cpp | 8 |
5 files changed, 36 insertions, 14 deletions
diff --git a/src/ingestor.cpp b/src/ingestor.cpp index 988fd4d..950641b 100644 --- a/src/ingestor.cpp +++ b/src/ingestor.cpp @@ -135,8 +135,9 @@ namespace WebStat { { curl_waitfd logIn {.fd = fileno(input), .events = CURL_WAIT_POLLIN, .revents = 0}; - for (int interesting = 0; - curl_multi_poll(curl.get(), &logIn, 1, settings.idleJobsAfter, &interesting) == CURLM_OK;) { + const auto curlTimeOut = static_cast<int>( + std::chrono::duration_cast<std::chrono::milliseconds>(settings.checkJobsAfter).count()); + while (curl_multi_poll(curl.get(), &logIn, 1, curlTimeOut, nullptr) == CURLM_OK) { if (logIn.revents) { if (auto line = scn::scan<std::string>(input, "{:[^\n]}\n")) { linesRead++; @@ -146,8 +147,8 @@ namespace WebStat { break; } } - else if (!interesting) { - runJobsIdle(); + if (expiredThenSet(lastCheckedJobs, settings.checkJobsAfter)) { + runJobsAsNeeded(); } if (!curlOperations.empty()) { handleCurlOperations(); @@ -218,10 +219,10 @@ namespace WebStat { } void - Ingestor::runJobsIdle() + Ingestor::runJobsAsNeeded() { auto runJobAsNeeded = [this, now = Job::LastRunTime::clock::now()](Job & job, auto freq) { - if (!job.currentRun && job.lastRun + freq < now) { + if (!job.currentRun && expired(job.lastRun, freq, now)) { job.currentRun.emplace([this, now, freq, &job]() { try { (this->*job.impl)(); diff --git a/src/ingestor.hpp b/src/ingestor.hpp index 72417d9..b158f03 100644 --- a/src/ingestor.hpp +++ b/src/ingestor.hpp @@ -23,7 +23,7 @@ namespace WebStat { std::filesystem::path fallbackDir = "/var/log/webstat"; unsigned int dbMax = 4; unsigned int dbKeep = 2; - int idleJobsAfter = duration_cast<milliseconds>(1min).count(); + minutes checkJobsAfter = 1min; minutes freqIngestParkedLines = 30min; minutes freqPurgeOldLogs = 6h; unsigned int purgeDaysToKeep = 61; // ~2 months @@ -52,7 +52,7 @@ namespace WebStat { void ingestLogLine(std::string_view); void ingestLogLine(DB::Connection *, std::string_view); void parkLogLine(std::string_view); - void runJobsIdle(); + void runJobsAsNeeded(); unsigned int jobIngestParkedLines(); unsigned int jobPurgeOldLogs(); @@ -81,6 +81,7 @@ namespace WebStat { std::optional<std::thread> currentRun {std::nullopt}; }; + Job::LastRunTime lastCheckedJobs {Job::LastRunTime::clock::now()}; Job ingestParkedLines; Job purgeOldLogs; diff --git a/src/util.hpp b/src/util.hpp index 8f2a585..28bcebd 100644 --- a/src/util.hpp +++ b/src/util.hpp @@ -75,4 +75,24 @@ namespace WebStat { return out; } + + template<typename Clock, typename Dur, typename Rep, typename Period> + bool + expired(const std::chrono::time_point<Clock, Dur> lastRun, const std::chrono::duration<Rep, Period> freq, + const typename Clock::time_point now = Clock::now()) + { + return lastRun + freq < now; + } + + template<typename Clock, typename Dur, typename Rep, typename Period> + bool + expiredThenSet(std::chrono::time_point<Clock, Dur> & lastRun, const std::chrono::duration<Rep, Period> freq, + const typename Clock::time_point now = Clock::now()) + { + if (expired(lastRun, freq, now)) { + lastRun = now; + return true; + } + return false; + } } diff --git a/src/webstat_logger_main.cpp b/src/webstat_logger_main.cpp index 7f4d9b4..6d3aeda 100644 --- a/src/webstat_logger_main.cpp +++ b/src/webstat_logger_main.cpp @@ -55,8 +55,8 @@ main(int argc, char ** argv) "Number of write/read write DB connections to keep open") ("fallback.dir", po::value(&settings.fallbackDir)->default_value(settings.fallbackDir), "Path to write access logs to when the database is unavailable") - ("jobs.idle", po::value(&settings.idleJobsAfter)->default_value(settings.idleJobsAfter), - "Run idle when there's no activity for this period (ms)") + ("jobs.check", po::value(&settings.checkJobsAfter)->default_value(settings.checkJobsAfter), + "How often to check for jobs needing execution (mins)") ("job.parked.freq", po::value(&settings.freqIngestParkedLines)->default_value(settings.freqIngestParkedLines), "How often to check for and import parked log lines") ("job.purge.freq", po::value(&settings.freqPurgeOldLogs)->default_value(settings.freqPurgeOldLogs), diff --git a/test/test-ingest.cpp b/test/test-ingest.cpp index 3effae4..558444d 100644 --- a/test/test-ingest.cpp +++ b/test/test-ingest.cpp @@ -284,7 +284,7 @@ BOOST_AUTO_TEST_CASE(IngestParked, *boost::unit_test::depends_on("I/ParkLogLine" BOOST_AUTO_TEST_CASE(DefaultLaunchNoJobs) { - runJobsIdle(); + runJobsAsNeeded(); BOOST_REQUIRE(!ingestParkedLines.currentRun); BOOST_REQUIRE(!purgeOldLogs.currentRun); } @@ -296,7 +296,7 @@ BOOST_AUTO_TEST_CASE(IngestParkedJob, ingestParkedLines.lastRun = now - 1s; parkLogLine(LOGLINE1); - runJobsIdle(); + runJobsAsNeeded(); BOOST_REQUIRE(!ingestParkedLines.currentRun); BOOST_REQUIRE_EQUAL(linesParked, 1); BOOST_REQUIRE_EQUAL(linesParsed, 0); @@ -309,7 +309,7 @@ BOOST_AUTO_TEST_CASE(IngestParkedJob, BOOST_CHECK_EQUAL(ingestParkedLines.lastRun, now - settings.freqIngestParkedLines + 2s); ingestParkedLines.lastRun = now - settings.freqIngestParkedLines - 1s; - runJobsIdle(); + runJobsAsNeeded(); BOOST_REQUIRE(ingestParkedLines.currentRun); ingestParkedLines.currentRun->join(); BOOST_CHECK_EQUAL(linesParsed, 1); @@ -324,7 +324,7 @@ BOOST_AUTO_TEST_CASE(JobErrorRescheduler, *boost::unit_test::depends_on("I/Inges ingestParkedLines.lastRun = now - settings.freqIngestParkedLines - 1s; parkLogLine(LOGLINE1); std::filesystem::permissions(settings.fallbackDir / LOGLINE1_PARKED, std::filesystem::perms::owner_write); - runJobsIdle(); + runJobsAsNeeded(); BOOST_REQUIRE(ingestParkedLines.currentRun); ingestParkedLines.currentRun->join(); BOOST_CHECK(std::filesystem::exists(settings.fallbackDir / LOGLINE1_PARKED)); |
