diff options
| author | Dan Goodliffe <dan@randomdan.homeip.net> | 2026-05-18 20:43:51 +0100 |
|---|---|---|
| committer | Dan Goodliffe <dan@randomdan.homeip.net> | 2026-05-18 20:43:51 +0100 |
| commit | e1a6654bd5e284842ffbc3b93bd390f3bad7a187 (patch) | |
| tree | a06a68ec6741e3311b74ee347ed8eb2002dba9fa /src/ingestor.cpp | |
| parent | e8a0fb7b9c61d5603a86b52e0f2144a91fd1d84e (diff) | |
| download | webstat-e1a6654bd5e284842ffbc3b93bd390f3bad7a187.tar.bz2 webstat-e1a6654bd5e284842ffbc3b93bd390f3bad7a187.tar.xz webstat-e1a6654bd5e284842ffbc3b93bd390f3bad7a187.zip | |
Add job to retry insertion of log lines which had previously failed
Entities are reparsed and reinserted, removed on success.
Failure to parse updates the entity type to UnparsableLine.
Failure to insert again updates the detail with the reason.
Diffstat (limited to 'src/ingestor.cpp')
| -rw-r--r-- | src/ingestor.cpp | 46 |
1 files changed, 45 insertions, 1 deletions
diff --git a/src/ingestor.cpp b/src/ingestor.cpp index 9f30263..d9437c9 100644 --- a/src/ingestor.cpp +++ b/src/ingestor.cpp @@ -134,7 +134,7 @@ namespace WebStat { settings {std::move(givenSettings)}, dbpool {std::move(dbpl)}, handleCompleteCurlOps {&Ingestor::jobHandleCompleteCurlOps, &Ingestor::haveCurlOperations}, ingestParkedLines {&Ingestor::jobReadParkedLines}, purgeOldLogs {&Ingestor::jobPurgeOldLogs}, - storeQueueLines {&Ingestor::jobStoreQueuedLines}, + storeQueueLines {&Ingestor::jobStoreQueuedLines}, retryUninsertableLines {&Ingestor::jobRetryUninsertableLines}, hostnameId {insert(dbpool->get(), SQL::HOST_UPSERT, SQL::HOST_UPSERT_OPTS, host.nodename, host.sysname, host.release, host.version, host.machine, host.domainname)}, curl {curl_multi_init()} @@ -338,6 +338,49 @@ namespace WebStat { return std::make_pair(value->hash, *value->id); }); + Ingestor::Job::Result + Ingestor::jobRetryUninsertableLines() + { + auto dbh = dbpool->get(); + auto dbconn = dbh.get(); + auto lineSelect = dbconn->select(SQL::SELECT_UNINSERTABLE, SQL::SELECT_UNINSERTABLE_OPTS); + auto markLineRetried = dbconn->modify(SQL::MARK_ENTITY_RETRIED, SQL::MARK_ENTITY_RETRIED_OPTS); + auto deleteLine = dbconn->modify(SQL::DELETE_ENTITY, SQL::DELETE_ENTITY_OPTS); + auto setEntityUnparsable = dbconn->modify(SQL::SET_ENTITY_TYPE, SQL::SET_ENTITY_TYPE_OPTS); + setEntityUnparsable->bindParamS(0, "unparsable_line"); + + unsigned int stored = 0; + for (auto [id, line] : lineSelect->as<EntityId, std::string>()) { + try { + DB::TransactionScope lineTx {*dbconn}; + if (auto result = scanLogLine(line)) { + auto values = hashScanValues(result->values()); + auto valuesEntities = entities(values); + fillKnownEntities(valuesEntities); + storeNewEntities(dbconn, valuesEntities); + existingEntities()->insert_range(valuesEntities | ENTITY_IDS); + storeLogLine(dbconn, values); + + deleteLine->bindParamI(0, id); + deleteLine->execute(); + stored += 1; + } + else { + // unparseable - was parsable previously, isn't now 🤷 + setEntityUnparsable->bindParamI(1, id); + setEntityUnparsable->execute(); + } + } + catch (const std::exception & err) { + bindMany(markLineRetried, 0, err.what(), id); + markLineRetried->execute(); + } + } + return [stored]() { + return stored; + }; + } + template<typename... T> std::vector<Entity *> Ingestor::entities(std::tuple<T...> & values) @@ -462,6 +505,7 @@ namespace WebStat { runJobAsNeeded(handleCompleteCurlOps, std::chrono::minutes {1}); runJobAsNeeded(ingestParkedLines, settings.freqIngestParkedLines); runJobAsNeeded(purgeOldLogs, settings.freqPurgeOldLogs); + runJobAsNeeded(retryUninsertableLines, settings.freqPurgeOldLogs); } void |
