summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r-- src/ingestor.cpp 39
-rw-r--r-- src/ingestor.hpp 18
-rw-r--r-- test/test-ingest.cpp 38
3 files changed, 61 insertions, 34 deletions
diff --git a/src/ingestor.cpp b/src/ingestor.cpp
index da39c59..988fd4d 100644
--- a/src/ingestor.cpp
+++ b/src/ingestor.cpp
@@ -75,8 +75,8 @@ namespace WebStat {
}
Ingestor::Ingestor(const utsname & host, DB::ConnectionPoolPtr dbpl, IngestorSettings settings) :
- settings {std::move(settings)}, dbpool {std::move(dbpl)}, hostnameId {crc32(host.nodename)},
- curl {curl_multi_init()}
+ settings {std::move(settings)}, dbpool {std::move(dbpl)}, ingestParkedLines {&Ingestor::jobIngestParkedLines},
+ purgeOldLogs {&Ingestor::jobPurgeOldLogs}, hostnameId {crc32(host.nodename)}, curl {curl_multi_init()}
{
auto dbconn = dbpool->get();
auto ins = dbconn->modify(SQL::HOST_UPSERT, SQL::HOST_UPSERT_OPTS);
@@ -220,24 +220,25 @@ namespace WebStat {
void
Ingestor::runJobsIdle()
{
- const auto now = JobLastRunTime::clock::now();
- auto runJobAsNeeded = [this, now](auto job, JobLastRunTime & lastRun, auto freq) {
- try {
- if (lastRun + freq < now) {
- (this->*job)();
- lastRun = now;
- }
- }
- catch (const std::exception &) {
- // Error, retry in half the frequency
- lastRun = now - (freq / 2);
+ auto runJobAsNeeded = [this, now = Job::LastRunTime::clock::now()](Job & job, auto freq) {
+ if (!job.currentRun && job.lastRun + freq < now) {
+ job.currentRun.emplace([this, now, freq, &job]() {
+ try {
+ (this->*job.impl)();
+ job.lastRun = now;
+ }
+ catch (const std::exception &) {
+ // Error, retry in half the frequency
+ job.lastRun = now - (freq / 2);
+ }
+ });
}
};
- runJobAsNeeded(&Ingestor::jobIngestParkedLines, lastRunIngestParkedLines, settings.freqIngestParkedLines);
- runJobAsNeeded(&Ingestor::jobPurgeOldLogs, lastRunPurgeOldLogs, settings.freqPurgeOldLogs);
+ runJobAsNeeded(ingestParkedLines, settings.freqIngestParkedLines);
+ runJobAsNeeded(purgeOldLogs, settings.freqPurgeOldLogs);
}
- void
+ unsigned int
Ingestor::jobIngestParkedLines()
{
for (auto pathIter = std::filesystem::directory_iterator {settings.fallbackDir};
@@ -246,6 +247,8 @@ namespace WebStat {
jobIngestParkedLine(pathIter);
}
}
+ // TODO return actual count
+ return 0;
}
void
@@ -278,12 +281,12 @@ namespace WebStat {
Ingestor::jobPurgeOldLogs()
{
auto dbconn = dbpool->get();
- const auto stopAt = JobLastRunTime::clock::now() + settings.purgeDeleteMaxTime;
+ const auto stopAt = Job::LastRunTime::clock::now() + settings.purgeDeleteMaxTime;
const auto purge = dbconn->modify(SQL::ACCESS_LOG_PURGE_OLD, SQL::ACCESS_LOG_PURGE_OLD_OPTS);
purge->bindParam(0, settings.purgeDeleteMax);
purge->bindParam(1, std::format("{} days", settings.purgeDaysToKeep));
unsigned int purgedTotal {};
- while (stopAt > JobLastRunTime::clock::now()) {
+ while (stopAt > Job::LastRunTime::clock::now()) {
const auto purged = purge->execute();
purgedTotal += purged;
if (purged < settings.purgeDeleteMax) {
diff --git a/src/ingestor.hpp b/src/ingestor.hpp
index 67a7a15..72417d9 100644
--- a/src/ingestor.hpp
+++ b/src/ingestor.hpp
@@ -54,7 +54,7 @@ namespace WebStat {
void parkLogLine(std::string_view);
void runJobsIdle();
- void jobIngestParkedLines();
+ unsigned int jobIngestParkedLines();
unsigned int jobPurgeOldLogs();
template<typename... T> void storeLogLine(DB::Connection *, const std::tuple<T...> &) const;
@@ -70,9 +70,19 @@ namespace WebStat {
size_t linesParked = 0;
mutable std::flat_set<Crc32Value> existingEntities;
- using JobLastRunTime = std::chrono::system_clock::time_point;
- JobLastRunTime lastRunIngestParkedLines;
- JobLastRunTime lastRunPurgeOldLogs;
+ struct Job {
+ using LastRunTime = std::chrono::system_clock::time_point;
+ using Impl = unsigned int (Ingestor::*)();
+
+ explicit Job(Impl impl) : impl(impl) { }
+
+ const Impl impl;
+ LastRunTime lastRun {LastRunTime::clock::now()};
+ std::optional<std::thread> currentRun {std::nullopt};
+ };
+
+ Job ingestParkedLines;
+ Job purgeOldLogs;
private:
static constexpr size_t MAX_NEW_ENTITIES = 6;
diff --git a/test/test-ingest.cpp b/test/test-ingest.cpp
index d523aab..3effae4 100644
--- a/test/test-ingest.cpp
+++ b/test/test-ingest.cpp
@@ -282,40 +282,54 @@ BOOST_AUTO_TEST_CASE(IngestParked, *boost::unit_test::depends_on("I/ParkLogLine"
BOOST_CHECK(!std::filesystem::exists(settings.fallbackDir / LOGLINE1_PARKED));
}
-BOOST_AUTO_TEST_CASE(IngestParkedJob, *boost::unit_test::depends_on("I/IngestParked"))
+BOOST_AUTO_TEST_CASE(DefaultLaunchNoJobs)
{
- const auto now = JobLastRunTime::clock::now();
- lastRunIngestParkedLines = now - 1s;
+ runJobsIdle();
+ BOOST_REQUIRE(!ingestParkedLines.currentRun);
+ BOOST_REQUIRE(!purgeOldLogs.currentRun);
+}
+
+BOOST_AUTO_TEST_CASE(IngestParkedJob,
+ *boost::unit_test::depends_on("I/IngestParked") * boost::unit_test::depends_on("I/DefaultLaunchNoJobs"))
+{
+ const auto now = Job::LastRunTime::clock::now();
+ ingestParkedLines.lastRun = now - 1s;
parkLogLine(LOGLINE1);
runJobsIdle();
+ BOOST_REQUIRE(!ingestParkedLines.currentRun);
BOOST_REQUIRE_EQUAL(linesParked, 1);
BOOST_REQUIRE_EQUAL(linesParsed, 0);
- BOOST_CHECK_EQUAL(lastRunIngestParkedLines, now - 1s);
+ BOOST_CHECK_EQUAL(ingestParkedLines.lastRun, now - 1s);
- lastRunIngestParkedLines = now - settings.freqIngestParkedLines + 2s;
+ ingestParkedLines.lastRun = now - settings.freqIngestParkedLines + 2s;
+ BOOST_REQUIRE(!ingestParkedLines.currentRun);
BOOST_REQUIRE_EQUAL(linesParked, 1);
BOOST_REQUIRE_EQUAL(linesParsed, 0);
- BOOST_CHECK_EQUAL(lastRunIngestParkedLines, now - settings.freqIngestParkedLines + 2s);
+ BOOST_CHECK_EQUAL(ingestParkedLines.lastRun, now - settings.freqIngestParkedLines + 2s);
- lastRunIngestParkedLines = now - settings.freqIngestParkedLines - 1s;
+ ingestParkedLines.lastRun = now - settings.freqIngestParkedLines - 1s;
runJobsIdle();
+ BOOST_REQUIRE(ingestParkedLines.currentRun);
+ ingestParkedLines.currentRun->join();
BOOST_CHECK_EQUAL(linesParsed, 1);
BOOST_CHECK_EQUAL(linesDiscarded, 0);
- BOOST_CHECK_GE(lastRunIngestParkedLines, now);
+ BOOST_CHECK_GE(ingestParkedLines.lastRun, now);
BOOST_CHECK(!std::filesystem::exists(settings.fallbackDir / LOGLINE1_PARKED));
}
BOOST_AUTO_TEST_CASE(JobErrorRescheduler, *boost::unit_test::depends_on("I/IngestParkedJob"))
{
- const auto now = JobLastRunTime::clock::now();
- lastRunIngestParkedLines = now - settings.freqIngestParkedLines - 1s;
+ const auto now = Job::LastRunTime::clock::now();
+ ingestParkedLines.lastRun = now - settings.freqIngestParkedLines - 1s;
parkLogLine(LOGLINE1);
std::filesystem::permissions(settings.fallbackDir / LOGLINE1_PARKED, std::filesystem::perms::owner_write);
runJobsIdle();
+ BOOST_REQUIRE(ingestParkedLines.currentRun);
+ ingestParkedLines.currentRun->join();
BOOST_CHECK(std::filesystem::exists(settings.fallbackDir / LOGLINE1_PARKED));
- BOOST_CHECK_GE(lastRunIngestParkedLines, now - (settings.freqIngestParkedLines / 2) - 1s);
- BOOST_CHECK_LE(lastRunIngestParkedLines, now - (settings.freqIngestParkedLines / 2) + 1s);
+ BOOST_CHECK_GE(ingestParkedLines.lastRun, now - (settings.freqIngestParkedLines / 2) - 1s);
+ BOOST_CHECK_LE(ingestParkedLines.lastRun, now - (settings.freqIngestParkedLines / 2) + 1s);
}
BOOST_AUTO_TEST_CASE(FetchMockUserAgentDetail)