From 0f5a0a8e2d43774288d4d6ea747278ca6e085a2a Mon Sep 17 00:00:00 2001 From: Dan Goodliffe Date: Fri, 20 Mar 2026 02:17:04 +0000 Subject: Insert log entries in batches Store log lines in memory until the threshold is reached or idle occurs, then insert all the lines in a single transaction. Save points handle the case of insertion errors. On success the queue is cleared. Parked lines are also saved in bulk, only necessary if queued lines could not be inserted on shutdown, else the queue simply grows until ability to insert is restored. Importing parked lines just adds them to the queue and the normal process then follows. --- test/perf-ingest.cpp | 3 ++- test/test-ingest.cpp | 51 +++++++++++++++++++++++++++------------------------ 2 files changed, 29 insertions(+), 25 deletions(-) (limited to 'test') diff --git a/test/perf-ingest.cpp b/test/perf-ingest.cpp index 69212de..c403349 100644 --- a/test/perf-ingest.cpp +++ b/test/perf-ingest.cpp @@ -25,6 +25,7 @@ namespace { std::make_shared("webstat"), { .userAgentAPI = {}, + .maxBatchSize = static_cast(state.range(0)), }}; for (auto loop : state) { WebStat::FilePtr logFile {fopen(TMP_LOG.c_str(), "r")}; @@ -33,6 +34,6 @@ namespace { } } -BENCHMARK(doIngestFile)->Setup(setup); +BENCHMARK_RANGE(doIngestFile, 1, 1024)->Setup(setup); BENCHMARK_MAIN(); diff --git a/test/test-ingest.cpp b/test/test-ingest.cpp index be3be56..a1dc5e9 100644 --- a/test/test-ingest.cpp +++ b/test/test-ingest.cpp @@ -230,11 +230,10 @@ BOOST_DATA_TEST_CASE(StoreLogLine, }), line) { - ingestLogLine(DB::MockDatabase::openConnectionTo("webstat").get(), line); + ingestLogLines(DB::MockDatabase::openConnectionTo("webstat").get(), {std::string {line}}); BOOST_CHECK_EQUAL(linesRead, 0); BOOST_CHECK_EQUAL(linesParsed, 1); BOOST_CHECK_EQUAL(linesDiscarded, 0); - BOOST_CHECK_EQUAL(linesParked, 0); BOOST_CHECK_EQUAL(existingEntities.size(), 5); } @@ -264,34 +263,33 @@ BOOST_AUTO_TEST_CASE(TerminateHandler, *boost::unit_test::timeout(5)) 
BOOST_AUTO_TEST_CASE(ParkLogLine) { - parkLogLine(LOGLINE1); - BOOST_CHECK_EQUAL(linesParked, 1); + queuedLines.emplace_back(LOGLINE1); + queuedLines.emplace_back(LOGLINE2); + parkQueuedLogLines(); const auto path = settings.fallbackDir / LOGLINE1_PARKED; BOOST_TEST_INFO(path); BOOST_REQUIRE(std::filesystem::exists(path)); - BOOST_CHECK_EQUAL(std::filesystem::file_size(path), LOGLINE1.length()); + BOOST_CHECK_EQUAL(std::filesystem::file_size(path), LOGLINE1.length() + LOGLINE2.length() + 4); } -BOOST_TEST_DECORATOR(*boost::unit_test::depends_on("I/ParkLogLine")) - -BOOST_AUTO_TEST_CASE(ParkLogLineOnError) +BOOST_AUTO_TEST_CASE(ParkLogLineOnError, *boost::unit_test::depends_on("I/ParkLogLine")) { BOOST_REQUIRE(existingEntities.empty()); constexpr std::string_view LOGLINE_BAD_VERB = R"LOG(git.randomdan.homeip.net 98.82.40.168 1755561576768318 CAUSEPARK "/repo/gentoobrowse-api/commit/gentoobrowse-api/unittests/fixtures/756569aa764177340726dd3d40b41d89b11b20c7/app-crypt/pdfcrack/Manifest" "?h=gentoobrowse-api-0.9.1&id=a2ed3fd30333721accd4b697bfcb6cc4165c7714" HTTP/1.1 200 1884 107791 "-" "Mozilla/5.0 AppleWebKit/537.36 (KHTML, like Gecko; compatible; Amazonbot/0.1; +https://developer.amazon.com/support/amazonbot) Chrome/119.0.6045.214 Safari/537.36")LOG"; - BOOST_REQUIRE_NO_THROW(ingestLogLine(LOGLINE_BAD_VERB)); + BOOST_REQUIRE_NO_THROW(ingestLogLines(dbpool->get().get(), {std::string {LOGLINE_BAD_VERB}})); BOOST_CHECK_EQUAL(linesParked, 0); - BOOST_CHECK_EQUAL(existingEntities.size(), 1); + BOOST_CHECK_EQUAL(linesDiscarded, 1); } BOOST_AUTO_TEST_CASE(IngestParked, *boost::unit_test::depends_on("I/ParkLogLine")) { - parkLogLine(LOGLINE1); - BOOST_REQUIRE_EQUAL(linesParked, 1); - BOOST_REQUIRE_EQUAL(linesParsed, 0); + queuedLines.emplace_back(LOGLINE1); + queuedLines.emplace_back(LOGLINE2); + parkQueuedLogLines(); + BOOST_REQUIRE(queuedLines.empty()); jobIngestParkedLines(); - BOOST_CHECK_EQUAL(linesParsed, 1); - BOOST_CHECK_EQUAL(linesDiscarded, 0); + 
BOOST_CHECK_EQUAL(queuedLines.size(), 2); BOOST_CHECK(!std::filesystem::exists(settings.fallbackDir / LOGLINE1_PARKED)); } @@ -307,18 +305,22 @@ BOOST_AUTO_TEST_CASE(IngestParkedJob, { const auto now = Job::LastRunTime::clock::now(); ingestParkedLines.lastRun = now - 1s; - parkLogLine(LOGLINE1); + queuedLines.emplace_back(LOGLINE1); + parkQueuedLogLines(); + BOOST_REQUIRE(queuedLines.empty()); + BOOST_REQUIRE(std::filesystem::exists(settings.fallbackDir / LOGLINE1_PARKED)); runJobsAsNeeded(); BOOST_REQUIRE(!ingestParkedLines.currentRun); - BOOST_REQUIRE_EQUAL(linesParked, 1); - BOOST_REQUIRE_EQUAL(linesParsed, 0); + BOOST_CHECK(queuedLines.empty()); + BOOST_CHECK(std::filesystem::exists(settings.fallbackDir / LOGLINE1_PARKED)); BOOST_CHECK_EQUAL(ingestParkedLines.lastRun, now - 1s); ingestParkedLines.lastRun = now - settings.freqIngestParkedLines + 2s; + runJobsAsNeeded(); BOOST_REQUIRE(!ingestParkedLines.currentRun); - BOOST_REQUIRE_EQUAL(linesParked, 1); - BOOST_REQUIRE_EQUAL(linesParsed, 0); + BOOST_CHECK(queuedLines.empty()); + BOOST_CHECK(std::filesystem::exists(settings.fallbackDir / LOGLINE1_PARKED)); BOOST_CHECK_EQUAL(ingestParkedLines.lastRun, now - settings.freqIngestParkedLines + 2s); ingestParkedLines.lastRun = now - settings.freqIngestParkedLines - 1s; @@ -327,8 +329,7 @@ BOOST_AUTO_TEST_CASE(IngestParkedJob, ingestParkedLines.currentRun->wait(); runJobsAsNeeded(); BOOST_REQUIRE(!ingestParkedLines.currentRun); - BOOST_CHECK_EQUAL(linesParsed, 1); - BOOST_CHECK_EQUAL(linesDiscarded, 0); + BOOST_CHECK_EQUAL(queuedLines.size(), 1); BOOST_CHECK_GE(ingestParkedLines.lastRun, now); BOOST_CHECK(!std::filesystem::exists(settings.fallbackDir / LOGLINE1_PARKED)); } @@ -337,7 +338,8 @@ BOOST_AUTO_TEST_CASE(JobErrorRescheduler, *boost::unit_test::depends_on("I/Inges { const auto now = Job::LastRunTime::clock::now(); ingestParkedLines.lastRun = now - settings.freqIngestParkedLines - 1s; - parkLogLine(LOGLINE1); + queuedLines.emplace_back(LOGLINE1); + 
parkQueuedLogLines(); std::filesystem::permissions(settings.fallbackDir / LOGLINE1_PARKED, std::filesystem::perms::owner_write); runJobsAsNeeded(); BOOST_REQUIRE(ingestParkedLines.currentRun); @@ -366,7 +368,8 @@ BOOST_AUTO_TEST_CASE(FetchMockUserAgentDetail) BOOST_AUTO_TEST_CASE(DiscardUnparsable) { - BOOST_REQUIRE_NO_THROW(ingestLogLine("does not parse")); + queuedLines.emplace_back("does not parse"); + BOOST_REQUIRE_NO_THROW(tryIngestQueuedLogLines()); auto dbconn = dbpool->get(); auto select = dbconn->select("SELECT id::bigint, value FROM entities WHERE type = 'unparsable_line'"); constexpr std::array, 1> EXPECTED {{ -- cgit v1.3