summaryrefslogtreecommitdiff
path: root/src/ingestor.cpp
diff options
context:
space:
mode:
authorDan Goodliffe <dan.goodliffe@octal.co.uk>2026-05-19 12:12:28 +0100
committerDan Goodliffe <dan.goodliffe@octal.co.uk>2026-05-19 12:12:28 +0100
commita3d868ca57591a6d977da8a7e9ec5c14976e73ec (patch)
tree803085077c0a4ad148359738b985e117dff75c42 /src/ingestor.cpp
parent29f458117184af5b1507cac01b48b41bfbad568a (diff)
parentcabd894779c2d2e453c8ed7771fdbd17c881bc2c (diff)
downloadwebstat-a3d868ca57591a6d977da8a7e9ec5c14976e73ec.tar.bz2
webstat-a3d868ca57591a6d977da8a7e9ec5c14976e73ec.tar.xz
webstat-a3d868ca57591a6d977da8a7e9ec5c14976e73ec.zip
Merge remote-tracking branch 'origin/retry-store'
Diffstat (limited to 'src/ingestor.cpp')
-rw-r--r--src/ingestor.cpp72
1 files changed, 63 insertions, 9 deletions
diff --git a/src/ingestor.cpp b/src/ingestor.cpp
index 3e6e307..954f872 100644
--- a/src/ingestor.cpp
+++ b/src/ingestor.cpp
@@ -134,7 +134,7 @@ namespace WebStat {
settings {std::move(givenSettings)}, dbpool {std::move(dbpl)},
handleCompleteCurlOps {&Ingestor::jobHandleCompleteCurlOps, &Ingestor::haveCurlOperations},
ingestParkedLines {&Ingestor::jobReadParkedLines}, purgeOldLogs {&Ingestor::jobPurgeOldLogs},
- storeQueueLines {&Ingestor::jobStoreQueuedLines},
+ storeQueueLines {&Ingestor::jobStoreQueuedLines}, retryUninsertableLines {&Ingestor::jobRetryUninsertableLines},
hostnameId {insert(dbpool->get(), SQL::HOST_UPSERT, SQL::HOST_UPSERT_OPTS, host.nodename, host.sysname,
host.release, host.version, host.machine, host.domainname)},
curl {curl_multi_init()}
@@ -325,7 +325,7 @@ namespace WebStat {
}
catch (const std::exception & excp) {
log(LOG_ERR, "Unhandled exception: %s, clearing known entity list", excp.what());
- existingEntities.clear();
+ existingEntities()->clear();
}
auto count = std::distance(processingLines.begin(), storedEnd);
processingLines.erase(processingLines.begin(), storedEnd);
@@ -334,6 +334,62 @@ namespace WebStat {
};
}
+ constexpr auto ENTITY_IDS = std::views::transform([](auto && value) {
+ return std::make_pair(value->hash, *value->id);
+ });
+
+ Ingestor::Job::Result
+ Ingestor::jobRetryUninsertableLines()
+ {
+ auto dbh = dbpool->get();
+ auto dbconn = dbh.get();
+ auto lineSelect = dbconn->select(SQL::SELECT_UNINSERTABLE, SQL::SELECT_UNINSERTABLE_OPTS);
+ lineSelect->bindParamI(0, settings.maxBatchSize);
+ auto markLineRetried = dbconn->modify(SQL::MARK_ENTITY_RETRIED, SQL::MARK_ENTITY_RETRIED_OPTS);
+ auto deleteLine = dbconn->modify(SQL::DELETE_ENTITY, SQL::DELETE_ENTITY_OPTS);
+ auto setEntityUnparsable = dbconn->modify(SQL::SET_ENTITY_TYPE, SQL::SET_ENTITY_TYPE_OPTS);
+ setEntityUnparsable->bindParamS(0, "unparsable_line");
+
+ unsigned int stored = 0;
+ while (!terminated) {
+ unsigned int batchSize = 0;
+ DB::TransactionScope batchTx {*dbconn};
+ for (auto [id, line] : lineSelect->as<EntityId, std::string>()) {
+ batchSize += 1;
+ try {
+ DB::TransactionScope lineTx {*dbconn};
+ if (auto result = scanLogLine(line)) {
+ auto values = hashScanValues(result->values());
+ auto valuesEntities = entities(values);
+ fillKnownEntities(valuesEntities);
+ storeNewEntities(dbconn, valuesEntities);
+ existingEntities()->insert_range(valuesEntities | ENTITY_IDS);
+ storeLogLine(dbconn, values);
+
+ deleteLine->bindParamI(0, id);
+ deleteLine->execute();
+ stored += 1;
+ }
+ else {
+ // unparseable - was parsable previously, isn't now 🤷
+ setEntityUnparsable->bindParamI(1, id);
+ setEntityUnparsable->execute();
+ }
+ }
+ catch (const std::exception & err) {
+ bindMany(markLineRetried, 0, err.what(), id);
+ markLineRetried->execute();
+ }
+ }
+ if (batchSize == 0) {
+ break;
+ }
+ }
+ return [stored]() {
+ return stored;
+ };
+ }
+
template<typename... T>
std::vector<Entity *>
Ingestor::entities(std::tuple<T...> & values)
@@ -358,10 +414,6 @@ namespace WebStat {
void
Ingestor::ingestLogLines(DB::Connection * dbconn, const LinesView lines)
{
- auto entityIds = std::views::transform([](auto && value) {
- return std::make_pair(value->hash, *value->id);
- });
-
DB::TransactionScope batchTx {*dbconn};
for (const auto & line : lines) {
if (auto result = scanLogLine(line)) {
@@ -372,7 +424,7 @@ namespace WebStat {
try {
DB::TransactionScope lineTx {*dbconn};
storeNewEntities(dbconn, valuesEntities);
- existingEntities.insert_range(valuesEntities | entityIds);
+ existingEntities()->insert_range(valuesEntities | ENTITY_IDS);
storeLogLine(dbconn, values);
}
catch (const DB::Error & originalError) {
@@ -462,6 +514,7 @@ namespace WebStat {
runJobAsNeeded(handleCompleteCurlOps, std::chrono::minutes {1});
runJobAsNeeded(ingestParkedLines, settings.freqIngestParkedLines);
runJobAsNeeded(purgeOldLogs, settings.freqPurgeOldLogs);
+ runJobAsNeeded(retryUninsertableLines, settings.freqPurgeOldLogs);
}
void
@@ -558,8 +611,9 @@ namespace WebStat {
void
Ingestor::fillKnownEntities(const std::span<Entity *> entities) const
{
+ auto lockedEntities = existingEntities.shared();
for (const auto entity : entities) {
- if (auto existing = existingEntities.find(entity->hash); existing != existingEntities.end()) {
+ if (auto existing = lockedEntities->find(entity->hash); existing != lockedEntities->end()) {
entity->id = existing->second;
}
}
@@ -639,7 +693,7 @@ namespace WebStat {
"Statistics: linesQueued %zu, linesRead %zu, linesParsed %zu, linesParseFailed %zu, logsInserted %zu, "
"entitiesInserted %zu, entitiesKnown %zu",
queuedLines.size(), stats.linesRead, stats.linesParsed, stats.linesParseFailed, stats.logsInserted,
- stats.entitiesInserted, existingEntities.size());
+ stats.entitiesInserted, existingEntities->size());
}
void