From 1c929e46d8e7b14a74f80a21b5f30c9abbee410f Mon Sep 17 00:00:00 2001 From: Dan Goodliffe Date: Fri, 1 May 2026 10:32:48 +0100 Subject: Limit the number lines stored at once Limits the number lines inserted per transactions, and the number of transactions before returning to reading input. Prevents long running transactions in the case when queued lines has grown in size. --- src/ingestor.cpp | 11 ++++++++--- src/ingestor.hpp | 4 +++- src/webstat_logger_main.cpp | 4 +++- 3 files changed, 14 insertions(+), 5 deletions(-) diff --git a/src/ingestor.cpp b/src/ingestor.cpp index c41454f..04216f8 100644 --- a/src/ingestor.cpp +++ b/src/ingestor.cpp @@ -253,14 +253,19 @@ namespace WebStat { void Ingestor::tryIngestQueuedLogLines() { + auto storedEnd = queuedLines.begin(); try { - ingestLogLines(dbpool->get().get(), queuedLines); - queuedLines.clear(); + for (auto batch : + queuedLines | std::views::chunk(settings.maxBatchSize) | std::views::take(settings.maxBatches)) { + ingestLogLines(dbpool->get().get(), batch); + storedEnd = batch.end(); + } } catch (const std::exception & excp) { log(LOG_ERR, "Unhandled exception: %s, clearing known entity list", excp.what()); existingEntities.clear(); } + queuedLines.erase(queuedLines.begin(), storedEnd); } template @@ -285,7 +290,7 @@ namespace WebStat { } void - Ingestor::ingestLogLines(DB::Connection * dbconn, const LineBatch & lines) + Ingestor::ingestLogLines(DB::Connection * dbconn, const LinesView lines) { auto entityIds = std::views::transform([](auto && value) { return std::make_pair(value->hash, *value->id); diff --git a/src/ingestor.hpp b/src/ingestor.hpp index 195f325..916f200 100644 --- a/src/ingestor.hpp +++ b/src/ingestor.hpp @@ -26,6 +26,7 @@ namespace WebStat { unsigned int dbMax = 4; unsigned int dbKeep = 2; size_t maxBatchSize = 1; + size_t maxBatches = 5; minutes checkJobsAfter = 1min; minutes freqIngestParkedLines = 30min; minutes freqPurgeOldLogs = 6h; @@ -39,6 +40,7 @@ namespace WebStat { class Ingestor { public: using LineBatch = std::vector; + using LinesView = std::span; Ingestor(const utsname &, IngestorSettings); Ingestor(const utsname &, DB::ConnectionPoolPtr, IngestorSettings); @@ -54,7 +56,7 @@ namespace WebStat { void ingestLog(std::FILE *); void tryIngestQueuedLogLines(); - void ingestLogLines(DB::Connection *, const LineBatch & lines); + void ingestLogLines(DB::Connection *, LinesView lines); std::expected parkQueuedLogLines(); void runJobsAsNeeded(); diff --git a/src/webstat_logger_main.cpp b/src/webstat_logger_main.cpp index 3eaa9f0..04d59fc 100644 --- a/src/webstat_logger_main.cpp +++ b/src/webstat_logger_main.cpp @@ -68,7 +68,9 @@ main(int argc, char ** argv) ("db.wr.keep", po::value(&settings.dbKeep)->default_value(settings.dbKeep), "Number of write/read write DB connections to keep open") ("db.maxBatchSize", po::value(&settings.maxBatchSize)->default_value(settings.maxBatchSize), - "Maximum number of access log entries to hold in memory before writing them to the DB") + "Number of access log entries to hold in memory before starting to write them to the DB") + ("db.maxBatches", po::value(&settings.maxBatches)->default_value(settings.maxBatches), + "Maximum number of batches to insert before returning to reading input") ("fallback.dir", po::value(&settings.fallbackDir)->default_value(settings.fallbackDir), "Path to write access logs to when the database is unavailable") ("jobs.check", po::value(&settings.checkJobsAfter)->default_value(settings.checkJobsAfter), -- cgit v1.3