diff options
Diffstat (limited to 'src/ingestor.cpp')
| -rw-r--r-- | src/ingestor.cpp | 113 |
1 files changed, 81 insertions, 32 deletions
diff --git a/src/ingestor.cpp b/src/ingestor.cpp index 3e6e307..16638eb 100644 --- a/src/ingestor.cpp +++ b/src/ingestor.cpp @@ -5,7 +5,9 @@ #include <connection.h> #include <csignal> #include <dbTypes.h> +#include <fstream> #include <modifycommand.h> +#include <print> #include <ranges> #include <scn/scan.h> #include <selectcommand.h> @@ -26,19 +28,13 @@ namespace DB { namespace WebStat { namespace { - using ByteArrayView = std::span<const uint8_t>; - - auto - bytesToHexRange(const ByteArrayView bytes) - { + constexpr auto TO_HEX_RANGE = std::views::transform([](auto byte) { constexpr auto HEXN = 16ZU; - return bytes | std::views::transform([](auto byte) { - return std::array {byte / HEXN, byte % HEXN}; - }) | std::views::join - | std::views::transform([](auto nibble) { - return "0123456789abcdef"[nibble]; - }); - } + return std::array {byte / HEXN, byte % HEXN}; + }) | std::views::join + | std::views::transform([](auto nibble) { + return "0123456789abcdef"[nibble]; + }); EntityHash makeHash(const std::string_view value) @@ -134,7 +130,7 @@ namespace WebStat { settings {std::move(givenSettings)}, dbpool {std::move(dbpl)}, handleCompleteCurlOps {&Ingestor::jobHandleCompleteCurlOps, &Ingestor::haveCurlOperations}, ingestParkedLines {&Ingestor::jobReadParkedLines}, purgeOldLogs {&Ingestor::jobPurgeOldLogs}, - storeQueueLines {&Ingestor::jobStoreQueuedLines}, + storeQueueLines {&Ingestor::jobStoreQueuedLines}, retryUninsertableLines {&Ingestor::jobRetryUninsertableLines}, hostnameId {insert(dbpool->get(), SQL::HOST_UPSERT, SQL::HOST_UPSERT_OPTS, host.nodename, host.sysname, host.release, host.version, host.machine, host.domainname)}, curl {curl_multi_init()} @@ -325,7 +321,7 @@ namespace WebStat { } catch (const std::exception & excp) { log(LOG_ERR, "Unhandled exception: %s, clearing known entity list", excp.what()); - existingEntities.clear(); + existingEntities()->clear(); } auto count = std::distance(processingLines.begin(), storedEnd); processingLines.erase(processingLines.begin(), storedEnd); @@ -334,6 +330,62 @@ namespace WebStat { }; } + constexpr auto ENTITY_IDS = std::views::transform([](auto && value) { + return std::make_pair(value->hash, *value->id); + }); + + Ingestor::Job::Result + Ingestor::jobRetryUninsertableLines() + { + auto dbh = dbpool->get(); + auto dbconn = dbh.get(); + auto lineSelect = dbconn->select(SQL::SELECT_UNINSERTABLE, SQL::SELECT_UNINSERTABLE_OPTS); + lineSelect->bindParamI(0, settings.maxBatchSize); + auto markLineRetried = dbconn->modify(SQL::MARK_ENTITY_RETRIED, SQL::MARK_ENTITY_RETRIED_OPTS); + auto deleteLine = dbconn->modify(SQL::DELETE_ENTITY, SQL::DELETE_ENTITY_OPTS); + auto setEntityUnparsable = dbconn->modify(SQL::SET_ENTITY_TYPE, SQL::SET_ENTITY_TYPE_OPTS); + setEntityUnparsable->bindParamS(0, "unparsable_line"); + + unsigned int stored = 0; + while (!terminated) { + unsigned int batchSize = 0; + DB::TransactionScope batchTx {*dbconn}; + for (auto [id, line] : lineSelect->as<EntityId, std::string>()) { + batchSize += 1; + try { + DB::TransactionScope lineTx {*dbconn}; + if (auto result = scanLogLine(line)) { + auto values = hashScanValues(result->values()); + auto valuesEntities = entities(values); + fillKnownEntities(valuesEntities); + storeNewEntities(dbconn, valuesEntities); + existingEntities()->insert_range(valuesEntities | ENTITY_IDS); + storeLogLine(dbconn, values); + + deleteLine->bindParamI(0, id); + deleteLine->execute(); + stored += 1; + } + else { + // unparseable - was parsable previously, isn't now 🤷 + setEntityUnparsable->bindParamI(1, id); + setEntityUnparsable->execute(); + } + } + catch (const std::exception & err) { + bindMany(markLineRetried, 0, err.what(), id); + markLineRetried->execute(); + } + } + if (batchSize == 0) { + break; + } + } + return [stored]() { + return stored; + }; + } + template<typename... T> std::vector<Entity *> Ingestor::entities(std::tuple<T...> & values) @@ -358,10 +410,6 @@ namespace WebStat { void Ingestor::ingestLogLines(DB::Connection * dbconn, const LinesView lines) { - auto entityIds = std::views::transform([](auto && value) { - return std::make_pair(value->hash, *value->id); - }); - DB::TransactionScope batchTx {*dbconn}; for (const auto & line : lines) { if (auto result = scanLogLine(line)) { @@ -372,7 +420,7 @@ namespace WebStat { try { DB::TransactionScope lineTx {*dbconn}; storeNewEntities(dbconn, valuesEntities); - existingEntities.insert_range(valuesEntities | entityIds); + existingEntities()->insert_range(valuesEntities | ENTITY_IDS); storeLogLine(dbconn, values); } catch (const DB::Error & originalError) { @@ -405,18 +453,18 @@ namespace WebStat { if (lines.empty()) { return std::unexpected(0); } - const std::filesystem::path path { - settings.fallbackDir / std::format("parked-{:s}.short", bytesToHexRange(makeHash(lines.front())))}; - if (auto parked = FilePtr(fopen(path.c_str(), "w"))) { - fprintf(parked.get(), "%zu\n", lines.size()); + const std::filesystem::path path + = settings.fallbackDir / std::format("parked-{:s}.short", makeHash(lines.front()) | TO_HEX_RANGE); + if (std::ofstream parked {path}; parked.good()) { + std::println(parked, "{}", lines.size()); for (const auto & line : lines) { - fprintf(parked.get(), "%.*s\n", static_cast<int>(line.length()), line.data()); + std::println(parked, "{}", line); } - if (fflush(parked.get()) == 0) { + if (parked.flush().good()) { lines.clear(); - auto finalPath = std::filesystem::path {path}.replace_extension(".log"); - parked.reset(); - if (rename(path.c_str(), finalPath.c_str()) == 0) { + auto finalPath = auto {path}.replace_extension(".log"); + parked.close(); + if (!renameNoExcept(path, finalPath)) { return finalPath; } } @@ -462,6 +510,7 @@ namespace WebStat { runJobAsNeeded(handleCompleteCurlOps, std::chrono::minutes {1}); runJobAsNeeded(ingestParkedLines, settings.freqIngestParkedLines); runJobAsNeeded(purgeOldLogs, settings.freqPurgeOldLogs); + runJobAsNeeded(retryUninsertableLines, settings.freqPurgeOldLogs); } void @@ -537,12 +586,11 @@ namespace WebStat { Ingestor::jobPurgeOldLogs() { auto dbconn = dbpool->get(); - const auto stopAt = Job::LastRunTime::clock::now() + settings.purgeDeleteMaxTime; const auto purge = dbconn->modify(SQL::ACCESS_LOG_PURGE_OLD, SQL::ACCESS_LOG_PURGE_OLD_OPTS); purge->bindParam(0, std::format("{} days", settings.purgeDaysToKeep)); purge->bindParam(1, settings.purgeDeleteMax); unsigned int purgedTotal {}; - while (!terminated && stopAt > Job::LastRunTime::clock::now()) { + while (!terminated) { const auto purged = purge->execute(); purgedTotal += purged; if (purged < settings.purgeDeleteMax) { @@ -558,8 +606,9 @@ namespace WebStat { void Ingestor::fillKnownEntities(const std::span<Entity *> entities) const { + auto lockedEntities = existingEntities.shared(); for (const auto entity : entities) { - if (auto existing = existingEntities.find(entity->hash); existing != existingEntities.end()) { + if (auto existing = lockedEntities->find(entity->hash); existing != lockedEntities->end()) { entity->id = existing->second; } } @@ -639,7 +688,7 @@ namespace WebStat { "Statistics: linesQueued %zu, linesRead %zu, linesParsed %zu, linesParseFailed %zu, logsInserted %zu, " "entitiesInserted %zu, entitiesKnown %zu", queuedLines.size(), stats.linesRead, stats.linesParsed, stats.linesParseFailed, stats.logsInserted, - stats.entitiesInserted, existingEntities.size()); + stats.entitiesInserted, existingEntities->size()); } void |
