diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/ingestor.cpp | 131 | ||||
| -rw-r--r-- | src/ingestor.hpp | 15 | ||||
| -rw-r--r-- | src/webstat_logger_main.cpp | 2 |
3 files changed, 88 insertions, 60 deletions
diff --git a/src/ingestor.cpp b/src/ingestor.cpp index e2018ad..1de9ab9 100644 --- a/src/ingestor.cpp +++ b/src/ingestor.cpp @@ -5,7 +5,6 @@ #include <connection.h> #include <csignal> #include <dbTypes.h> -#include <fstream> #include <modifycommand.h> #include <ranges> #include <scn/scan.h> @@ -92,6 +91,7 @@ namespace WebStat { assert(!currentIngestor); currentIngestor = this; signal(SIGTERM, &sigtermHandler); + queuedLines.reserve(settings.maxBatchSize); } Ingestor::~Ingestor() @@ -171,12 +171,18 @@ namespace WebStat { if (logIn.revents) { if (auto line = scn::scan<std::string>(input, "{:[^\n]}\n")) { linesRead++; - ingestLogLine(line->value()); + queuedLines.emplace_back(std::move(line->value())); + if (queuedLines.size() >= settings.maxBatchSize) { + tryIngestQueuedLogLines(); + } } else { break; } } + else { + tryIngestQueuedLogLines(); + } if (expiredThenSet(lastCheckedJobs, settings.checkJobsAfter)) { runJobsAsNeeded(); } @@ -184,68 +190,84 @@ namespace WebStat { handleCurlOperations(); } } + tryIngestQueuedLogLines(); + parkQueuedLogLines(); while (!curlOperations.empty() && curl_multi_poll(curl.get(), nullptr, 0, INT_MAX, nullptr) == CURLM_OK) { handleCurlOperations(); } } void - Ingestor::ingestLogLine(const std::string_view line) + Ingestor::tryIngestQueuedLogLines() { try { - ingestLogLine(dbpool->get().get(), line); + ingestLogLines(dbpool->get().get(), queuedLines); + queuedLines.clear(); } catch (const std::exception &) { - parkLogLine(line); + existingEntities.clear(); } } void - Ingestor::ingestLogLine(DB::Connection * dbconn, const std::string_view line) + Ingestor::ingestLogLines(DB::Connection * dbconn, const LineBatch & lines) { - auto rememberNewEntityIds = [this](const auto & ids) { - existingEntities.insert_range(ids | std::views::take_while(&std::optional<Crc32Value>::has_value) - | std::views::transform([](auto && value) { - return *value; - })); - }; - if (auto result = scanLogLine(line)) { - linesParsed++; - const auto values = crc32ScanValues(result->values()); - NewEntityIds ids; - try { - { - std::optional<DB::TransactionScope> dbtx; + auto nonNullEntityIds = std::views::take_while(&std::optional<Crc32Value>::has_value) + | std::views::transform([](auto && value) { + return *value; + }); + + DB::TransactionScope batchTx {*dbconn}; + for (const auto & line : lines) { + if (auto result = scanLogLine(line)) { + linesParsed++; + const auto values = crc32ScanValues(result->values()); + try { + DB::TransactionScope dbtx {*dbconn}; if (const auto newEnts = newEntities(values); newEnts.front()) { - dbtx.emplace(*dbconn); - ids = storeEntities(dbconn, newEnts); + existingEntities.insert_range(storeEntities(dbconn, newEnts) | nonNullEntityIds); } storeLogLine(dbconn, values); } - rememberNewEntityIds(ids); - } - catch (const DB::Error & originalError) { - try { - const auto uninsertableLine = ToEntity<EntityType::UninsertableLine> {}(line); - rememberNewEntityIds(storeEntities(dbconn, {uninsertableLine})); - } - catch (const std::exception &) { - throw originalError; + catch (const DB::Error & originalError) { + try { + DB::TransactionScope dbtx {*dbconn}; + const auto uninsertableLine = ToEntity<EntityType::UninsertableLine> {}(line); + storeEntities(dbconn, {uninsertableLine}); + } + catch (const std::exception &) { + throw originalError; + } } } - } - else { - linesDiscarded++; - const auto unparsableLine = ToEntity<EntityType::UnparsableLine> {}(line); - rememberNewEntityIds(storeEntities(dbconn, {unparsableLine})); + else { + linesDiscarded++; + const auto unparsableLine = ToEntity<EntityType::UnparsableLine> {}(line); + storeEntities(dbconn, {unparsableLine}); + } } } void - Ingestor::parkLogLine(std::string_view line) + Ingestor::parkQueuedLogLines() { - std::ofstream {settings.fallbackDir / std::format("parked-{}.log", crc32(line))} << line; - linesParked++; + if (queuedLines.empty()) { + return; + } + std::string path {settings.fallbackDir / std::format("parked-{}.log", crc32(queuedLines.front()))}; + if (auto parked = FilePtr(fopen(path.c_str(), "w"))) { + fprintf(parked.get(), "%zu\n", queuedLines.size()); + for (const auto & line : queuedLines) { + fprintf(parked.get(), "%.*s\n", static_cast<int>(line.length()), line.data()); + } + if (fflush(parked.get()) == 0) { + linesParked += queuedLines.size(); + queuedLines.clear(); + } + else { + std::filesystem::remove(path); + } + } } void @@ -280,7 +302,7 @@ namespace WebStat { for (auto pathIter = std::filesystem::directory_iterator {settings.fallbackDir}; pathIter != std::filesystem::directory_iterator {}; ++pathIter) { if (scn::scan<Crc32Value>(pathIter->path().filename().string(), "parked-{}.log")) { - jobIngestParkedLine(pathIter); + jobIngestParkedLines(pathIter->path()); count += 1; } } @@ -288,29 +310,30 @@ namespace WebStat { } void - Ingestor::jobIngestParkedLine(const std::filesystem::directory_iterator & pathIter) + Ingestor::jobIngestParkedLines(const std::filesystem::path & path) { - jobIngestParkedLine(pathIter->path(), pathIter->file_size()); + if (auto parked = FilePtr(fopen(path.c_str(), "r"))) { + if (auto count = scn::scan<size_t>(parked.get(), "{}\n")) { + jobIngestParkedLines(parked.get(), count->value()); + std::filesystem::remove(path); + return; + } + } + throw std::system_error {errno, std::generic_category(), strerror(errno)}; } void - Ingestor::jobIngestParkedLine(const std::filesystem::path & path, uintmax_t size) + Ingestor::jobIngestParkedLines(FILE * lines, size_t count) { - if (std::ifstream parked {path}) { - std::string line; - line.resize_and_overwrite(size, [&parked](char * content, size_t size) { - parked.read(content, static_cast<std::streamsize>(size)); - return static_cast<size_t>(parked.tellg()); - }); - if (line.length() < size) { + for (size_t line = 0; line < count; ++line) { + if (auto line = scn::scan<std::string>(lines, "{:[^\n]}\n")) { + linesRead++; + queuedLines.emplace_back(std::move(line->value())); + } + else { throw std::system_error {errno, std::generic_category(), "Short read of parked file"}; } - ingestLogLine(dbpool->get().get(), line); - } - else { - throw std::system_error {errno, std::generic_category(), strerror(errno)}; } - std::filesystem::remove(path); } unsigned int diff --git a/src/ingestor.hpp b/src/ingestor.hpp index 2ae2936..94e0d5c 100644 --- a/src/ingestor.hpp +++ b/src/ingestor.hpp @@ -24,6 +24,7 @@ namespace WebStat { std::filesystem::path fallbackDir = "/var/log/webstat"; unsigned int dbMax = 4; unsigned int dbKeep = 2; + size_t maxBatchSize = 1; minutes checkJobsAfter = 1min; minutes freqIngestParkedLines = 30min; minutes freqPurgeOldLogs = 6h; @@ -36,6 +37,7 @@ namespace WebStat { class Ingestor { public: + using LineBatch = std::vector<std::string>; Ingestor(const utsname &, IngestorSettings); Ingestor(const utsname &, DB::ConnectionPoolPtr, IngestorSettings); @@ -50,9 +52,9 @@ namespace WebStat { [[nodiscard]] static ScanResult scanLogLine(std::string_view); void ingestLog(std::FILE *); - void ingestLogLine(std::string_view); - void ingestLogLine(DB::Connection *, std::string_view); - void parkLogLine(std::string_view); + void tryIngestQueuedLogLines(); + void ingestLogLines(DB::Connection *, const LineBatch & lines); + void parkQueuedLogLines(); void runJobsAsNeeded(); unsigned int jobIngestParkedLines(); @@ -70,7 +72,8 @@ namespace WebStat { size_t linesParsed = 0; size_t linesDiscarded = 0; size_t linesParked = 0; - mutable std::flat_set<Crc32Value> existingEntities; + std::flat_set<Crc32Value> existingEntities; + LineBatch queuedLines; bool terminated = false; @@ -98,8 +101,8 @@ namespace WebStat { void onNewUserAgent(const Entity &) const; void handleCurlOperations(); - void jobIngestParkedLine(const std::filesystem::directory_iterator &); - void jobIngestParkedLine(const std::filesystem::path &, uintmax_t size); + void jobIngestParkedLines(const std::filesystem::path &); + void jobIngestParkedLines(FILE *, size_t count); static void sigtermHandler(int); void terminate(int); diff --git a/src/webstat_logger_main.cpp b/src/webstat_logger_main.cpp index 6d3aeda..39d794a 100644 --- a/src/webstat_logger_main.cpp +++ b/src/webstat_logger_main.cpp @@ -53,6 +53,8 @@ main(int argc, char ** argv) "Maximum number of concurrent write/read write DB connections") ("db.wr.keep", po::value(&settings.dbKeep)->default_value(settings.dbKeep), "Number of write/read write DB connections to keep open") + ("db.maxBatchSize", po::value(&settings.maxBatchSize)->default_value(settings.maxBatchSize), + "Maximum number of access log entries to hold in memory before writing them to the DB") ("fallback.dir", po::value(&settings.fallbackDir)->default_value(settings.fallbackDir), "Path to write access logs to when the database is unavailable") ("jobs.check", po::value(&settings.checkJobsAfter)->default_value(settings.checkJobsAfter), |
