From e1a6654bd5e284842ffbc3b93bd390f3bad7a187 Mon Sep 17 00:00:00 2001 From: Dan Goodliffe Date: Mon, 18 May 2026 20:43:51 +0100 Subject: Add job to retry insertion of log lines which had previously failed Entities are reparsed and reinserted, removed on success. Failure to parse updates the entity type to UnparsableLine. Failure to insert again updates the detail with the reason. --- src/ingestor.cpp | 46 +++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 45 insertions(+), 1 deletion(-) (limited to 'src/ingestor.cpp') diff --git a/src/ingestor.cpp b/src/ingestor.cpp index 9f30263..d9437c9 100644 --- a/src/ingestor.cpp +++ b/src/ingestor.cpp @@ -134,7 +134,7 @@ namespace WebStat { settings {std::move(givenSettings)}, dbpool {std::move(dbpl)}, handleCompleteCurlOps {&Ingestor::jobHandleCompleteCurlOps, &Ingestor::haveCurlOperations}, ingestParkedLines {&Ingestor::jobReadParkedLines}, purgeOldLogs {&Ingestor::jobPurgeOldLogs}, - storeQueueLines {&Ingestor::jobStoreQueuedLines}, + storeQueueLines {&Ingestor::jobStoreQueuedLines}, retryUninsertableLines {&Ingestor::jobRetryUninsertableLines}, hostnameId {insert(dbpool->get(), SQL::HOST_UPSERT, SQL::HOST_UPSERT_OPTS, host.nodename, host.sysname, host.release, host.version, host.machine, host.domainname)}, curl {curl_multi_init()} @@ -338,6 +338,49 @@ namespace WebStat { return std::make_pair(value->hash, *value->id); }); + Ingestor::Job::Result + Ingestor::jobRetryUninsertableLines() + { + auto dbh = dbpool->get(); + auto dbconn = dbh.get(); + auto lineSelect = dbconn->select(SQL::SELECT_UNINSERTABLE, SQL::SELECT_UNINSERTABLE_OPTS); + auto markLineRetried = dbconn->modify(SQL::MARK_ENTITY_RETRIED, SQL::MARK_ENTITY_RETRIED_OPTS); + auto deleteLine = dbconn->modify(SQL::DELETE_ENTITY, SQL::DELETE_ENTITY_OPTS); + auto setEntityUnparsable = dbconn->modify(SQL::SET_ENTITY_TYPE, SQL::SET_ENTITY_TYPE_OPTS); + setEntityUnparsable->bindParamS(0, "unparsable_line"); + + unsigned int stored = 0; + for (auto [id, line] : lineSelect->as()) { + try { + DB::TransactionScope lineTx {*dbconn}; + if (auto result = scanLogLine(line)) { + auto values = hashScanValues(result->values()); + auto valuesEntities = entities(values); + fillKnownEntities(valuesEntities); + storeNewEntities(dbconn, valuesEntities); + existingEntities()->insert_range(valuesEntities | ENTITY_IDS); + storeLogLine(dbconn, values); + + deleteLine->bindParamI(0, id); + deleteLine->execute(); + stored += 1; + } + else { + // unparseable - was parsable previously, isn't now 🤷 + setEntityUnparsable->bindParamI(1, id); + setEntityUnparsable->execute(); + } + } + catch (const std::exception & err) { + bindMany(markLineRetried, 0, err.what(), id); + markLineRetried->execute(); + } + } + return [stored]() { + return stored; + }; + } + template std::vector Ingestor::entities(std::tuple & values) @@ -462,6 +505,7 @@ namespace WebStat { runJobAsNeeded(handleCompleteCurlOps, std::chrono::minutes {1}); runJobAsNeeded(ingestParkedLines, settings.freqIngestParkedLines); runJobAsNeeded(purgeOldLogs, settings.freqPurgeOldLogs); + runJobAsNeeded(retryUninsertableLines, settings.freqPurgeOldLogs); } void -- cgit v1.3