diff options
Diffstat (limited to 'src/ingestor.cpp')
| -rw-r--r-- | src/ingestor.cpp | 131 |
1 files changed, 77 insertions, 54 deletions
diff --git a/src/ingestor.cpp b/src/ingestor.cpp index e2018ad..1de9ab9 100644 --- a/src/ingestor.cpp +++ b/src/ingestor.cpp @@ -5,7 +5,6 @@ #include <connection.h> #include <csignal> #include <dbTypes.h> -#include <fstream> #include <modifycommand.h> #include <ranges> #include <scn/scan.h> @@ -92,6 +91,7 @@ namespace WebStat { assert(!currentIngestor); currentIngestor = this; signal(SIGTERM, &sigtermHandler); + queuedLines.reserve(settings.maxBatchSize); } Ingestor::~Ingestor() @@ -171,12 +171,18 @@ namespace WebStat { if (logIn.revents) { if (auto line = scn::scan<std::string>(input, "{:[^\n]}\n")) { linesRead++; - ingestLogLine(line->value()); + queuedLines.emplace_back(std::move(line->value())); + if (queuedLines.size() >= settings.maxBatchSize) { + tryIngestQueuedLogLines(); + } } else { break; } } + else { + tryIngestQueuedLogLines(); + } if (expiredThenSet(lastCheckedJobs, settings.checkJobsAfter)) { runJobsAsNeeded(); } @@ -184,68 +190,84 @@ namespace WebStat { handleCurlOperations(); } } + tryIngestQueuedLogLines(); + parkQueuedLogLines(); while (!curlOperations.empty() && curl_multi_poll(curl.get(), nullptr, 0, INT_MAX, nullptr) == CURLM_OK) { handleCurlOperations(); } } void - Ingestor::ingestLogLine(const std::string_view line) + Ingestor::tryIngestQueuedLogLines() { try { - ingestLogLine(dbpool->get().get(), line); + ingestLogLines(dbpool->get().get(), queuedLines); + queuedLines.clear(); } catch (const std::exception &) { - parkLogLine(line); + existingEntities.clear(); } } void - Ingestor::ingestLogLine(DB::Connection * dbconn, const std::string_view line) + Ingestor::ingestLogLines(DB::Connection * dbconn, const LineBatch & lines) { - auto rememberNewEntityIds = [this](const auto & ids) { - existingEntities.insert_range(ids | std::views::take_while(&std::optional<Crc32Value>::has_value) - | std::views::transform([](auto && value) { - return *value; - })); - }; - if (auto result = scanLogLine(line)) { - linesParsed++; - const auto values = crc32ScanValues(result->values()); - NewEntityIds ids; - try { - { - std::optional<DB::TransactionScope> dbtx; + auto nonNullEntityIds = std::views::take_while(&std::optional<Crc32Value>::has_value) + | std::views::transform([](auto && value) { + return *value; + }); + + DB::TransactionScope batchTx {*dbconn}; + for (const auto & line : lines) { + if (auto result = scanLogLine(line)) { + linesParsed++; + const auto values = crc32ScanValues(result->values()); + try { + DB::TransactionScope dbtx {*dbconn}; if (const auto newEnts = newEntities(values); newEnts.front()) { - dbtx.emplace(*dbconn); - ids = storeEntities(dbconn, newEnts); + existingEntities.insert_range(storeEntities(dbconn, newEnts) | nonNullEntityIds); } storeLogLine(dbconn, values); } - rememberNewEntityIds(ids); - } - catch (const DB::Error & originalError) { - try { - const auto uninsertableLine = ToEntity<EntityType::UninsertableLine> {}(line); - rememberNewEntityIds(storeEntities(dbconn, {uninsertableLine})); - } - catch (const std::exception &) { - throw originalError; + catch (const DB::Error & originalError) { + try { + DB::TransactionScope dbtx {*dbconn}; + const auto uninsertableLine = ToEntity<EntityType::UninsertableLine> {}(line); + storeEntities(dbconn, {uninsertableLine}); + } + catch (const std::exception &) { + throw originalError; + } } } - } - else { - linesDiscarded++; - const auto unparsableLine = ToEntity<EntityType::UnparsableLine> {}(line); - rememberNewEntityIds(storeEntities(dbconn, {unparsableLine})); + else { + linesDiscarded++; + const auto unparsableLine = ToEntity<EntityType::UnparsableLine> {}(line); + storeEntities(dbconn, {unparsableLine}); + } } } void - Ingestor::parkLogLine(std::string_view line) + Ingestor::parkQueuedLogLines() { - std::ofstream {settings.fallbackDir / std::format("parked-{}.log", crc32(line))} << line; - linesParked++; + if (queuedLines.empty()) { + return; + } + std::string path {settings.fallbackDir / std::format("parked-{}.log", crc32(queuedLines.front()))}; + if (auto parked = FilePtr(fopen(path.c_str(), "w"))) { + fprintf(parked.get(), "%zu\n", queuedLines.size()); + for (const auto & line : queuedLines) { + fprintf(parked.get(), "%.*s\n", static_cast<int>(line.length()), line.data()); + } + if (fflush(parked.get()) == 0) { + linesParked += queuedLines.size(); + queuedLines.clear(); + } + else { + std::filesystem::remove(path); + } + } } void @@ -280,7 +302,7 @@ namespace WebStat { for (auto pathIter = std::filesystem::directory_iterator {settings.fallbackDir}; pathIter != std::filesystem::directory_iterator {}; ++pathIter) { if (scn::scan<Crc32Value>(pathIter->path().filename().string(), "parked-{}.log")) { - jobIngestParkedLine(pathIter); + jobIngestParkedLines(pathIter->path()); count += 1; } } @@ -288,29 +310,30 @@ namespace WebStat { } void - Ingestor::jobIngestParkedLine(const std::filesystem::directory_iterator & pathIter) + Ingestor::jobIngestParkedLines(const std::filesystem::path & path) { - jobIngestParkedLine(pathIter->path(), pathIter->file_size()); + if (auto parked = FilePtr(fopen(path.c_str(), "r"))) { + if (auto count = scn::scan<size_t>(parked.get(), "{}\n")) { + jobIngestParkedLines(parked.get(), count->value()); + std::filesystem::remove(path); + return; + } + } + throw std::system_error {errno, std::generic_category(), strerror(errno)}; } void - Ingestor::jobIngestParkedLine(const std::filesystem::path & path, uintmax_t size) + Ingestor::jobIngestParkedLines(FILE * lines, size_t count) { - if (std::ifstream parked {path}) { - std::string line; - line.resize_and_overwrite(size, [&parked](char * content, size_t size) { - parked.read(content, static_cast<std::streamsize>(size)); - return static_cast<size_t>(parked.tellg()); - }); - if (line.length() < size) { + for (size_t line = 0; line < count; ++line) { + if (auto line = scn::scan<std::string>(lines, "{:[^\n]}\n")) { + linesRead++; + queuedLines.emplace_back(std::move(line->value())); + } + else { throw std::system_error {errno, std::generic_category(), "Short read of parked file"}; } - ingestLogLine(dbpool->get().get(), line); - } - else { - throw std::system_error {errno, std::generic_category(), strerror(errno)}; } - std::filesystem::remove(path); } unsigned int |
