summaryrefslogtreecommitdiff
path: root/src/ingestor.cpp
diff options
context:
space:
mode:
authorDan Goodliffe <dan@randomdan.homeip.net>2026-05-18 20:43:51 +0100
committerDan Goodliffe <dan@randomdan.homeip.net>2026-05-18 20:43:51 +0100
commite1a6654bd5e284842ffbc3b93bd390f3bad7a187 (patch)
treea06a68ec6741e3311b74ee347ed8eb2002dba9fa /src/ingestor.cpp
parente8a0fb7b9c61d5603a86b52e0f2144a91fd1d84e (diff)
downloadwebstat-e1a6654bd5e284842ffbc3b93bd390f3bad7a187.tar.bz2
webstat-e1a6654bd5e284842ffbc3b93bd390f3bad7a187.tar.xz
webstat-e1a6654bd5e284842ffbc3b93bd390f3bad7a187.zip
Add job to retry insertion of log lines which had previously failed
Entities are reparsed and reinserted, removed on success. Failure to parse updates the entity type to UnparsableLine. Failure to insert again updates the detail with the reason.
Diffstat (limited to 'src/ingestor.cpp')
-rw-r--r--src/ingestor.cpp46
1 files changed, 45 insertions, 1 deletions
diff --git a/src/ingestor.cpp b/src/ingestor.cpp
index 9f30263..d9437c9 100644
--- a/src/ingestor.cpp
+++ b/src/ingestor.cpp
@@ -134,7 +134,7 @@ namespace WebStat {
settings {std::move(givenSettings)}, dbpool {std::move(dbpl)},
handleCompleteCurlOps {&Ingestor::jobHandleCompleteCurlOps, &Ingestor::haveCurlOperations},
ingestParkedLines {&Ingestor::jobReadParkedLines}, purgeOldLogs {&Ingestor::jobPurgeOldLogs},
- storeQueueLines {&Ingestor::jobStoreQueuedLines},
+ storeQueueLines {&Ingestor::jobStoreQueuedLines}, retryUninsertableLines {&Ingestor::jobRetryUninsertableLines},
hostnameId {insert(dbpool->get(), SQL::HOST_UPSERT, SQL::HOST_UPSERT_OPTS, host.nodename, host.sysname,
host.release, host.version, host.machine, host.domainname)},
curl {curl_multi_init()}
@@ -338,6 +338,49 @@ namespace WebStat {
return std::make_pair(value->hash, *value->id);
});
+ /// Job: retry ingestion of log lines that previously failed to insert.
+ ///
+ /// Every row yielded by SQL::SELECT_UNINSERTABLE (an entity id plus the original
+ /// line text) is scanned again and handled as one of three outcomes:
+ ///  - the line scans and stores: its entities and the log line are written and
+ ///    the parked entity is deleted, all within a single DB::TransactionScope;
+ ///  - the line no longer scans: the entity's type is set to "unparsable_line";
+ ///  - a std::exception escapes: the entity is updated through
+ ///    SQL::MARK_ENTITY_RETRIED with err.what() as the recorded reason.
+ ///
+ /// @return a callable that yields the number of lines successfully stored by
+ ///         this run.
+ Ingestor::Job::Result
+ Ingestor::jobRetryUninsertableLines()
+ {
+     auto dbh = dbpool->get();
+     auto dbconn = dbh.get();
+     auto lineSelect = dbconn->select(SQL::SELECT_UNINSERTABLE, SQL::SELECT_UNINSERTABLE_OPTS);
+     auto markLineRetried = dbconn->modify(SQL::MARK_ENTITY_RETRIED, SQL::MARK_ENTITY_RETRIED_OPTS);
+     auto deleteLine = dbconn->modify(SQL::DELETE_ENTITY, SQL::DELETE_ENTITY_OPTS);
+     auto setEntityUnparsable = dbconn->modify(SQL::SET_ENTITY_TYPE, SQL::SET_ENTITY_TYPE_OPTS);
+     // The target type never changes, so bind it once; only the id (param 1) varies per row.
+     setEntityUnparsable->bindParamS(0, "unparsable_line");
+
+     unsigned int stored = 0;
+     for (auto [id, line] : lineSelect->as<EntityId, std::string>()) {
+         try {
+             if (auto result = scanLogLine(line)) {
+                 auto values = hashScanValues(result->values());
+                 auto valuesEntities = entities(values);
+                 fillKnownEntities(valuesEntities);
+                 {
+                     // NOTE(review): assumes DB::TransactionScope commits on normal scope exit
+                     // and rolls back when unwinding from an exception - confirm.
+                     DB::TransactionScope lineTx {*dbconn};
+                     storeNewEntities(dbconn, valuesEntities);
+                     storeLogLine(dbconn, values);
+
+                     deleteLine->bindParamI(0, id);
+                     deleteLine->execute();
+                 }
+                 // Only remember the new entity ids and count the line once the transaction
+                 // scope has closed without throwing. Updating the shared cache inside the
+                 // transaction would leave ids from a failed attempt in it whenever
+                 // storeLogLine or the delete throws.
+                 existingEntities()->insert_range(valuesEntities | ENTITY_IDS);
+                 stored += 1;
+             }
+             else {
+                 // Unparsable - the line was parsable when it was parked, but isn't now.
+                 DB::TransactionScope lineTx {*dbconn};
+                 setEntityUnparsable->bindParamI(1, id);
+                 setEntityUnparsable->execute();
+             }
+         }
+         catch (const std::exception & err) {
+             // Runs after any transaction scope above has unwound; records why this
+             // retry failed against the parked entity.
+             bindMany(markLineRetried, 0, err.what(), id);
+             markLineRetried->execute();
+         }
+     }
+     return [stored]() {
+         return stored;
+     };
+ }
+
+
template<typename... T>
std::vector<Entity *>
Ingestor::entities(std::tuple<T...> & values)
@@ -462,6 +505,7 @@ namespace WebStat {
runJobAsNeeded(handleCompleteCurlOps, std::chrono::minutes {1});
runJobAsNeeded(ingestParkedLines, settings.freqIngestParkedLines);
runJobAsNeeded(purgeOldLogs, settings.freqPurgeOldLogs);
+ runJobAsNeeded(retryUninsertableLines, settings.freqPurgeOldLogs);
}
void