diff options
| -rw-r--r-- | src/ingestor.cpp | 11 | ||||
| -rw-r--r-- | src/ingestor.hpp | 4 | ||||
| -rw-r--r-- | src/webstat_logger_main.cpp | 4 |
3 files changed, 14 insertions, 5 deletions
diff --git a/src/ingestor.cpp b/src/ingestor.cpp index c41454f..04216f8 100644 --- a/src/ingestor.cpp +++ b/src/ingestor.cpp @@ -253,14 +253,19 @@ namespace WebStat { void Ingestor::tryIngestQueuedLogLines() { + auto storedEnd = queuedLines.begin(); try { - ingestLogLines(dbpool->get().get(), queuedLines); - queuedLines.clear(); + for (auto batch : + queuedLines | std::views::chunk(settings.maxBatchSize) | std::views::take(settings.maxBatches)) { + ingestLogLines(dbpool->get().get(), batch); + storedEnd = batch.end(); + } } catch (const std::exception & excp) { log(LOG_ERR, "Unhandled exception: %s, clearing known entity list", excp.what()); existingEntities.clear(); } + queuedLines.erase(queuedLines.begin(), storedEnd); } template<typename... T> @@ -285,7 +290,7 @@ namespace WebStat { } void - Ingestor::ingestLogLines(DB::Connection * dbconn, const LineBatch & lines) + Ingestor::ingestLogLines(DB::Connection * dbconn, const LinesView lines) { auto entityIds = std::views::transform([](auto && value) { return std::make_pair(value->hash, *value->id); diff --git a/src/ingestor.hpp b/src/ingestor.hpp index 195f325..916f200 100644 --- a/src/ingestor.hpp +++ b/src/ingestor.hpp @@ -26,6 +26,7 @@ namespace WebStat { unsigned int dbMax = 4; unsigned int dbKeep = 2; size_t maxBatchSize = 1; + size_t maxBatches = 5; minutes checkJobsAfter = 1min; minutes freqIngestParkedLines = 30min; minutes freqPurgeOldLogs = 6h; @@ -39,6 +40,7 @@ namespace WebStat { class Ingestor { public: using LineBatch = std::vector<std::string>; + using LinesView = std::span<const std::string>; Ingestor(const utsname &, IngestorSettings); Ingestor(const utsname &, DB::ConnectionPoolPtr, IngestorSettings); @@ -54,7 +56,7 @@ namespace WebStat { void ingestLog(std::FILE *); void tryIngestQueuedLogLines(); - void ingestLogLines(DB::Connection *, const LineBatch & lines); + void ingestLogLines(DB::Connection *, LinesView lines); std::expected<std::filesystem::path, int> parkQueuedLogLines(); void runJobsAsNeeded(); diff --git a/src/webstat_logger_main.cpp b/src/webstat_logger_main.cpp index 3eaa9f0..04d59fc 100644 --- a/src/webstat_logger_main.cpp +++ b/src/webstat_logger_main.cpp @@ -68,7 +68,9 @@ main(int argc, char ** argv) ("db.wr.keep", po::value(&settings.dbKeep)->default_value(settings.dbKeep), "Number of write/read write DB connections to keep open") ("db.maxBatchSize", po::value(&settings.maxBatchSize)->default_value(settings.maxBatchSize), - "Maximum number of access log entries to hold in memory before writing them to the DB") + "Number of access log entries to hold in memory before starting to write them to the DB") + ("db.maxBatches", po::value(&settings.maxBatches)->default_value(settings.maxBatches), + "Maximum number of batches to insert before returning to reading input") ("fallback.dir", po::value(&settings.fallbackDir)->default_value(settings.fallbackDir), "Path to write access logs to when the database is unavailable") ("jobs.check", po::value(&settings.checkJobsAfter)->default_value(settings.checkJobsAfter), |
