diff options
| author | Dan Goodliffe <dan@randomdan.homeip.net> | 2026-03-20 02:17:04 +0000 |
|---|---|---|
| committer | Dan Goodliffe <dan@randomdan.homeip.net> | 2026-03-20 02:23:08 +0000 |
| commit | 0f5a0a8e2d43774288d4d6ea747278ca6e085a2a (patch) | |
| tree | 08878dff32a636b388c660fd3330f1bddbd98af3 | |
| parent | 8c6fecd356003309f8eebec30374344272ca6072 (diff) | |
| download | webstat-0f5a0a8e2d43774288d4d6ea747278ca6e085a2a.tar.bz2 webstat-0f5a0a8e2d43774288d4d6ea747278ca6e085a2a.tar.xz webstat-0f5a0a8e2d43774288d4d6ea747278ca6e085a2a.zip | |
Insert log entries in batches
Store log lines in memory until threshold is reach or idle occurs, then
insert all the lines in a single transaction. Save points handle the
case of insertion errors. On success the queue is cleared.
Parked lines also saved in bulk, only necessary if queued lines could
not be inserted on shutdown, else the queue simply grows until ability
to insert is restored. Importing parked lines just adds them to the
queue and the normal process then follows.
| -rw-r--r-- | src/ingestor.cpp | 131 | ||||
| -rw-r--r-- | src/ingestor.hpp | 15 | ||||
| -rw-r--r-- | src/webstat_logger_main.cpp | 2 | ||||
| -rw-r--r-- | test/perf-ingest.cpp | 3 | ||||
| -rw-r--r-- | test/test-ingest.cpp | 51 |
5 files changed, 117 insertions, 85 deletions
diff --git a/src/ingestor.cpp b/src/ingestor.cpp index e2018ad..1de9ab9 100644 --- a/src/ingestor.cpp +++ b/src/ingestor.cpp @@ -5,7 +5,6 @@ #include <connection.h> #include <csignal> #include <dbTypes.h> -#include <fstream> #include <modifycommand.h> #include <ranges> #include <scn/scan.h> @@ -92,6 +91,7 @@ namespace WebStat { assert(!currentIngestor); currentIngestor = this; signal(SIGTERM, &sigtermHandler); + queuedLines.reserve(settings.maxBatchSize); } Ingestor::~Ingestor() @@ -171,12 +171,18 @@ namespace WebStat { if (logIn.revents) { if (auto line = scn::scan<std::string>(input, "{:[^\n]}\n")) { linesRead++; - ingestLogLine(line->value()); + queuedLines.emplace_back(std::move(line->value())); + if (queuedLines.size() >= settings.maxBatchSize) { + tryIngestQueuedLogLines(); + } } else { break; } } + else { + tryIngestQueuedLogLines(); + } if (expiredThenSet(lastCheckedJobs, settings.checkJobsAfter)) { runJobsAsNeeded(); } @@ -184,68 +190,84 @@ namespace WebStat { handleCurlOperations(); } } + tryIngestQueuedLogLines(); + parkQueuedLogLines(); while (!curlOperations.empty() && curl_multi_poll(curl.get(), nullptr, 0, INT_MAX, nullptr) == CURLM_OK) { handleCurlOperations(); } } void - Ingestor::ingestLogLine(const std::string_view line) + Ingestor::tryIngestQueuedLogLines() { try { - ingestLogLine(dbpool->get().get(), line); + ingestLogLines(dbpool->get().get(), queuedLines); + queuedLines.clear(); } catch (const std::exception &) { - parkLogLine(line); + existingEntities.clear(); } } void - Ingestor::ingestLogLine(DB::Connection * dbconn, const std::string_view line) + Ingestor::ingestLogLines(DB::Connection * dbconn, const LineBatch & lines) { - auto rememberNewEntityIds = [this](const auto & ids) { - existingEntities.insert_range(ids | std::views::take_while(&std::optional<Crc32Value>::has_value) - | std::views::transform([](auto && value) { - return *value; - })); - }; - if (auto result = scanLogLine(line)) { - linesParsed++; - const auto values = crc32ScanValues(result->values()); - NewEntityIds ids; - try { - { - std::optional<DB::TransactionScope> dbtx; + auto nonNullEntityIds = std::views::take_while(&std::optional<Crc32Value>::has_value) + | std::views::transform([](auto && value) { + return *value; + }); + + DB::TransactionScope batchTx {*dbconn}; + for (const auto & line : lines) { + if (auto result = scanLogLine(line)) { + linesParsed++; + const auto values = crc32ScanValues(result->values()); + try { + DB::TransactionScope dbtx {*dbconn}; if (const auto newEnts = newEntities(values); newEnts.front()) { - dbtx.emplace(*dbconn); - ids = storeEntities(dbconn, newEnts); + existingEntities.insert_range(storeEntities(dbconn, newEnts) | nonNullEntityIds); } storeLogLine(dbconn, values); } - rememberNewEntityIds(ids); - } - catch (const DB::Error & originalError) { - try { - const auto uninsertableLine = ToEntity<EntityType::UninsertableLine> {}(line); - rememberNewEntityIds(storeEntities(dbconn, {uninsertableLine})); - } - catch (const std::exception &) { - throw originalError; + catch (const DB::Error & originalError) { + try { + DB::TransactionScope dbtx {*dbconn}; + const auto uninsertableLine = ToEntity<EntityType::UninsertableLine> {}(line); + storeEntities(dbconn, {uninsertableLine}); + } + catch (const std::exception &) { + throw originalError; + } } } - } - else { - linesDiscarded++; - const auto unparsableLine = ToEntity<EntityType::UnparsableLine> {}(line); - rememberNewEntityIds(storeEntities(dbconn, {unparsableLine})); + else { + linesDiscarded++; + const auto unparsableLine = ToEntity<EntityType::UnparsableLine> {}(line); + storeEntities(dbconn, {unparsableLine}); + } } } void - Ingestor::parkLogLine(std::string_view line) + Ingestor::parkQueuedLogLines() { - std::ofstream {settings.fallbackDir / std::format("parked-{}.log", crc32(line))} << line; - linesParked++; + if (queuedLines.empty()) { + return; + } + std::string path {settings.fallbackDir / std::format("parked-{}.log", crc32(queuedLines.front()))}; + if (auto parked = FilePtr(fopen(path.c_str(), "w"))) { + fprintf(parked.get(), "%zu\n", queuedLines.size()); + for (const auto & line : queuedLines) { + fprintf(parked.get(), "%.*s\n", static_cast<int>(line.length()), line.data()); + } + if (fflush(parked.get()) == 0) { + linesParked += queuedLines.size(); + queuedLines.clear(); + } + else { + std::filesystem::remove(path); + } + } } void @@ -280,7 +302,7 @@ namespace WebStat { for (auto pathIter = std::filesystem::directory_iterator {settings.fallbackDir}; pathIter != std::filesystem::directory_iterator {}; ++pathIter) { if (scn::scan<Crc32Value>(pathIter->path().filename().string(), "parked-{}.log")) { - jobIngestParkedLine(pathIter); + jobIngestParkedLines(pathIter->path()); count += 1; } } @@ -288,29 +310,30 @@ namespace WebStat { } void - Ingestor::jobIngestParkedLine(const std::filesystem::directory_iterator & pathIter) + Ingestor::jobIngestParkedLines(const std::filesystem::path & path) { - jobIngestParkedLine(pathIter->path(), pathIter->file_size()); + if (auto parked = FilePtr(fopen(path.c_str(), "r"))) { + if (auto count = scn::scan<size_t>(parked.get(), "{}\n")) { + jobIngestParkedLines(parked.get(), count->value()); + std::filesystem::remove(path); + return; + } + } + throw std::system_error {errno, std::generic_category(), strerror(errno)}; } void - Ingestor::jobIngestParkedLine(const std::filesystem::path & path, uintmax_t size) + Ingestor::jobIngestParkedLines(FILE * lines, size_t count) { - if (std::ifstream parked {path}) { - std::string line; - line.resize_and_overwrite(size, [&parked](char * content, size_t size) { - parked.read(content, static_cast<std::streamsize>(size)); - return static_cast<size_t>(parked.tellg()); - }); - if (line.length() < size) { + for (size_t line = 0; line < count; ++line) { + if (auto line = scn::scan<std::string>(lines, "{:[^\n]}\n")) { + linesRead++; + queuedLines.emplace_back(std::move(line->value())); + } + else { throw std::system_error {errno, std::generic_category(), "Short read of parked file"}; } - ingestLogLine(dbpool->get().get(), line); - } - else { - throw std::system_error {errno, std::generic_category(), strerror(errno)}; } - std::filesystem::remove(path); } unsigned int diff --git a/src/ingestor.hpp b/src/ingestor.hpp index 2ae2936..94e0d5c 100644 --- a/src/ingestor.hpp +++ b/src/ingestor.hpp @@ -24,6 +24,7 @@ namespace WebStat { std::filesystem::path fallbackDir = "/var/log/webstat"; unsigned int dbMax = 4; unsigned int dbKeep = 2; + size_t maxBatchSize = 1; minutes checkJobsAfter = 1min; minutes freqIngestParkedLines = 30min; minutes freqPurgeOldLogs = 6h; @@ -36,6 +37,7 @@ namespace WebStat { class Ingestor { public: + using LineBatch = std::vector<std::string>; Ingestor(const utsname &, IngestorSettings); Ingestor(const utsname &, DB::ConnectionPoolPtr, IngestorSettings); @@ -50,9 +52,9 @@ namespace WebStat { [[nodiscard]] static ScanResult scanLogLine(std::string_view); void ingestLog(std::FILE *); - void ingestLogLine(std::string_view); - void ingestLogLine(DB::Connection *, std::string_view); - void parkLogLine(std::string_view); + void tryIngestQueuedLogLines(); + void ingestLogLines(DB::Connection *, const LineBatch & lines); + void parkQueuedLogLines(); void runJobsAsNeeded(); unsigned int jobIngestParkedLines(); @@ -70,7 +72,8 @@ namespace WebStat { size_t linesParsed = 0; size_t linesDiscarded = 0; size_t linesParked = 0; - mutable std::flat_set<Crc32Value> existingEntities; + std::flat_set<Crc32Value> existingEntities; + LineBatch queuedLines; bool terminated = false; @@ -98,8 +101,8 @@ namespace WebStat { void onNewUserAgent(const Entity &) const; void handleCurlOperations(); - void jobIngestParkedLine(const std::filesystem::directory_iterator &); - void jobIngestParkedLine(const std::filesystem::path &, uintmax_t size); + void jobIngestParkedLines(const std::filesystem::path &); + void jobIngestParkedLines(FILE *, size_t count); static void sigtermHandler(int); void terminate(int); diff --git a/src/webstat_logger_main.cpp b/src/webstat_logger_main.cpp index 6d3aeda..39d794a 100644 --- a/src/webstat_logger_main.cpp +++ b/src/webstat_logger_main.cpp @@ -53,6 +53,8 @@ main(int argc, char ** argv) "Maximum number of concurrent write/read write DB connections") ("db.wr.keep", po::value(&settings.dbKeep)->default_value(settings.dbKeep), "Number of write/read write DB connections to keep open") + ("db.maxBatchSize", po::value(&settings.maxBatchSize)->default_value(settings.maxBatchSize), + "Maximum number of access log entries to hold in memory before writing them to the DB") ("fallback.dir", po::value(&settings.fallbackDir)->default_value(settings.fallbackDir), "Path to write access logs to when the database is unavailable") ("jobs.check", po::value(&settings.checkJobsAfter)->default_value(settings.checkJobsAfter), diff --git a/test/perf-ingest.cpp b/test/perf-ingest.cpp index 69212de..c403349 100644 --- a/test/perf-ingest.cpp +++ b/test/perf-ingest.cpp @@ -25,6 +25,7 @@ namespace { std::make_shared<WebStat::MockDBPool>("webstat"), { .userAgentAPI = {}, + .maxBatchSize = static_cast<size_t>(state.range(0)), }}; for (auto loop : state) { WebStat::FilePtr logFile {fopen(TMP_LOG.c_str(), "r")}; @@ -33,6 +34,6 @@ namespace { } } -BENCHMARK(doIngestFile)->Setup(setup); +BENCHMARK_RANGE(doIngestFile, 1, 1024)->Setup(setup); BENCHMARK_MAIN(); diff --git a/test/test-ingest.cpp b/test/test-ingest.cpp index be3be56..a1dc5e9 100644 --- a/test/test-ingest.cpp +++ b/test/test-ingest.cpp @@ -230,11 +230,10 @@ BOOST_DATA_TEST_CASE(StoreLogLine, }), line) { - ingestLogLine(DB::MockDatabase::openConnectionTo("webstat").get(), line); + ingestLogLines(DB::MockDatabase::openConnectionTo("webstat").get(), {std::string {line}}); BOOST_CHECK_EQUAL(linesRead, 0); BOOST_CHECK_EQUAL(linesParsed, 1); BOOST_CHECK_EQUAL(linesDiscarded, 0); - BOOST_CHECK_EQUAL(linesParked, 0); BOOST_CHECK_EQUAL(existingEntities.size(), 5); } @@ -264,34 +263,33 @@ BOOST_AUTO_TEST_CASE(TerminateHandler, *boost::unit_test::timeout(5)) BOOST_AUTO_TEST_CASE(ParkLogLine) { - parkLogLine(LOGLINE1); - BOOST_CHECK_EQUAL(linesParked, 1); + queuedLines.emplace_back(LOGLINE1); + queuedLines.emplace_back(LOGLINE2); + parkQueuedLogLines(); const auto path = settings.fallbackDir / LOGLINE1_PARKED; BOOST_TEST_INFO(path); BOOST_REQUIRE(std::filesystem::exists(path)); - BOOST_CHECK_EQUAL(std::filesystem::file_size(path), LOGLINE1.length()); + BOOST_CHECK_EQUAL(std::filesystem::file_size(path), LOGLINE1.length() + LOGLINE2.length() + 4); } -BOOST_TEST_DECORATOR(*boost::unit_test::depends_on("I/ParkLogLine")) - -BOOST_AUTO_TEST_CASE(ParkLogLineOnError) +BOOST_AUTO_TEST_CASE(ParkLogLineOnError, *boost::unit_test::depends_on("I/ParkLogLine")) { BOOST_REQUIRE(existingEntities.empty()); constexpr std::string_view LOGLINE_BAD_VERB = R"LOG(git.randomdan.homeip.net 98.82.40.168 1755561576768318 CAUSEPARK "/repo/gentoobrowse-api/commit/gentoobrowse-api/unittests/fixtures/756569aa764177340726dd3d40b41d89b11b20c7/app-crypt/pdfcrack/Manifest" "?h=gentoobrowse-api-0.9.1&id=a2ed3fd30333721accd4b697bfcb6cc4165c7714" HTTP/1.1 200 1884 107791 "-" "Mozilla/5.0 AppleWebKit/537.36 (KHTML, like Gecko; compatible; Amazonbot/0.1; +https://developer.amazon.com/support/amazonbot) Chrome/119.0.6045.214 Safari/537.36")LOG"; - BOOST_REQUIRE_NO_THROW(ingestLogLine(LOGLINE_BAD_VERB)); + BOOST_REQUIRE_NO_THROW(ingestLogLines(dbpool->get().get(), {std::string {LOGLINE_BAD_VERB}})); BOOST_CHECK_EQUAL(linesParked, 0); - BOOST_CHECK_EQUAL(existingEntities.size(), 1); + BOOST_CHECK_EQUAL(linesDiscarded, 1); } BOOST_AUTO_TEST_CASE(IngestParked, *boost::unit_test::depends_on("I/ParkLogLine")) { - parkLogLine(LOGLINE1); - BOOST_REQUIRE_EQUAL(linesParked, 1); - BOOST_REQUIRE_EQUAL(linesParsed, 0); + queuedLines.emplace_back(LOGLINE1); + queuedLines.emplace_back(LOGLINE2); + parkQueuedLogLines(); + BOOST_REQUIRE(queuedLines.empty()); jobIngestParkedLines(); - BOOST_CHECK_EQUAL(linesParsed, 1); - BOOST_CHECK_EQUAL(linesDiscarded, 0); + BOOST_CHECK_EQUAL(queuedLines.size(), 2); BOOST_CHECK(!std::filesystem::exists(settings.fallbackDir / LOGLINE1_PARKED)); } @@ -307,18 +305,22 @@ BOOST_AUTO_TEST_CASE(IngestParkedJob, { const auto now = Job::LastRunTime::clock::now(); ingestParkedLines.lastRun = now - 1s; - parkLogLine(LOGLINE1); + queuedLines.emplace_back(LOGLINE1); + parkQueuedLogLines(); + BOOST_REQUIRE(queuedLines.empty()); + BOOST_REQUIRE(std::filesystem::exists(settings.fallbackDir / LOGLINE1_PARKED)); runJobsAsNeeded(); BOOST_REQUIRE(!ingestParkedLines.currentRun); - BOOST_REQUIRE_EQUAL(linesParked, 1); - BOOST_REQUIRE_EQUAL(linesParsed, 0); + BOOST_CHECK(queuedLines.empty()); + BOOST_CHECK(std::filesystem::exists(settings.fallbackDir / LOGLINE1_PARKED)); BOOST_CHECK_EQUAL(ingestParkedLines.lastRun, now - 1s); ingestParkedLines.lastRun = now - settings.freqIngestParkedLines + 2s; + runJobsAsNeeded(); BOOST_REQUIRE(!ingestParkedLines.currentRun); - BOOST_REQUIRE_EQUAL(linesParked, 1); - BOOST_REQUIRE_EQUAL(linesParsed, 0); + BOOST_CHECK(queuedLines.empty()); + BOOST_CHECK(std::filesystem::exists(settings.fallbackDir / LOGLINE1_PARKED)); BOOST_CHECK_EQUAL(ingestParkedLines.lastRun, now - settings.freqIngestParkedLines + 2s); ingestParkedLines.lastRun = now - settings.freqIngestParkedLines - 1s; @@ -327,8 +329,7 @@ BOOST_AUTO_TEST_CASE(IngestParkedJob, ingestParkedLines.currentRun->wait(); runJobsAsNeeded(); BOOST_REQUIRE(!ingestParkedLines.currentRun); - BOOST_CHECK_EQUAL(linesParsed, 1); - BOOST_CHECK_EQUAL(linesDiscarded, 0); + BOOST_CHECK_EQUAL(queuedLines.size(), 1); BOOST_CHECK_GE(ingestParkedLines.lastRun, now); BOOST_CHECK(!std::filesystem::exists(settings.fallbackDir / LOGLINE1_PARKED)); } @@ -337,7 +338,8 @@ BOOST_AUTO_TEST_CASE(JobErrorRescheduler, *boost::unit_test::depends_on("I/Inges { const auto now = Job::LastRunTime::clock::now(); ingestParkedLines.lastRun = now - settings.freqIngestParkedLines - 1s; - parkLogLine(LOGLINE1); + queuedLines.emplace_back(LOGLINE1); + parkQueuedLogLines(); std::filesystem::permissions(settings.fallbackDir / LOGLINE1_PARKED, std::filesystem::perms::owner_write); runJobsAsNeeded(); BOOST_REQUIRE(ingestParkedLines.currentRun); @@ -366,7 +368,8 @@ BOOST_AUTO_TEST_CASE(FetchMockUserAgentDetail) BOOST_AUTO_TEST_CASE(DiscardUnparsable) { - BOOST_REQUIRE_NO_THROW(ingestLogLine("does not parse")); + queuedLines.emplace_back("does not parse"); + BOOST_REQUIRE_NO_THROW(tryIngestQueuedLogLines()); auto dbconn = dbpool->get(); auto select = dbconn->select("SELECT id::bigint, value FROM entities WHERE type = 'unparsable_line'"); constexpr std::array<std::tuple<Crc32Value, std::string_view>, 1> EXPECTED {{ |
