diff options
| author | Dan Goodliffe <dan@randomdan.homeip.net> | 2026-05-19 00:36:50 +0100 |
|---|---|---|
| committer | Dan Goodliffe <dan@randomdan.homeip.net> | 2026-05-19 00:36:50 +0100 |
| commit | b298237a297cb70640c85c016b702a35ecafd9b9 (patch) | |
| tree | e046995df39879d545be04a113cf344c449565ba | |
| parent | e1a6654bd5e284842ffbc3b93bd390f3bad7a187 (diff) | |
| download | webstat-b298237a297cb70640c85c016b702a35ecafd9b9.tar.bz2 webstat-b298237a297cb70640c85c016b702a35ecafd9b9.tar.xz webstat-b298237a297cb70640c85c016b702a35ecafd9b9.zip | |
Run the retry uninsertable process batched
Standard sized batch in a transaction, ordered by entity id.
Includes early exit if terminated.
| -rw-r--r-- | src/ingestor.cpp | 49 | ||||
| -rw-r--r-- | src/sql/selectUninsertableLines.sql | 3 |
2 files changed, 32 insertions, 20 deletions
diff --git a/src/ingestor.cpp b/src/ingestor.cpp index d9437c9..954f872 100644 --- a/src/ingestor.cpp +++ b/src/ingestor.cpp @@ -344,36 +344,45 @@ namespace WebStat { auto dbh = dbpool->get(); auto dbconn = dbh.get(); auto lineSelect = dbconn->select(SQL::SELECT_UNINSERTABLE, SQL::SELECT_UNINSERTABLE_OPTS); + lineSelect->bindParamI(0, settings.maxBatchSize); auto markLineRetried = dbconn->modify(SQL::MARK_ENTITY_RETRIED, SQL::MARK_ENTITY_RETRIED_OPTS); auto deleteLine = dbconn->modify(SQL::DELETE_ENTITY, SQL::DELETE_ENTITY_OPTS); auto setEntityUnparsable = dbconn->modify(SQL::SET_ENTITY_TYPE, SQL::SET_ENTITY_TYPE_OPTS); setEntityUnparsable->bindParamS(0, "unparsable_line"); unsigned int stored = 0; - for (auto [id, line] : lineSelect->as<EntityId, std::string>()) { - try { - DB::TransactionScope lineTx {*dbconn}; - if (auto result = scanLogLine(line)) { - auto values = hashScanValues(result->values()); - auto valuesEntities = entities(values); - fillKnownEntities(valuesEntities); - storeNewEntities(dbconn, valuesEntities); - existingEntities()->insert_range(valuesEntities | ENTITY_IDS); - storeLogLine(dbconn, values); + while (!terminated) { + unsigned int batchSize = 0; + DB::TransactionScope batchTx {*dbconn}; + for (auto [id, line] : lineSelect->as<EntityId, std::string>()) { + batchSize += 1; + try { + DB::TransactionScope lineTx {*dbconn}; + if (auto result = scanLogLine(line)) { + auto values = hashScanValues(result->values()); + auto valuesEntities = entities(values); + fillKnownEntities(valuesEntities); + storeNewEntities(dbconn, valuesEntities); + existingEntities()->insert_range(valuesEntities | ENTITY_IDS); + storeLogLine(dbconn, values); - deleteLine->bindParamI(0, id); - deleteLine->execute(); - stored += 1; + deleteLine->bindParamI(0, id); + deleteLine->execute(); + stored += 1; + } + else { + // unparseable - was parsable previously, isn't now 🤷 + setEntityUnparsable->bindParamI(1, id); + setEntityUnparsable->execute(); + } } - else { - // unparseable - was parsable previously, isn't now 🤷 - setEntityUnparsable->bindParamI(1, id); - setEntityUnparsable->execute(); + catch (const std::exception & err) { + bindMany(markLineRetried, 0, err.what(), id); + markLineRetried->execute(); } } - catch (const std::exception & err) { - bindMany(markLineRetried, 0, err.what(), id); - markLineRetried->execute(); + if (batchSize == 0) { + break; } } return [stored]() { diff --git a/src/sql/selectUninsertableLines.sql b/src/sql/selectUninsertableLines.sql index 048726b..5c07791 100644 --- a/src/sql/selectUninsertableLines.sql +++ b/src/sql/selectUninsertableLines.sql @@ -6,3 +6,6 @@ FROM WHERE type = 'uninsertable_line' AND detail IS NULL +ORDER BY + id +LIMIT ? |
