diff options
Diffstat (limited to 'src/ingestor.cpp')
| -rw-r--r-- | src/ingestor.cpp | 72 |
1 files changed, 63 insertions, 9 deletions
diff --git a/src/ingestor.cpp b/src/ingestor.cpp index 3e6e307..954f872 100644 --- a/src/ingestor.cpp +++ b/src/ingestor.cpp @@ -134,7 +134,7 @@ namespace WebStat { settings {std::move(givenSettings)}, dbpool {std::move(dbpl)}, handleCompleteCurlOps {&Ingestor::jobHandleCompleteCurlOps, &Ingestor::haveCurlOperations}, ingestParkedLines {&Ingestor::jobReadParkedLines}, purgeOldLogs {&Ingestor::jobPurgeOldLogs}, - storeQueueLines {&Ingestor::jobStoreQueuedLines}, + storeQueueLines {&Ingestor::jobStoreQueuedLines}, retryUninsertableLines {&Ingestor::jobRetryUninsertableLines}, hostnameId {insert(dbpool->get(), SQL::HOST_UPSERT, SQL::HOST_UPSERT_OPTS, host.nodename, host.sysname, host.release, host.version, host.machine, host.domainname)}, curl {curl_multi_init()} @@ -325,7 +325,7 @@ namespace WebStat { } catch (const std::exception & excp) { log(LOG_ERR, "Unhandled exception: %s, clearing known entity list", excp.what()); - existingEntities.clear(); + existingEntities()->clear(); } auto count = std::distance(processingLines.begin(), storedEnd); processingLines.erase(processingLines.begin(), storedEnd); @@ -334,6 +334,62 @@ namespace WebStat { }; } + constexpr auto ENTITY_IDS = std::views::transform([](auto && value) { + return std::make_pair(value->hash, *value->id); + }); + + Ingestor::Job::Result + Ingestor::jobRetryUninsertableLines() + { + auto dbh = dbpool->get(); + auto dbconn = dbh.get(); + auto lineSelect = dbconn->select(SQL::SELECT_UNINSERTABLE, SQL::SELECT_UNINSERTABLE_OPTS); + lineSelect->bindParamI(0, settings.maxBatchSize); + auto markLineRetried = dbconn->modify(SQL::MARK_ENTITY_RETRIED, SQL::MARK_ENTITY_RETRIED_OPTS); + auto deleteLine = dbconn->modify(SQL::DELETE_ENTITY, SQL::DELETE_ENTITY_OPTS); + auto setEntityUnparsable = dbconn->modify(SQL::SET_ENTITY_TYPE, SQL::SET_ENTITY_TYPE_OPTS); + setEntityUnparsable->bindParamS(0, "unparsable_line"); + + unsigned int stored = 0; + while (!terminated) { + unsigned int batchSize = 0; + DB::TransactionScope batchTx {*dbconn}; + for (auto [id, line] : lineSelect->as<EntityId, std::string>()) { + batchSize += 1; + try { + DB::TransactionScope lineTx {*dbconn}; + if (auto result = scanLogLine(line)) { + auto values = hashScanValues(result->values()); + auto valuesEntities = entities(values); + fillKnownEntities(valuesEntities); + storeNewEntities(dbconn, valuesEntities); + existingEntities()->insert_range(valuesEntities | ENTITY_IDS); + storeLogLine(dbconn, values); + + deleteLine->bindParamI(0, id); + deleteLine->execute(); + stored += 1; + } + else { + // unparseable - was parsable previously, isn't now 🤷 + setEntityUnparsable->bindParamI(1, id); + setEntityUnparsable->execute(); + } + } + catch (const std::exception & err) { + bindMany(markLineRetried, 0, err.what(), id); + markLineRetried->execute(); + } + } + if (batchSize == 0) { + break; + } + } + return [stored]() { + return stored; + }; + } + template<typename... T> std::vector<Entity *> Ingestor::entities(std::tuple<T...> & values) @@ -358,10 +414,6 @@ namespace WebStat { void Ingestor::ingestLogLines(DB::Connection * dbconn, const LinesView lines) { - auto entityIds = std::views::transform([](auto && value) { - return std::make_pair(value->hash, *value->id); - }); - DB::TransactionScope batchTx {*dbconn}; for (const auto & line : lines) { if (auto result = scanLogLine(line)) { @@ -372,7 +424,7 @@ namespace WebStat { try { DB::TransactionScope lineTx {*dbconn}; storeNewEntities(dbconn, valuesEntities); - existingEntities.insert_range(valuesEntities | entityIds); + existingEntities()->insert_range(valuesEntities | ENTITY_IDS); storeLogLine(dbconn, values); } catch (const DB::Error & originalError) { @@ -462,6 +514,7 @@ namespace WebStat { runJobAsNeeded(handleCompleteCurlOps, std::chrono::minutes {1}); runJobAsNeeded(ingestParkedLines, settings.freqIngestParkedLines); runJobAsNeeded(purgeOldLogs, settings.freqPurgeOldLogs); + runJobAsNeeded(retryUninsertableLines, settings.freqPurgeOldLogs); } void @@ -558,8 +611,9 @@ namespace WebStat { void Ingestor::fillKnownEntities(const std::span<Entity *> entities) const { + auto lockedEntities = existingEntities.shared(); for (const auto entity : entities) { - if (auto existing = existingEntities.find(entity->hash); existing != existingEntities.end()) { + if (auto existing = lockedEntities->find(entity->hash); existing != lockedEntities->end()) { entity->id = existing->second; } } @@ -639,7 +693,7 @@ namespace WebStat { "Statistics: linesQueued %zu, linesRead %zu, linesParsed %zu, linesParseFailed %zu, logsInserted %zu, " "entitiesInserted %zu, entitiesKnown %zu", queuedLines.size(), stats.linesRead, stats.linesParsed, stats.linesParseFailed, stats.logsInserted, - stats.entitiesInserted, existingEntities.size()); + stats.entitiesInserted, existingEntities->size()); } void |
