diff options
| -rw-r--r-- | src/ingestor.cpp | 49 | ||||
| -rw-r--r-- | src/sql/selectUninsertableLines.sql | 3 |
2 files changed, 32 insertions, 20 deletions
diff --git a/src/ingestor.cpp b/src/ingestor.cpp index d9437c9..954f872 100644 --- a/src/ingestor.cpp +++ b/src/ingestor.cpp @@ -344,36 +344,45 @@ namespace WebStat { auto dbh = dbpool->get(); auto dbconn = dbh.get(); auto lineSelect = dbconn->select(SQL::SELECT_UNINSERTABLE, SQL::SELECT_UNINSERTABLE_OPTS); + lineSelect->bindParamI(0, settings.maxBatchSize); auto markLineRetried = dbconn->modify(SQL::MARK_ENTITY_RETRIED, SQL::MARK_ENTITY_RETRIED_OPTS); auto deleteLine = dbconn->modify(SQL::DELETE_ENTITY, SQL::DELETE_ENTITY_OPTS); auto setEntityUnparsable = dbconn->modify(SQL::SET_ENTITY_TYPE, SQL::SET_ENTITY_TYPE_OPTS); setEntityUnparsable->bindParamS(0, "unparsable_line"); unsigned int stored = 0; - for (auto [id, line] : lineSelect->as<EntityId, std::string>()) { - try { - DB::TransactionScope lineTx {*dbconn}; - if (auto result = scanLogLine(line)) { - auto values = hashScanValues(result->values()); - auto valuesEntities = entities(values); - fillKnownEntities(valuesEntities); - storeNewEntities(dbconn, valuesEntities); - existingEntities()->insert_range(valuesEntities | ENTITY_IDS); - storeLogLine(dbconn, values); + while (!terminated) { + unsigned int batchSize = 0; + DB::TransactionScope batchTx {*dbconn}; + for (auto [id, line] : lineSelect->as<EntityId, std::string>()) { + batchSize += 1; + try { + DB::TransactionScope lineTx {*dbconn}; + if (auto result = scanLogLine(line)) { + auto values = hashScanValues(result->values()); + auto valuesEntities = entities(values); + fillKnownEntities(valuesEntities); + storeNewEntities(dbconn, valuesEntities); + existingEntities()->insert_range(valuesEntities | ENTITY_IDS); + storeLogLine(dbconn, values); - deleteLine->bindParamI(0, id); - deleteLine->execute(); - stored += 1; + deleteLine->bindParamI(0, id); + deleteLine->execute(); + stored += 1; + } + else { + // unparseable - was parsable previously, isn't now 🤷 + setEntityUnparsable->bindParamI(1, id); + setEntityUnparsable->execute(); + } } - else { - // unparseable - was parsable previously, isn't now 🤷 - setEntityUnparsable->bindParamI(1, id); - setEntityUnparsable->execute(); + catch (const std::exception & err) { + bindMany(markLineRetried, 0, err.what(), id); + markLineRetried->execute(); } } - catch (const std::exception & err) { - bindMany(markLineRetried, 0, err.what(), id); - markLineRetried->execute(); + if (batchSize == 0) { + break; } } return [stored]() { diff --git a/src/sql/selectUninsertableLines.sql b/src/sql/selectUninsertableLines.sql index 048726b..5c07791 100644 --- a/src/sql/selectUninsertableLines.sql +++ b/src/sql/selectUninsertableLines.sql @@ -6,3 +6,6 @@ FROM WHERE type = 'uninsertable_line' AND detail IS NULL +ORDER BY + id +LIMIT ? |
