summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDan Goodliffe <dan@randomdan.homeip.net>2026-05-19 00:36:50 +0100
committerDan Goodliffe <dan@randomdan.homeip.net>2026-05-19 00:36:50 +0100
commitb298237a297cb70640c85c016b702a35ecafd9b9 (patch)
treee046995df39879d545be04a113cf344c449565ba
parente1a6654bd5e284842ffbc3b93bd390f3bad7a187 (diff)
downloadwebstat-b298237a297cb70640c85c016b702a35ecafd9b9.tar.bz2
webstat-b298237a297cb70640c85c016b702a35ecafd9b9.tar.xz
webstat-b298237a297cb70640c85c016b702a35ecafd9b9.zip
Run the retry uninsertable process batched
Standard sized batch in a transaction, ordered by entity id. Includes early exit if terminated.
-rw-r--r--src/ingestor.cpp49
-rw-r--r--src/sql/selectUninsertableLines.sql3
2 files changed, 32 insertions, 20 deletions
diff --git a/src/ingestor.cpp b/src/ingestor.cpp
index d9437c9..954f872 100644
--- a/src/ingestor.cpp
+++ b/src/ingestor.cpp
@@ -344,36 +344,45 @@ namespace WebStat {
auto dbh = dbpool->get();
auto dbconn = dbh.get();
auto lineSelect = dbconn->select(SQL::SELECT_UNINSERTABLE, SQL::SELECT_UNINSERTABLE_OPTS);
+ lineSelect->bindParamI(0, settings.maxBatchSize);
auto markLineRetried = dbconn->modify(SQL::MARK_ENTITY_RETRIED, SQL::MARK_ENTITY_RETRIED_OPTS);
auto deleteLine = dbconn->modify(SQL::DELETE_ENTITY, SQL::DELETE_ENTITY_OPTS);
auto setEntityUnparsable = dbconn->modify(SQL::SET_ENTITY_TYPE, SQL::SET_ENTITY_TYPE_OPTS);
setEntityUnparsable->bindParamS(0, "unparsable_line");
unsigned int stored = 0;
- for (auto [id, line] : lineSelect->as<EntityId, std::string>()) {
- try {
- DB::TransactionScope lineTx {*dbconn};
- if (auto result = scanLogLine(line)) {
- auto values = hashScanValues(result->values());
- auto valuesEntities = entities(values);
- fillKnownEntities(valuesEntities);
- storeNewEntities(dbconn, valuesEntities);
- existingEntities()->insert_range(valuesEntities | ENTITY_IDS);
- storeLogLine(dbconn, values);
+ while (!terminated) {
+ unsigned int batchSize = 0;
+ DB::TransactionScope batchTx {*dbconn};
+ for (auto [id, line] : lineSelect->as<EntityId, std::string>()) {
+ batchSize += 1;
+ try {
+ DB::TransactionScope lineTx {*dbconn};
+ if (auto result = scanLogLine(line)) {
+ auto values = hashScanValues(result->values());
+ auto valuesEntities = entities(values);
+ fillKnownEntities(valuesEntities);
+ storeNewEntities(dbconn, valuesEntities);
+ existingEntities()->insert_range(valuesEntities | ENTITY_IDS);
+ storeLogLine(dbconn, values);
- deleteLine->bindParamI(0, id);
- deleteLine->execute();
- stored += 1;
+ deleteLine->bindParamI(0, id);
+ deleteLine->execute();
+ stored += 1;
+ }
+ else {
+ // unparseable - was parsable previously, isn't now 🤷
+ setEntityUnparsable->bindParamI(1, id);
+ setEntityUnparsable->execute();
+ }
}
- else {
- // unparseable - was parsable previously, isn't now 🤷
- setEntityUnparsable->bindParamI(1, id);
- setEntityUnparsable->execute();
+ catch (const std::exception & err) {
+ bindMany(markLineRetried, 0, err.what(), id);
+ markLineRetried->execute();
}
}
- catch (const std::exception & err) {
- bindMany(markLineRetried, 0, err.what(), id);
- markLineRetried->execute();
+ if (batchSize == 0) {
+ break;
}
}
return [stored]() {
diff --git a/src/sql/selectUninsertableLines.sql b/src/sql/selectUninsertableLines.sql
index 048726b..5c07791 100644
--- a/src/sql/selectUninsertableLines.sql
+++ b/src/sql/selectUninsertableLines.sql
@@ -6,3 +6,6 @@ FROM
WHERE
type = 'uninsertable_line'
AND detail IS NULL
+ORDER BY
+ id
+LIMIT ?