diff options
| -rw-r--r-- | src/ingestor.cpp | 131 | ||||
| -rw-r--r-- | src/ingestor.hpp | 15 | ||||
| -rw-r--r-- | src/webstat_logger_main.cpp | 2 | ||||
| -rw-r--r-- | test/perf-ingest.cpp | 3 | ||||
| -rw-r--r-- | test/test-ingest.cpp | 51 |
5 files changed, 117 insertions, 85 deletions
diff --git a/src/ingestor.cpp b/src/ingestor.cpp index e2018ad..1de9ab9 100644 --- a/src/ingestor.cpp +++ b/src/ingestor.cpp @@ -5,7 +5,6 @@ #include <connection.h> #include <csignal> #include <dbTypes.h> -#include <fstream> #include <modifycommand.h> #include <ranges> #include <scn/scan.h> @@ -92,6 +91,7 @@ namespace WebStat { assert(!currentIngestor); currentIngestor = this; signal(SIGTERM, &sigtermHandler); + queuedLines.reserve(settings.maxBatchSize); } Ingestor::~Ingestor() @@ -171,12 +171,18 @@ namespace WebStat { if (logIn.revents) { if (auto line = scn::scan<std::string>(input, "{:[^\n]}\n")) { linesRead++; - ingestLogLine(line->value()); + queuedLines.emplace_back(std::move(line->value())); + if (queuedLines.size() >= settings.maxBatchSize) { + tryIngestQueuedLogLines(); + } } else { break; } } + else { + tryIngestQueuedLogLines(); + } if (expiredThenSet(lastCheckedJobs, settings.checkJobsAfter)) { runJobsAsNeeded(); } @@ -184,68 +190,84 @@ namespace WebStat { handleCurlOperations(); } } + tryIngestQueuedLogLines(); + parkQueuedLogLines(); while (!curlOperations.empty() && curl_multi_poll(curl.get(), nullptr, 0, INT_MAX, nullptr) == CURLM_OK) { handleCurlOperations(); } } void - Ingestor::ingestLogLine(const std::string_view line) + Ingestor::tryIngestQueuedLogLines() { try { - ingestLogLine(dbpool->get().get(), line); + ingestLogLines(dbpool->get().get(), queuedLines); + queuedLines.clear(); } catch (const std::exception &) { - parkLogLine(line); + existingEntities.clear(); } } void - Ingestor::ingestLogLine(DB::Connection * dbconn, const std::string_view line) + Ingestor::ingestLogLines(DB::Connection * dbconn, const LineBatch & lines) { - auto rememberNewEntityIds = [this](const auto & ids) { - existingEntities.insert_range(ids | std::views::take_while(&std::optional<Crc32Value>::has_value) - | std::views::transform([](auto && value) { - return *value; - })); - }; - if (auto result = scanLogLine(line)) { - linesParsed++; - const auto values = crc32ScanValues(result->values()); - NewEntityIds ids; - try { - { - std::optional<DB::TransactionScope> dbtx; + auto nonNullEntityIds = std::views::take_while(&std::optional<Crc32Value>::has_value) + | std::views::transform([](auto && value) { + return *value; + }); + + DB::TransactionScope batchTx {*dbconn}; + for (const auto & line : lines) { + if (auto result = scanLogLine(line)) { + linesParsed++; + const auto values = crc32ScanValues(result->values()); + try { + DB::TransactionScope dbtx {*dbconn}; if (const auto newEnts = newEntities(values); newEnts.front()) { - dbtx.emplace(*dbconn); - ids = storeEntities(dbconn, newEnts); + existingEntities.insert_range(storeEntities(dbconn, newEnts) | nonNullEntityIds); } storeLogLine(dbconn, values); } - rememberNewEntityIds(ids); - } - catch (const DB::Error & originalError) { - try { - const auto uninsertableLine = ToEntity<EntityType::UninsertableLine> {}(line); - rememberNewEntityIds(storeEntities(dbconn, {uninsertableLine})); - } - catch (const std::exception &) { - throw originalError; + catch (const DB::Error & originalError) { + try { + DB::TransactionScope dbtx {*dbconn}; + const auto uninsertableLine = ToEntity<EntityType::UninsertableLine> {}(line); + storeEntities(dbconn, {uninsertableLine}); + } + catch (const std::exception &) { + throw originalError; + } } } - } - else { - linesDiscarded++; - const auto unparsableLine = ToEntity<EntityType::UnparsableLine> {}(line); - rememberNewEntityIds(storeEntities(dbconn, {unparsableLine})); + else { + linesDiscarded++; + const auto unparsableLine = ToEntity<EntityType::UnparsableLine> {}(line); + storeEntities(dbconn, {unparsableLine}); + } } } void - Ingestor::parkLogLine(std::string_view line) + Ingestor::parkQueuedLogLines() { - std::ofstream {settings.fallbackDir / std::format("parked-{}.log", crc32(line))} << line; - linesParked++; + if (queuedLines.empty()) { + return; + } + std::string path {settings.fallbackDir / std::format("parked-{}.log", crc32(queuedLines.front()))}; + if (auto parked = FilePtr(fopen(path.c_str(), "w"))) { + fprintf(parked.get(), "%zu\n", queuedLines.size()); + for (const auto & line : queuedLines) { + fprintf(parked.get(), "%.*s\n", static_cast<int>(line.length()), line.data()); + } + if (fflush(parked.get()) == 0) { + linesParked += queuedLines.size(); + queuedLines.clear(); + } + else { + std::filesystem::remove(path); + } + } } void @@ -280,7 +302,7 @@ namespace WebStat { for (auto pathIter = std::filesystem::directory_iterator {settings.fallbackDir}; pathIter != std::filesystem::directory_iterator {}; ++pathIter) { if (scn::scan<Crc32Value>(pathIter->path().filename().string(), "parked-{}.log")) { - jobIngestParkedLine(pathIter); + jobIngestParkedLines(pathIter->path()); count += 1; } } @@ -288,29 +310,30 @@ namespace WebStat { } void - Ingestor::jobIngestParkedLine(const std::filesystem::directory_iterator & pathIter) + Ingestor::jobIngestParkedLines(const std::filesystem::path & path) { - jobIngestParkedLine(pathIter->path(), pathIter->file_size()); + if (auto parked = FilePtr(fopen(path.c_str(), "r"))) { + if (auto count = scn::scan<size_t>(parked.get(), "{}\n")) { + jobIngestParkedLines(parked.get(), count->value()); + std::filesystem::remove(path); + return; + } + } + throw std::system_error {errno, std::generic_category(), strerror(errno)}; } void - Ingestor::jobIngestParkedLine(const std::filesystem::path & path, uintmax_t size) + Ingestor::jobIngestParkedLines(FILE * lines, size_t count) { - if (std::ifstream parked {path}) { - std::string line; - line.resize_and_overwrite(size, [&parked](char * content, size_t size) { - parked.read(content, static_cast<std::streamsize>(size)); - return static_cast<size_t>(parked.tellg()); - }); - if (line.length() < size) { + for (size_t line = 0; line < count; ++line) { + if (auto line = scn::scan<std::string>(lines, "{:[^\n]}\n")) { + linesRead++; + queuedLines.emplace_back(std::move(line->value())); + } + else { throw std::system_error {errno, std::generic_category(), "Short read of parked file"}; } - ingestLogLine(dbpool->get().get(), line); - } - else { - throw std::system_error {errno, std::generic_category(), strerror(errno)}; } - std::filesystem::remove(path); } unsigned int diff --git a/src/ingestor.hpp b/src/ingestor.hpp index 2ae2936..94e0d5c 100644 --- a/src/ingestor.hpp +++ b/src/ingestor.hpp @@ -24,6 +24,7 @@ namespace WebStat { std::filesystem::path fallbackDir = "/var/log/webstat"; unsigned int dbMax = 4; unsigned int dbKeep = 2; + size_t maxBatchSize = 1; minutes checkJobsAfter = 1min; minutes freqIngestParkedLines = 30min; minutes freqPurgeOldLogs = 6h; @@ -36,6 +37,7 @@ namespace WebStat { class Ingestor { public: + using LineBatch = std::vector<std::string>; Ingestor(const utsname &, IngestorSettings); Ingestor(const utsname &, DB::ConnectionPoolPtr, IngestorSettings); @@ -50,9 +52,9 @@ namespace WebStat { [[nodiscard]] static ScanResult scanLogLine(std::string_view); void ingestLog(std::FILE *); - void ingestLogLine(std::string_view); - void ingestLogLine(DB::Connection *, std::string_view); - void parkLogLine(std::string_view); + void tryIngestQueuedLogLines(); + void ingestLogLines(DB::Connection *, const LineBatch & lines); + void parkQueuedLogLines(); void runJobsAsNeeded(); unsigned int jobIngestParkedLines(); @@ -70,7 +72,8 @@ namespace WebStat { size_t linesParsed = 0; size_t linesDiscarded = 0; size_t linesParked = 0; - mutable std::flat_set<Crc32Value> existingEntities; + std::flat_set<Crc32Value> existingEntities; + LineBatch queuedLines; bool terminated = false; @@ -98,8 +101,8 @@ namespace WebStat { void onNewUserAgent(const Entity &) const; void handleCurlOperations(); - void jobIngestParkedLine(const std::filesystem::directory_iterator &); - void jobIngestParkedLine(const std::filesystem::path &, uintmax_t size); + void jobIngestParkedLines(const std::filesystem::path &); + void jobIngestParkedLines(FILE *, size_t count); static void sigtermHandler(int); void terminate(int); diff --git a/src/webstat_logger_main.cpp b/src/webstat_logger_main.cpp index 6d3aeda..39d794a 100644 --- a/src/webstat_logger_main.cpp +++ b/src/webstat_logger_main.cpp @@ -53,6 +53,8 @@ main(int argc, char ** argv) "Maximum number of concurrent write/read write DB connections") ("db.wr.keep", po::value(&settings.dbKeep)->default_value(settings.dbKeep), "Number of write/read write DB connections to keep open") + ("db.maxBatchSize", po::value(&settings.maxBatchSize)->default_value(settings.maxBatchSize), + "Maximum number of access log entries to hold in memory before writing them to the DB") ("fallback.dir", po::value(&settings.fallbackDir)->default_value(settings.fallbackDir), "Path to write access logs to when the database is unavailable") ("jobs.check", po::value(&settings.checkJobsAfter)->default_value(settings.checkJobsAfter), diff --git a/test/perf-ingest.cpp b/test/perf-ingest.cpp index 69212de..c403349 100644 --- a/test/perf-ingest.cpp +++ b/test/perf-ingest.cpp @@ -25,6 +25,7 @@ namespace { std::make_shared<WebStat::MockDBPool>("webstat"), { .userAgentAPI = {}, + .maxBatchSize = static_cast<size_t>(state.range(0)), }}; for (auto loop : state) { WebStat::FilePtr logFile {fopen(TMP_LOG.c_str(), "r")}; @@ -33,6 +34,6 @@ namespace { } } -BENCHMARK(doIngestFile)->Setup(setup); +BENCHMARK_RANGE(doIngestFile, 1, 1024)->Setup(setup); BENCHMARK_MAIN(); diff --git a/test/test-ingest.cpp b/test/test-ingest.cpp index be3be56..a1dc5e9 100644 --- a/test/test-ingest.cpp +++ b/test/test-ingest.cpp @@ -230,11 +230,10 @@ BOOST_DATA_TEST_CASE(StoreLogLine, }), line) { - ingestLogLine(DB::MockDatabase::openConnectionTo("webstat").get(), line); + ingestLogLines(DB::MockDatabase::openConnectionTo("webstat").get(), {std::string {line}}); BOOST_CHECK_EQUAL(linesRead, 0); BOOST_CHECK_EQUAL(linesParsed, 1); BOOST_CHECK_EQUAL(linesDiscarded, 0); - BOOST_CHECK_EQUAL(linesParked, 0); BOOST_CHECK_EQUAL(existingEntities.size(), 5); } @@ -264,34 +263,33 @@ BOOST_AUTO_TEST_CASE(TerminateHandler, *boost::unit_test::timeout(5)) BOOST_AUTO_TEST_CASE(ParkLogLine) { - parkLogLine(LOGLINE1); - BOOST_CHECK_EQUAL(linesParked, 1); + queuedLines.emplace_back(LOGLINE1); + queuedLines.emplace_back(LOGLINE2); + parkQueuedLogLines(); const auto path = settings.fallbackDir / LOGLINE1_PARKED; BOOST_TEST_INFO(path); BOOST_REQUIRE(std::filesystem::exists(path)); - BOOST_CHECK_EQUAL(std::filesystem::file_size(path), LOGLINE1.length()); + BOOST_CHECK_EQUAL(std::filesystem::file_size(path), LOGLINE1.length() + LOGLINE2.length() + 4); } -BOOST_TEST_DECORATOR(*boost::unit_test::depends_on("I/ParkLogLine")) - -BOOST_AUTO_TEST_CASE(ParkLogLineOnError) +BOOST_AUTO_TEST_CASE(ParkLogLineOnError, *boost::unit_test::depends_on("I/ParkLogLine")) { BOOST_REQUIRE(existingEntities.empty()); constexpr std::string_view LOGLINE_BAD_VERB = R"LOG(git.randomdan.homeip.net 98.82.40.168 1755561576768318 CAUSEPARK "/repo/gentoobrowse-api/commit/gentoobrowse-api/unittests/fixtures/756569aa764177340726dd3d40b41d89b11b20c7/app-crypt/pdfcrack/Manifest" "?h=gentoobrowse-api-0.9.1&id=a2ed3fd30333721accd4b697bfcb6cc4165c7714" HTTP/1.1 200 1884 107791 "-" "Mozilla/5.0 AppleWebKit/537.36 (KHTML, like Gecko; compatible; Amazonbot/0.1; +https://developer.amazon.com/support/amazonbot) Chrome/119.0.6045.214 Safari/537.36")LOG"; - BOOST_REQUIRE_NO_THROW(ingestLogLine(LOGLINE_BAD_VERB)); + BOOST_REQUIRE_NO_THROW(ingestLogLines(dbpool->get().get(), {std::string {LOGLINE_BAD_VERB}})); BOOST_CHECK_EQUAL(linesParked, 0); - BOOST_CHECK_EQUAL(existingEntities.size(), 1); + BOOST_CHECK_EQUAL(linesDiscarded, 1); } BOOST_AUTO_TEST_CASE(IngestParked, *boost::unit_test::depends_on("I/ParkLogLine")) { - parkLogLine(LOGLINE1); - BOOST_REQUIRE_EQUAL(linesParked, 1); - BOOST_REQUIRE_EQUAL(linesParsed, 0); + queuedLines.emplace_back(LOGLINE1); + queuedLines.emplace_back(LOGLINE2); + parkQueuedLogLines(); + BOOST_REQUIRE(queuedLines.empty()); jobIngestParkedLines(); - BOOST_CHECK_EQUAL(linesParsed, 1); - BOOST_CHECK_EQUAL(linesDiscarded, 0); + BOOST_CHECK_EQUAL(queuedLines.size(), 2); BOOST_CHECK(!std::filesystem::exists(settings.fallbackDir / LOGLINE1_PARKED)); } @@ -307,18 +305,22 @@ BOOST_AUTO_TEST_CASE(IngestParkedJob, { const auto now = Job::LastRunTime::clock::now(); ingestParkedLines.lastRun = now - 1s; - parkLogLine(LOGLINE1); + queuedLines.emplace_back(LOGLINE1); + parkQueuedLogLines(); + BOOST_REQUIRE(queuedLines.empty()); + BOOST_REQUIRE(std::filesystem::exists(settings.fallbackDir / LOGLINE1_PARKED)); runJobsAsNeeded(); BOOST_REQUIRE(!ingestParkedLines.currentRun); - BOOST_REQUIRE_EQUAL(linesParked, 1); - BOOST_REQUIRE_EQUAL(linesParsed, 0); + BOOST_CHECK(queuedLines.empty()); + BOOST_CHECK(std::filesystem::exists(settings.fallbackDir / LOGLINE1_PARKED)); BOOST_CHECK_EQUAL(ingestParkedLines.lastRun, now - 1s); ingestParkedLines.lastRun = now - settings.freqIngestParkedLines + 2s; + runJobsAsNeeded(); BOOST_REQUIRE(!ingestParkedLines.currentRun); - BOOST_REQUIRE_EQUAL(linesParked, 1); - BOOST_REQUIRE_EQUAL(linesParsed, 0); + BOOST_CHECK(queuedLines.empty()); + BOOST_CHECK(std::filesystem::exists(settings.fallbackDir / LOGLINE1_PARKED)); BOOST_CHECK_EQUAL(ingestParkedLines.lastRun, now - settings.freqIngestParkedLines + 2s); ingestParkedLines.lastRun = now - settings.freqIngestParkedLines - 1s; @@ -327,8 +329,7 @@ BOOST_AUTO_TEST_CASE(IngestParkedJob, ingestParkedLines.currentRun->wait(); runJobsAsNeeded(); BOOST_REQUIRE(!ingestParkedLines.currentRun); - BOOST_CHECK_EQUAL(linesParsed, 1); - BOOST_CHECK_EQUAL(linesDiscarded, 0); + BOOST_CHECK_EQUAL(queuedLines.size(), 1); BOOST_CHECK_GE(ingestParkedLines.lastRun, now); BOOST_CHECK(!std::filesystem::exists(settings.fallbackDir / LOGLINE1_PARKED)); } @@ -337,7 +338,8 @@ BOOST_AUTO_TEST_CASE(JobErrorRescheduler, *boost::unit_test::depends_on("I/Inges { const auto now = Job::LastRunTime::clock::now(); ingestParkedLines.lastRun = now - settings.freqIngestParkedLines - 1s; - parkLogLine(LOGLINE1); + queuedLines.emplace_back(LOGLINE1); + parkQueuedLogLines(); std::filesystem::permissions(settings.fallbackDir / LOGLINE1_PARKED, std::filesystem::perms::owner_write); runJobsAsNeeded(); BOOST_REQUIRE(ingestParkedLines.currentRun); @@ -366,7 +368,8 @@ BOOST_AUTO_TEST_CASE(FetchMockUserAgentDetail) BOOST_AUTO_TEST_CASE(DiscardUnparsable) { - BOOST_REQUIRE_NO_THROW(ingestLogLine("does not parse")); + queuedLines.emplace_back("does not parse"); + BOOST_REQUIRE_NO_THROW(tryIngestQueuedLogLines()); auto dbconn = dbpool->get(); auto select = dbconn->select("SELECT id::bigint, value FROM entities WHERE type = 'unparsable_line'"); constexpr std::array<std::tuple<Crc32Value, std::string_view>, 1> EXPECTED {{ |
