From 0f5a0a8e2d43774288d4d6ea747278ca6e085a2a Mon Sep 17 00:00:00 2001 From: Dan Goodliffe Date: Fri, 20 Mar 2026 02:17:04 +0000 Subject: Insert log entries in batches Store log lines in memory until threshold is reach or idle occurs, then insert all the lines in a single transaction. Save points handle the case of insertion errors. On success the queue is cleared. Parked lines also saved in bulk, only necessary if queued lines could not be inserted on shutdown, else the queue simply grows until ability to insert is restored. Importing parked lines just adds them to the queue and the normal process then follows. --- src/ingestor.cpp | 131 ++++++++++++++++++++++++++++++++----------------------- 1 file changed, 77 insertions(+), 54 deletions(-) (limited to 'src/ingestor.cpp') diff --git a/src/ingestor.cpp b/src/ingestor.cpp index e2018ad..1de9ab9 100644 --- a/src/ingestor.cpp +++ b/src/ingestor.cpp @@ -5,7 +5,6 @@ #include #include #include -#include #include #include #include @@ -92,6 +91,7 @@ namespace WebStat { assert(!currentIngestor); currentIngestor = this; signal(SIGTERM, &sigtermHandler); + queuedLines.reserve(settings.maxBatchSize); } Ingestor::~Ingestor() @@ -171,12 +171,18 @@ namespace WebStat { if (logIn.revents) { if (auto line = scn::scan(input, "{:[^\n]}\n")) { linesRead++; - ingestLogLine(line->value()); + queuedLines.emplace_back(std::move(line->value())); + if (queuedLines.size() >= settings.maxBatchSize) { + tryIngestQueuedLogLines(); + } } else { break; } } + else { + tryIngestQueuedLogLines(); + } if (expiredThenSet(lastCheckedJobs, settings.checkJobsAfter)) { runJobsAsNeeded(); } @@ -184,68 +190,84 @@ namespace WebStat { handleCurlOperations(); } } + tryIngestQueuedLogLines(); + parkQueuedLogLines(); while (!curlOperations.empty() && curl_multi_poll(curl.get(), nullptr, 0, INT_MAX, nullptr) == CURLM_OK) { handleCurlOperations(); } } void - Ingestor::ingestLogLine(const std::string_view line) + Ingestor::tryIngestQueuedLogLines() { try { - ingestLogLine(dbpool->get().get(), line); + ingestLogLines(dbpool->get().get(), queuedLines); + queuedLines.clear(); } catch (const std::exception &) { - parkLogLine(line); + existingEntities.clear(); } } void - Ingestor::ingestLogLine(DB::Connection * dbconn, const std::string_view line) + Ingestor::ingestLogLines(DB::Connection * dbconn, const LineBatch & lines) { - auto rememberNewEntityIds = [this](const auto & ids) { - existingEntities.insert_range(ids | std::views::take_while(&std::optional::has_value) - | std::views::transform([](auto && value) { - return *value; - })); - }; - if (auto result = scanLogLine(line)) { - linesParsed++; - const auto values = crc32ScanValues(result->values()); - NewEntityIds ids; - try { - { - std::optional dbtx; + auto nonNullEntityIds = std::views::take_while(&std::optional::has_value) + | std::views::transform([](auto && value) { + return *value; + }); + + DB::TransactionScope batchTx {*dbconn}; + for (const auto & line : lines) { + if (auto result = scanLogLine(line)) { + linesParsed++; + const auto values = crc32ScanValues(result->values()); + try { + DB::TransactionScope dbtx {*dbconn}; if (const auto newEnts = newEntities(values); newEnts.front()) { - dbtx.emplace(*dbconn); - ids = storeEntities(dbconn, newEnts); + existingEntities.insert_range(storeEntities(dbconn, newEnts) | nonNullEntityIds); } storeLogLine(dbconn, values); } - rememberNewEntityIds(ids); - } - catch (const DB::Error & originalError) { - try { - const auto uninsertableLine = ToEntity {}(line); - rememberNewEntityIds(storeEntities(dbconn, {uninsertableLine})); - } - catch (const std::exception &) { - throw originalError; + catch (const DB::Error & originalError) { + try { + DB::TransactionScope dbtx {*dbconn}; + const auto uninsertableLine = ToEntity {}(line); + storeEntities(dbconn, {uninsertableLine}); + } + catch (const std::exception &) { + throw originalError; + } } } - } - else { - linesDiscarded++; - const auto unparsableLine = ToEntity {}(line); - rememberNewEntityIds(storeEntities(dbconn, {unparsableLine})); + else { + linesDiscarded++; + const auto unparsableLine = ToEntity {}(line); + storeEntities(dbconn, {unparsableLine}); + } } } void - Ingestor::parkLogLine(std::string_view line) + Ingestor::parkQueuedLogLines() { - std::ofstream {settings.fallbackDir / std::format("parked-{}.log", crc32(line))} << line; - linesParked++; + if (queuedLines.empty()) { + return; + } + std::string path {settings.fallbackDir / std::format("parked-{}.log", crc32(queuedLines.front()))}; + if (auto parked = FilePtr(fopen(path.c_str(), "w"))) { + fprintf(parked.get(), "%zu\n", queuedLines.size()); + for (const auto & line : queuedLines) { + fprintf(parked.get(), "%.*s\n", static_cast(line.length()), line.data()); + } + if (fflush(parked.get()) == 0) { + linesParked += queuedLines.size(); + queuedLines.clear(); + } + else { + std::filesystem::remove(path); + } + } } void @@ -280,7 +302,7 @@ namespace WebStat { for (auto pathIter = std::filesystem::directory_iterator {settings.fallbackDir}; pathIter != std::filesystem::directory_iterator {}; ++pathIter) { if (scn::scan(pathIter->path().filename().string(), "parked-{}.log")) { - jobIngestParkedLine(pathIter); + jobIngestParkedLines(pathIter->path()); count += 1; } } @@ -288,29 +310,30 @@ namespace WebStat { } void - Ingestor::jobIngestParkedLine(const std::filesystem::directory_iterator & pathIter) + Ingestor::jobIngestParkedLines(const std::filesystem::path & path) { - jobIngestParkedLine(pathIter->path(), pathIter->file_size()); + if (auto parked = FilePtr(fopen(path.c_str(), "r"))) { + if (auto count = scn::scan(parked.get(), "{}\n")) { + jobIngestParkedLines(parked.get(), count->value()); + std::filesystem::remove(path); + return; + } + } + throw std::system_error {errno, std::generic_category(), strerror(errno)}; } void - Ingestor::jobIngestParkedLine(const std::filesystem::path & path, uintmax_t size) + Ingestor::jobIngestParkedLines(FILE * lines, size_t count) { - if (std::ifstream parked {path}) { - std::string line; - line.resize_and_overwrite(size, [&parked](char * content, size_t size) { - parked.read(content, static_cast(size)); - return static_cast(parked.tellg()); - }); - if (line.length() < size) { + for (size_t line = 0; line < count; ++line) { + if (auto line = scn::scan(lines, "{:[^\n]}\n")) { + linesRead++; + queuedLines.emplace_back(std::move(line->value())); + } + else { throw std::system_error {errno, std::generic_category(), "Short read of parked file"}; } - ingestLogLine(dbpool->get().get(), line); - } - else { - throw std::system_error {errno, std::generic_category(), strerror(errno)}; } - std::filesystem::remove(path); } unsigned int -- cgit v1.3