summary refs log tree commit diff
path: root/src/ingestor.cpp
diff options
context:
space:
mode:
author Dan Goodliffe <dan@randomdan.homeip.net> 2026-05-02 18:58:21 +0100
committer Dan Goodliffe <dan@randomdan.homeip.net> 2026-05-02 18:58:21 +0100
commit 4675ab65ea5e807e0d457845a0ca84edcf1262c9 (patch)
tree c8e0adae9cfefe6643f03b797283b3012e07b2d3 /src/ingestor.cpp
parent 5c0206f48dc7f90009629d0a74bdc1dd6b4f67ea (diff)
download webstat-4675ab65ea5e807e0d457845a0ca84edcf1262c9.tar.bz2
webstat-4675ab65ea5e807e0d457845a0ca84edcf1262c9.tar.xz
webstat-4675ab65ea5e807e0d457845a0ca84edcf1262c9.zip
Ingest log lines in a background thread
This prevents input reading from stalling while data is being inserted.
Diffstat (limited to 'src/ingestor.cpp')
-rw-r--r-- src/ingestor.cpp 60
1 files changed, 41 insertions, 19 deletions
diff --git a/src/ingestor.cpp b/src/ingestor.cpp
index e2c315c..4af2f2d 100644
--- a/src/ingestor.cpp
+++ b/src/ingestor.cpp
@@ -117,6 +117,7 @@ namespace WebStat {
Ingestor::Ingestor(const utsname & host, DB::ConnectionPoolPtr dbpl, IngestorSettings givenSettings) :
settings {std::move(givenSettings)}, dbpool {std::move(dbpl)},
ingestParkedLines {&Ingestor::jobReadParkedLines}, purgeOldLogs {&Ingestor::jobPurgeOldLogs},
+ storeQueueLines {&Ingestor::jobStoreQueuedLines},
hostnameId {insert(dbpool->get(), SQL::HOST_UPSERT, SQL::HOST_UPSERT_OPTS, host.nodename, host.sysname,
host.release, host.version, host.machine, host.domainname)},
curl {curl_multi_init()}, mainThread {std::this_thread::get_id()}
@@ -225,7 +226,7 @@ namespace WebStat {
stats.linesRead++;
queuedLines.emplace_back(std::move(line->value()));
if (queuedLines.size() >= settings.maxBatchSize) {
- tryIngestQueuedLogLines();
+ beginIngestQueuedLogLines();
}
}
else {
@@ -233,7 +234,7 @@ namespace WebStat {
}
}
else {
- tryIngestQueuedLogLines();
+ beginIngestQueuedLogLines();
}
if (expiredThenSet(lastCheckedJobs, settings.checkJobsAfter)) {
runJobsAsNeeded();
@@ -243,21 +244,37 @@ namespace WebStat {
}
}
finishAllJobs();
- tryIngestQueuedLogLines();
- std::ignore = parkQueuedLogLines();
+ std::invoke(storeQueueLines.impl, this)();
+ std::ignore = parkLogLines(queuedLines);
+ std::ignore = parkLogLines(processingLines);
while (!curlOperations.empty() && curl_multi_poll(curl.get(), nullptr, 0, INT_MAX, nullptr) == CURLM_OK) {
handleCurlOperations();
}
logStats();
}
- void
- Ingestor::tryIngestQueuedLogLines()
+ std::pair<std::future<Ingestor::Job::Result> &, bool>
+ Ingestor::beginIngestQueuedLogLines()
+ {
+ if (storeQueueLines.currentRun) {
+ if (!storeQueueLines.currentRun->valid()) {
+ return {*storeQueueLines.currentRun, false};
+ }
+ finalizeJob(storeQueueLines, {}, Job::LastRunTime::clock::now());
+ }
+ if (processingLines.empty()) {
+ std::swap(queuedLines, processingLines);
+ }
+ return {storeQueueLines.currentRun.emplace(std::async(storeQueueLines.impl, this)), true};
+ }
+
+ Ingestor::Job::Result
+ Ingestor::jobStoreQueuedLines()
{
- auto storedEnd = queuedLines.begin();
+ auto storedEnd = processingLines.begin();
try {
- for (auto batch :
- queuedLines | std::views::chunk(settings.maxBatchSize) | std::views::take(settings.maxBatches)) {
+ for (auto batch : processingLines | std::views::chunk(settings.maxBatchSize)
+ | std::views::take(settings.maxBatches)) {
ingestLogLines(dbpool->get().get(), batch);
storedEnd = batch.end();
}
@@ -266,7 +283,11 @@ namespace WebStat {
log(LOG_ERR, "Unhandled exception: %s, clearing known entity list", excp.what());
existingEntities.clear();
}
- queuedLines.erase(queuedLines.begin(), storedEnd);
+ auto count = std::distance(processingLines.begin(), storedEnd);
+ processingLines.erase(processingLines.begin(), storedEnd);
+ return [count]() {
+ return count;
+ };
}
template<typename... T>
@@ -335,20 +356,20 @@ namespace WebStat {
}
std::expected<std::filesystem::path, int>
- Ingestor::parkQueuedLogLines()
+ Ingestor::parkLogLines(LineBatch & lines)
{
- if (queuedLines.empty()) {
+ if (lines.empty()) {
return std::unexpected(0);
}
- const std::filesystem::path path {settings.fallbackDir
- / std::format("parked-{:s}.short", bytesToHexRange(makeHash(queuedLines.front())))};
+ const std::filesystem::path path {
+ settings.fallbackDir / std::format("parked-{:s}.short", bytesToHexRange(makeHash(lines.front())))};
if (auto parked = FilePtr(fopen(path.c_str(), "w"))) {
- fprintf(parked.get(), "%zu\n", queuedLines.size());
- for (const auto & line : queuedLines) {
+ fprintf(parked.get(), "%zu\n", lines.size());
+ for (const auto & line : lines) {
fprintf(parked.get(), "%.*s\n", static_cast<int>(line.length()), line.data());
}
if (fflush(parked.get()) == 0) {
- queuedLines.clear();
+ lines.clear();
auto finalPath = std::filesystem::path {path}.replace_extension(".log");
parked.reset();
if (rename(path.c_str(), finalPath.c_str()) == 0) {
@@ -357,8 +378,8 @@ namespace WebStat {
}
}
const int err = errno;
- log(LOG_ERR, "Failed to park %zu queued lines:", queuedLines.size());
- for (const auto & line : queuedLines) {
+ log(LOG_ERR, "Failed to park %zu queued lines:", lines.size());
+ for (const auto & line : lines) {
log(LOG_ERR, "\t%.*s", static_cast<int>(line.length()), line.data());
}
return std::unexpected(err);
@@ -407,6 +428,7 @@ namespace WebStat {
};
finishJob(ingestParkedLines);
finishJob(purgeOldLogs);
+ finishJob(storeQueueLines);
}
Ingestor::Job::Result