From e1a6654bd5e284842ffbc3b93bd390f3bad7a187 Mon Sep 17 00:00:00 2001 From: Dan Goodliffe Date: Mon, 18 May 2026 20:43:51 +0100 Subject: Add job to retry insertion of log lines which had previously failed Entities are reparsed and reinserted, removed on success. Failure to parse updates the entity type to UnparsableLine. Failure to insert again updates the detail with the reason. --- src/sql/selectUninsertableLines.sql | 8 ++++++++ 1 file changed, 8 insertions(+) create mode 100644 src/sql/selectUninsertableLines.sql (limited to 'src/sql/selectUninsertableLines.sql') diff --git a/src/sql/selectUninsertableLines.sql b/src/sql/selectUninsertableLines.sql new file mode 100644 index 0000000..048726b --- /dev/null +++ b/src/sql/selectUninsertableLines.sql @@ -0,0 +1,8 @@ +SELECT + id, + value +FROM + entities +WHERE + type = 'uninsertable_line' + AND detail IS NULL -- cgit v1.3 From b298237a297cb70640c85c016b702a35ecafd9b9 Mon Sep 17 00:00:00 2001 From: Dan Goodliffe Date: Tue, 19 May 2026 00:36:50 +0100 Subject: Run the retry uninsertable process batched Standard sized batch in a transaction, ordered by entity id. Includes early exit if terminated. --- src/ingestor.cpp | 51 ++++++++++++++++++++++--------------- src/sql/selectUninsertableLines.sql | 3 +++ 2 files changed, 33 insertions(+), 21 deletions(-) (limited to 'src/sql/selectUninsertableLines.sql') diff --git a/src/ingestor.cpp b/src/ingestor.cpp index d9437c9..954f872 100644 --- a/src/ingestor.cpp +++ b/src/ingestor.cpp @@ -344,36 +344,45 @@ namespace WebStat { auto dbh = dbpool->get(); auto dbconn = dbh.get(); auto lineSelect = dbconn->select(SQL::SELECT_UNINSERTABLE, SQL::SELECT_UNINSERTABLE_OPTS); + lineSelect->bindParamI(0, settings.maxBatchSize); auto markLineRetried = dbconn->modify(SQL::MARK_ENTITY_RETRIED, SQL::MARK_ENTITY_RETRIED_OPTS); auto deleteLine = dbconn->modify(SQL::DELETE_ENTITY, SQL::DELETE_ENTITY_OPTS); auto setEntityUnparsable = dbconn->modify(SQL::SET_ENTITY_TYPE, SQL::SET_ENTITY_TYPE_OPTS); setEntityUnparsable->bindParamS(0, "unparsable_line"); unsigned int stored = 0; - for (auto [id, line] : lineSelect->as()) { - try { - DB::TransactionScope lineTx {*dbconn}; - if (auto result = scanLogLine(line)) { - auto values = hashScanValues(result->values()); - auto valuesEntities = entities(values); - fillKnownEntities(valuesEntities); - storeNewEntities(dbconn, valuesEntities); - existingEntities()->insert_range(valuesEntities | ENTITY_IDS); - storeLogLine(dbconn, values); - - deleteLine->bindParamI(0, id); - deleteLine->execute(); - stored += 1; + while (!terminated) { + unsigned int batchSize = 0; + DB::TransactionScope batchTx {*dbconn}; + for (auto [id, line] : lineSelect->as()) { + batchSize += 1; + try { + DB::TransactionScope lineTx {*dbconn}; + if (auto result = scanLogLine(line)) { + auto values = hashScanValues(result->values()); + auto valuesEntities = entities(values); + fillKnownEntities(valuesEntities); + storeNewEntities(dbconn, valuesEntities); + existingEntities()->insert_range(valuesEntities | ENTITY_IDS); + storeLogLine(dbconn, values); + + deleteLine->bindParamI(0, id); + deleteLine->execute(); + stored += 1; + } + else { + // unparseable - was parsable previously, isn't now 🤷 + setEntityUnparsable->bindParamI(1, id); + setEntityUnparsable->execute(); + } } - else { - // unparseable - was parsable previously, isn't now 🤷 - setEntityUnparsable->bindParamI(1, id); - setEntityUnparsable->execute(); + catch (const std::exception & err) { + bindMany(markLineRetried, 0, err.what(), id); + markLineRetried->execute(); } } - catch (const std::exception & err) { - bindMany(markLineRetried, 0, err.what(), id); - markLineRetried->execute(); + if (batchSize == 0) { + break; } } return [stored]() { diff --git a/src/sql/selectUninsertableLines.sql b/src/sql/selectUninsertableLines.sql index 048726b..5c07791 100644 --- a/src/sql/selectUninsertableLines.sql +++ b/src/sql/selectUninsertableLines.sql @@ -6,3 +6,6 @@ FROM WHERE type = 'uninsertable_line' AND detail IS NULL +ORDER BY + id +LIMIT ? -- cgit v1.3