summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/ingestor.cpp13
-rw-r--r--src/ingestor.hpp5
-rw-r--r--src/util.hpp20
-rw-r--r--src/webstat_logger_main.cpp4
-rw-r--r--test/test-ingest.cpp8
5 files changed, 36 insertions, 14 deletions
diff --git a/src/ingestor.cpp b/src/ingestor.cpp
index 988fd4d..950641b 100644
--- a/src/ingestor.cpp
+++ b/src/ingestor.cpp
@@ -135,8 +135,9 @@ namespace WebStat {
{
curl_waitfd logIn {.fd = fileno(input), .events = CURL_WAIT_POLLIN, .revents = 0};
- for (int interesting = 0;
- curl_multi_poll(curl.get(), &logIn, 1, settings.idleJobsAfter, &interesting) == CURLM_OK;) {
+ const auto curlTimeOut = static_cast<int>(
+ std::chrono::duration_cast<std::chrono::milliseconds>(settings.checkJobsAfter).count());
+ while (curl_multi_poll(curl.get(), &logIn, 1, curlTimeOut, nullptr) == CURLM_OK) {
if (logIn.revents) {
if (auto line = scn::scan<std::string>(input, "{:[^\n]}\n")) {
linesRead++;
@@ -146,8 +147,8 @@ namespace WebStat {
break;
}
}
- else if (!interesting) {
- runJobsIdle();
+ if (expiredThenSet(lastCheckedJobs, settings.checkJobsAfter)) {
+ runJobsAsNeeded();
}
if (!curlOperations.empty()) {
handleCurlOperations();
@@ -218,10 +219,10 @@ namespace WebStat {
}
void
- Ingestor::runJobsIdle()
+ Ingestor::runJobsAsNeeded()
{
auto runJobAsNeeded = [this, now = Job::LastRunTime::clock::now()](Job & job, auto freq) {
- if (!job.currentRun && job.lastRun + freq < now) {
+ if (!job.currentRun && expired(job.lastRun, freq, now)) {
job.currentRun.emplace([this, now, freq, &job]() {
try {
(this->*job.impl)();
diff --git a/src/ingestor.hpp b/src/ingestor.hpp
index 72417d9..b158f03 100644
--- a/src/ingestor.hpp
+++ b/src/ingestor.hpp
@@ -23,7 +23,7 @@ namespace WebStat {
std::filesystem::path fallbackDir = "/var/log/webstat";
unsigned int dbMax = 4;
unsigned int dbKeep = 2;
- int idleJobsAfter = duration_cast<milliseconds>(1min).count();
+ minutes checkJobsAfter = 1min;
minutes freqIngestParkedLines = 30min;
minutes freqPurgeOldLogs = 6h;
unsigned int purgeDaysToKeep = 61; // ~2 months
@@ -52,7 +52,7 @@ namespace WebStat {
void ingestLogLine(std::string_view);
void ingestLogLine(DB::Connection *, std::string_view);
void parkLogLine(std::string_view);
- void runJobsIdle();
+ void runJobsAsNeeded();
unsigned int jobIngestParkedLines();
unsigned int jobPurgeOldLogs();
@@ -81,6 +81,7 @@ namespace WebStat {
std::optional<std::thread> currentRun {std::nullopt};
};
+ Job::LastRunTime lastCheckedJobs {Job::LastRunTime::clock::now()};
Job ingestParkedLines;
Job purgeOldLogs;
diff --git a/src/util.hpp b/src/util.hpp
index 8f2a585..28bcebd 100644
--- a/src/util.hpp
+++ b/src/util.hpp
@@ -75,4 +75,24 @@ namespace WebStat {
return out;
}
+
+ template<typename Clock, typename Dur, typename Rep, typename Period>
+ bool
+ expired(const std::chrono::time_point<Clock, Dur> lastRun, const std::chrono::duration<Rep, Period> freq,
+ const typename Clock::time_point now = Clock::now())
+ {
+ return lastRun + freq < now;
+ }
+
+ template<typename Clock, typename Dur, typename Rep, typename Period>
+ bool
+ expiredThenSet(std::chrono::time_point<Clock, Dur> & lastRun, const std::chrono::duration<Rep, Period> freq,
+ const typename Clock::time_point now = Clock::now())
+ {
+ if (expired(lastRun, freq, now)) {
+ lastRun = now;
+ return true;
+ }
+ return false;
+ }
}
diff --git a/src/webstat_logger_main.cpp b/src/webstat_logger_main.cpp
index 7f4d9b4..6d3aeda 100644
--- a/src/webstat_logger_main.cpp
+++ b/src/webstat_logger_main.cpp
@@ -55,8 +55,8 @@ main(int argc, char ** argv)
"Number of write/read write DB connections to keep open")
("fallback.dir", po::value(&settings.fallbackDir)->default_value(settings.fallbackDir),
"Path to write access logs to when the database is unavailable")
- ("jobs.idle", po::value(&settings.idleJobsAfter)->default_value(settings.idleJobsAfter),
- "Run idle when there's no activity for this period (ms)")
+ ("jobs.check", po::value(&settings.checkJobsAfter)->default_value(settings.checkJobsAfter),
+ "How often to check for jobs needing execution (mins)")
("job.parked.freq", po::value(&settings.freqIngestParkedLines)->default_value(settings.freqIngestParkedLines),
"How often to check for and import parked log lines")
("job.purge.freq", po::value(&settings.freqPurgeOldLogs)->default_value(settings.freqPurgeOldLogs),
diff --git a/test/test-ingest.cpp b/test/test-ingest.cpp
index 3effae4..558444d 100644
--- a/test/test-ingest.cpp
+++ b/test/test-ingest.cpp
@@ -284,7 +284,7 @@ BOOST_AUTO_TEST_CASE(IngestParked, *boost::unit_test::depends_on("I/ParkLogLine"
BOOST_AUTO_TEST_CASE(DefaultLaunchNoJobs)
{
- runJobsIdle();
+ runJobsAsNeeded();
BOOST_REQUIRE(!ingestParkedLines.currentRun);
BOOST_REQUIRE(!purgeOldLogs.currentRun);
}
@@ -296,7 +296,7 @@ BOOST_AUTO_TEST_CASE(IngestParkedJob,
ingestParkedLines.lastRun = now - 1s;
parkLogLine(LOGLINE1);
- runJobsIdle();
+ runJobsAsNeeded();
BOOST_REQUIRE(!ingestParkedLines.currentRun);
BOOST_REQUIRE_EQUAL(linesParked, 1);
BOOST_REQUIRE_EQUAL(linesParsed, 0);
@@ -309,7 +309,7 @@ BOOST_AUTO_TEST_CASE(IngestParkedJob,
BOOST_CHECK_EQUAL(ingestParkedLines.lastRun, now - settings.freqIngestParkedLines + 2s);
ingestParkedLines.lastRun = now - settings.freqIngestParkedLines - 1s;
- runJobsIdle();
+ runJobsAsNeeded();
BOOST_REQUIRE(ingestParkedLines.currentRun);
ingestParkedLines.currentRun->join();
BOOST_CHECK_EQUAL(linesParsed, 1);
@@ -324,7 +324,7 @@ BOOST_AUTO_TEST_CASE(JobErrorRescheduler, *boost::unit_test::depends_on("I/Inges
ingestParkedLines.lastRun = now - settings.freqIngestParkedLines - 1s;
parkLogLine(LOGLINE1);
std::filesystem::permissions(settings.fallbackDir / LOGLINE1_PARKED, std::filesystem::perms::owner_write);
- runJobsIdle();
+ runJobsAsNeeded();
BOOST_REQUIRE(ingestParkedLines.currentRun);
ingestParkedLines.currentRun->join();
BOOST_CHECK(std::filesystem::exists(settings.fallbackDir / LOGLINE1_PARKED));