From 7b1aeee4565fe0a2eed4a4fa8695b2a5fb671e06 Mon Sep 17 00:00:00 2001 From: Dan Goodliffe Date: Sun, 17 May 2026 14:32:11 +0100 Subject: Add ThreadSafeT helper Wraps a templated value and a templated mutex (defaults to shared_mutex) and provides safe access, locked with either a shared_lock (const value) or lock_guard (non-const value). Applies this to existingEntities. --- src/ingestor.cpp | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) (limited to 'src/ingestor.cpp') diff --git a/src/ingestor.cpp b/src/ingestor.cpp index 3e6e307..e8dceb8 100644 --- a/src/ingestor.cpp +++ b/src/ingestor.cpp @@ -325,7 +325,7 @@ namespace WebStat { } catch (const std::exception & excp) { log(LOG_ERR, "Unhandled exception: %s, clearing known entity list", excp.what()); - existingEntities.clear(); + existingEntities()->clear(); } auto count = std::distance(processingLines.begin(), storedEnd); processingLines.erase(processingLines.begin(), storedEnd); @@ -372,7 +372,7 @@ namespace WebStat { try { DB::TransactionScope lineTx {*dbconn}; storeNewEntities(dbconn, valuesEntities); - existingEntities.insert_range(valuesEntities | entityIds); + existingEntities()->insert_range(valuesEntities | entityIds); storeLogLine(dbconn, values); } catch (const DB::Error & originalError) { @@ -558,8 +558,9 @@ namespace WebStat { void Ingestor::fillKnownEntities(const std::span entities) const { + auto lockedEntities = existingEntities.shared(); for (const auto entity : entities) { - if (auto existing = existingEntities.find(entity->hash); existing != existingEntities.end()) { + if (auto existing = lockedEntities->find(entity->hash); existing != lockedEntities->end()) { entity->id = existing->second; } } @@ -639,7 +640,7 @@ namespace WebStat { "Statistics: linesQueued %zu, linesRead %zu, linesParsed %zu, linesParseFailed %zu, logsInserted %zu, " "entitiesInserted %zu, entitiesKnown %zu", queuedLines.size(), stats.linesRead, stats.linesParsed, stats.linesParseFailed, stats.logsInserted, - stats.entitiesInserted, existingEntities.size()); + stats.entitiesInserted, existingEntities->size()); } void -- cgit v1.3 From e8a0fb7b9c61d5603a86b52e0f2144a91fd1d84e Mon Sep 17 00:00:00 2001 From: Dan Goodliffe Date: Sun, 17 May 2026 17:45:53 +0100 Subject: Extract ENTITY_IDS helper Transform view for getting just the hash and record of a stored entity. --- src/ingestor.cpp | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) (limited to 'src/ingestor.cpp') diff --git a/src/ingestor.cpp b/src/ingestor.cpp index e8dceb8..9f30263 100644 --- a/src/ingestor.cpp +++ b/src/ingestor.cpp @@ -334,6 +334,10 @@ namespace WebStat { }; } + constexpr auto ENTITY_IDS = std::views::transform([](auto && value) { + return std::make_pair(value->hash, *value->id); + }); + template std::vector Ingestor::entities(std::tuple & values) @@ -358,10 +362,6 @@ namespace WebStat { void Ingestor::ingestLogLines(DB::Connection * dbconn, const LinesView lines) { - auto entityIds = std::views::transform([](auto && value) { - return std::make_pair(value->hash, *value->id); - }); - DB::TransactionScope batchTx {*dbconn}; for (const auto & line : lines) { if (auto result = scanLogLine(line)) { @@ -372,7 +372,7 @@ namespace WebStat { try { DB::TransactionScope lineTx {*dbconn}; storeNewEntities(dbconn, valuesEntities); - existingEntities()->insert_range(valuesEntities | entityIds); + existingEntities()->insert_range(valuesEntities | ENTITY_IDS); storeLogLine(dbconn, values); } catch (const DB::Error & originalError) { -- cgit v1.3 From e1a6654bd5e284842ffbc3b93bd390f3bad7a187 Mon Sep 17 00:00:00 2001 From: Dan Goodliffe Date: Mon, 18 May 2026 20:43:51 +0100 Subject: Add job to retry insertion of log lines which had previously failed Entities are reparsed and reinserted, removed on success. Failure to parse updates the entity type to UnparsableLine. Failure to insert again updates the detail with the reason. --- src/ingestor.cpp | 46 +++++++++++++++++- src/ingestor.hpp | 4 +- src/sql.cpp | 17 +++++++ src/sql.hpp | 4 ++ src/sql/deleteEntity.sql | 2 + src/sql/markEntityRetried.sql | 6 +++ src/sql/selectUninsertableLines.sql | 8 +++ src/sql/setEntityType.sql | 6 +++ src/webstat_logger_main.cpp | 2 + test/test-ingest.cpp | 97 +++++++++++++++++++++++++++++++++++++ 10 files changed, 190 insertions(+), 2 deletions(-) create mode 100644 src/sql/deleteEntity.sql create mode 100644 src/sql/markEntityRetried.sql create mode 100644 src/sql/selectUninsertableLines.sql create mode 100644 src/sql/setEntityType.sql (limited to 'src/ingestor.cpp') diff --git a/src/ingestor.cpp b/src/ingestor.cpp index 9f30263..d9437c9 100644 --- a/src/ingestor.cpp +++ b/src/ingestor.cpp @@ -134,7 +134,7 @@ namespace WebStat { settings {std::move(givenSettings)}, dbpool {std::move(dbpl)}, handleCompleteCurlOps {&Ingestor::jobHandleCompleteCurlOps, &Ingestor::haveCurlOperations}, ingestParkedLines {&Ingestor::jobReadParkedLines}, purgeOldLogs {&Ingestor::jobPurgeOldLogs}, - storeQueueLines {&Ingestor::jobStoreQueuedLines}, + storeQueueLines {&Ingestor::jobStoreQueuedLines}, retryUninsertableLines {&Ingestor::jobRetryUninsertableLines}, hostnameId {insert(dbpool->get(), SQL::HOST_UPSERT, SQL::HOST_UPSERT_OPTS, host.nodename, host.sysname, host.release, host.version, host.machine, host.domainname)}, curl {curl_multi_init()} @@ -338,6 +338,49 @@ namespace WebStat { return std::make_pair(value->hash, *value->id); }); + Ingestor::Job::Result + Ingestor::jobRetryUninsertableLines() + { + auto dbh = dbpool->get(); + auto dbconn = dbh.get(); + auto lineSelect = dbconn->select(SQL::SELECT_UNINSERTABLE, SQL::SELECT_UNINSERTABLE_OPTS); + auto markLineRetried = dbconn->modify(SQL::MARK_ENTITY_RETRIED, SQL::MARK_ENTITY_RETRIED_OPTS); + auto deleteLine = dbconn->modify(SQL::DELETE_ENTITY, SQL::DELETE_ENTITY_OPTS); + auto setEntityUnparsable = dbconn->modify(SQL::SET_ENTITY_TYPE, SQL::SET_ENTITY_TYPE_OPTS); + setEntityUnparsable->bindParamS(0, "unparsable_line"); + + unsigned int stored = 0; + for (auto [id, line] : lineSelect->as()) { + try { + DB::TransactionScope lineTx {*dbconn}; + if (auto result = scanLogLine(line)) { + auto values = hashScanValues(result->values()); + auto valuesEntities = entities(values); + fillKnownEntities(valuesEntities); + storeNewEntities(dbconn, valuesEntities); + existingEntities()->insert_range(valuesEntities | ENTITY_IDS); + storeLogLine(dbconn, values); + + deleteLine->bindParamI(0, id); + deleteLine->execute(); + stored += 1; + } + else { + // unparseable - was parsable previously, isn't now 🤷 + setEntityUnparsable->bindParamI(1, id); + setEntityUnparsable->execute(); + } + } + catch (const std::exception & err) { + bindMany(markLineRetried, 0, err.what(), id); + markLineRetried->execute(); + } + } + return [stored]() { + return stored; + }; + } + template std::vector Ingestor::entities(std::tuple & values) @@ -462,6 +505,7 @@ namespace WebStat { runJobAsNeeded(handleCompleteCurlOps, std::chrono::minutes {1}); runJobAsNeeded(ingestParkedLines, settings.freqIngestParkedLines); runJobAsNeeded(purgeOldLogs, settings.freqPurgeOldLogs); + runJobAsNeeded(retryUninsertableLines, settings.freqPurgeOldLogs); } void diff --git a/src/ingestor.hpp b/src/ingestor.hpp index c2a47a4..2050b7c 100644 --- a/src/ingestor.hpp +++ b/src/ingestor.hpp @@ -28,6 +28,7 @@ namespace WebStat { size_t maxBatchSize = 1; minutes checkJobsAfter = 1min; minutes freqIngestParkedLines = 30min; + minutes freqRetryUninsertableLines = 4h; minutes freqPurgeOldLogs = 6h; unsigned int purgeDaysToKeep = 61; // ~2 months unsigned int purgeDeleteMax = 10'000; @@ -78,6 +79,7 @@ namespace WebStat { Job::Result jobReadParkedLines(); Job::Result jobPurgeOldLogs(); Job::Result jobStoreQueuedLines(); + Job::Result jobRetryUninsertableLines(); template void storeLogLine(DB::Connection *, const std::tuple &) const; @@ -109,8 +111,8 @@ namespace WebStat { Job ingestParkedLines; Job purgeOldLogs; Job storeQueueLines; + Job retryUninsertableLines; - private: template static std::vector entities(std::tuple &); void fillKnownEntities(std::span) const; void storeNewEntities(DB::Connection *, std::span) const; diff --git a/src/sql.cpp b/src/sql.cpp index 801a905..a2dac02 100644 --- a/src/sql.cpp +++ b/src/sql.cpp @@ -22,6 +22,18 @@ namespace WebStat::SQL { const std::string HOST_UPSERT { #embed "sql/hostUpsert.sql" }; + const std::string SELECT_UNINSERTABLE { +#embed "sql/selectUninsertableLines.sql" + }; + const std::string DELETE_ENTITY { +#embed "sql/deleteEntity.sql" + }; + const std::string MARK_ENTITY_RETRIED { +#embed "sql/markEntityRetried.sql" + }; + const std::string SET_ENTITY_TYPE { +#embed "sql/setEntityType.sql" + }; #define HASH_OPTS(VAR) \ const DB::CommandOptionsPtr VAR##_OPTS \ = std::make_shared(std::hash {}(VAR), 35, false) @@ -30,5 +42,10 @@ namespace WebStat::SQL { HASH_OPTS(ENTITY_INSERT); HASH_OPTS(ENTITY_UPDATE_DETAIL); HASH_OPTS(HOST_UPSERT); + const DB::CommandOptionsPtr SELECT_UNINSERTABLE_OPTS + = std::make_shared(std::hash {}(SELECT_UNINSERTABLE), 35, true); + HASH_OPTS(DELETE_ENTITY); + HASH_OPTS(MARK_ENTITY_RETRIED); + HASH_OPTS(SET_ENTITY_TYPE); #undef HASH_OPTS } diff --git a/src/sql.hpp b/src/sql.hpp index 1a12823..ae3559a 100644 --- a/src/sql.hpp +++ b/src/sql.hpp @@ -13,5 +13,9 @@ namespace WebStat::SQL { EMBED_DECLARE(ENTITY_INSERT); EMBED_DECLARE(ENTITY_UPDATE_DETAIL); EMBED_DECLARE(HOST_UPSERT); + EMBED_DECLARE(SELECT_UNINSERTABLE); + EMBED_DECLARE(DELETE_ENTITY); + EMBED_DECLARE(MARK_ENTITY_RETRIED); + EMBED_DECLARE(SET_ENTITY_TYPE); #undef EMBED_DECLARE } diff --git a/src/sql/deleteEntity.sql b/src/sql/deleteEntity.sql new file mode 100644 index 0000000..e201384 --- /dev/null +++ b/src/sql/deleteEntity.sql @@ -0,0 +1,2 @@ +DELETE FROM entities +WHERE id = ? diff --git a/src/sql/markEntityRetried.sql b/src/sql/markEntityRetried.sql new file mode 100644 index 0000000..6ec2263 --- /dev/null +++ b/src/sql/markEntityRetried.sql @@ -0,0 +1,6 @@ +UPDATE + entities +SET + detail = jsonb_build_object('retriedAt', CURRENT_TIMESTAMP at time zone 'utc', 'error', ?::text) +WHERE + id = ? diff --git a/src/sql/selectUninsertableLines.sql b/src/sql/selectUninsertableLines.sql new file mode 100644 index 0000000..048726b --- /dev/null +++ b/src/sql/selectUninsertableLines.sql @@ -0,0 +1,8 @@ +SELECT + id, + value +FROM + entities +WHERE + type = 'uninsertable_line' + AND detail IS NULL diff --git a/src/sql/setEntityType.sql b/src/sql/setEntityType.sql new file mode 100644 index 0000000..5c981b9 --- /dev/null +++ b/src/sql/setEntityType.sql @@ -0,0 +1,6 @@ +UPDATE + entities +SET + type = ?::entity +WHERE + id = ? diff --git a/src/webstat_logger_main.cpp b/src/webstat_logger_main.cpp index 1d14532..8dd9f52 100644 --- a/src/webstat_logger_main.cpp +++ b/src/webstat_logger_main.cpp @@ -77,6 +77,8 @@ main(int argc, char ** argv) "How often to check for and import parked log lines") ("job.purge.freq", po::value(&settings.freqPurgeOldLogs)->default_value(settings.freqPurgeOldLogs), "How often to purge old access log entries from the database") + ("job.retryUninsertable.freq", po::value(&settings.freqRetryUninsertableLines)->default_value(settings.freqRetryUninsertableLines), + "After how long to retry inserting log lines which previously could not be inserted") ("job.purge.days", po::value(&settings.purgeDaysToKeep)->default_value(settings.purgeDaysToKeep), "How many days of access log entries to keep") ("job.purge.max", po::value(&settings.purgeDeleteMax)->default_value(settings.purgeDeleteMax), diff --git a/test/test-ingest.cpp b/test/test-ingest.cpp index c2ac4b3..408e720 100644 --- a/test/test-ingest.cpp +++ b/test/test-ingest.cpp @@ -12,9 +12,52 @@ namespace { using namespace WebStat; BOOST_GLOBAL_FIXTURE(MockDB); + + constexpr std::array ENTITY_TYPE_NAMES { + "host", + "virtual_host", + "path", + "query_string", + "referrer", + "user_agent", + "unparsable_line", + "uninsertable_line", + "content_type", + }; + + EntityType + toEntityType(const std::string_view typeStr) + { + auto iter = std::ranges::find(ENTITY_TYPE_NAMES, typeStr); + if (iter == ENTITY_TYPE_NAMES.end()) { + throw std::domain_error {std::format("Unknown entity type {}", typeStr)}; + } + return static_cast(iter - ENTITY_TYPE_NAMES.begin()); + } + + using EntityWithDetail = std::tuple>; + + std::optional + getEntityById(DB::Connection * dbconn, EntityId id) + { + auto select = dbconn->select("SELECT type, value, detail FROM entities WHERE id = ?"); + select->bindParam(0, id); + for (auto [typeStr, value, detail] : select->as>()) { + return std::make_optional(id, toEntityType(typeStr), std::move(value), std::move(detail)); + } + return std::nullopt; + } } namespace std { + ostream & + operator<<(ostream & strm, const EntityType value) + { + const auto valueNum = static_cast(value); + std::print(strm, "EntityType: {} ({})", ENTITY_TYPE_NAMES[valueNum], valueNum); + return strm; + } + template ostream & operator<<(ostream & strm, const std::optional & value) @@ -426,6 +469,60 @@ BOOST_AUTO_TEST_CASE(PurgeOldJob) BOOST_CHECK_EQUAL(2, jobPurgeOldLogs()()); } +BOOST_AUTO_TEST_CASE(RetryUninsertableNone) +{ + BOOST_CHECK_EQUAL(0, jobRetryUninsertableLines()()); +} + +BOOST_AUTO_TEST_CASE(RetryUninsertableSuccess) +{ + auto dbconn = dbpool->get(); + Entity uninsertable {{}, {}, EntityType::UninsertableLine, LOGLINE1}; + storeNewEntity(dbconn.get(), uninsertable); + BOOST_REQUIRE(uninsertable.id); + BOOST_REQUIRE(getEntityById(dbconn.get(), *uninsertable.id)); + + BOOST_CHECK_EQUAL(1, jobRetryUninsertableLines()()); + BOOST_REQUIRE(!getEntityById(dbconn.get(), *uninsertable.id)); +} + +BOOST_AUTO_TEST_CASE(RetryUninsertableNowUnparsable) +{ + auto dbconn = dbpool->get(); + Entity uninsertable {{}, {}, EntityType::UninsertableLine, "blah"}; + storeNewEntity(dbconn.get(), uninsertable); + BOOST_REQUIRE(uninsertable.id); + + BOOST_CHECK_EQUAL(0, jobRetryUninsertableLines()()); + auto updatedEntity = getEntityById(dbconn.get(), *uninsertable.id); + BOOST_REQUIRE(updatedEntity); + BOOST_CHECK_EQUAL(std::get<1>(*updatedEntity), EntityType::UnparsableLine); +} + +BOOST_AUTO_TEST_CASE(RetryUninsertableStillUninsertable) +{ + auto dbconn = dbpool->get(); + constexpr std::string_view LOGLINE_UNINSERTABLE + = R"LOG(git.randomdan.homeip.net 98.82.40.168 1755561576768318 CAUSEPARSEFAIL "/repo/gentoobrowse-api/commit/gentoobrowse-api/unittests/fixtures/756569aa764177340726dd3d40b41d89b11b20c7/app-crypt/pdfcrack/Manifest" "?h=gentoobrowse-api-0.9.1&id=a2ed3fd30333721accd4b697bfcb6cc4165c7714" HTTP/1.1 200 1884 107791 "-" "Mozilla/5.0 AppleWebKit/537.36 (KHTML, like Gecko; compatible; Amazonbot/0.1; +https://developer.amazon.com/support/amazonbot) Chrome/119.0.6045.214 Safari/537.36" "text/plain")LOG"; + Entity uninsertable {{}, {}, EntityType::UninsertableLine, LOGLINE_UNINSERTABLE}; + storeNewEntity(dbconn.get(), uninsertable); + BOOST_REQUIRE(uninsertable.id); + + BOOST_CHECK_EQUAL(0, jobRetryUninsertableLines()()); + auto updatedEntity = getEntityById(dbconn.get(), *uninsertable.id); + BOOST_REQUIRE(updatedEntity); + BOOST_CHECK_EQUAL(std::get<1>(*updatedEntity), EntityType::UninsertableLine); + const auto & detail = std::get<3>(*updatedEntity); + BOOST_REQUIRE(detail); + + BOOST_TEST_CONTEXT(*detail) { + BOOST_CHECK(detail->starts_with("{")); + BOOST_CHECK(detail->contains("invalid input value for enum http_verb")); + BOOST_CHECK(detail->contains("retriedAt")); + BOOST_CHECK(detail->ends_with("}")); + } +} + BOOST_AUTO_TEST_CASE(LogStatsSignal) { BOOST_REQUIRE_EQUAL(logsWritten, 0); -- cgit v1.3 From b298237a297cb70640c85c016b702a35ecafd9b9 Mon Sep 17 00:00:00 2001 From: Dan Goodliffe Date: Tue, 19 May 2026 00:36:50 +0100 Subject: Run the retry uninsertable process batched Standard sized batch in a transaction, ordered by entity id. Includes early exit if terminated. --- src/ingestor.cpp | 51 ++++++++++++++++++++++--------------- src/sql/selectUninsertableLines.sql | 3 +++ 2 files changed, 33 insertions(+), 21 deletions(-) (limited to 'src/ingestor.cpp') diff --git a/src/ingestor.cpp b/src/ingestor.cpp index d9437c9..954f872 100644 --- a/src/ingestor.cpp +++ b/src/ingestor.cpp @@ -344,36 +344,45 @@ namespace WebStat { auto dbh = dbpool->get(); auto dbconn = dbh.get(); auto lineSelect = dbconn->select(SQL::SELECT_UNINSERTABLE, SQL::SELECT_UNINSERTABLE_OPTS); + lineSelect->bindParamI(0, settings.maxBatchSize); auto markLineRetried = dbconn->modify(SQL::MARK_ENTITY_RETRIED, SQL::MARK_ENTITY_RETRIED_OPTS); auto deleteLine = dbconn->modify(SQL::DELETE_ENTITY, SQL::DELETE_ENTITY_OPTS); auto setEntityUnparsable = dbconn->modify(SQL::SET_ENTITY_TYPE, SQL::SET_ENTITY_TYPE_OPTS); setEntityUnparsable->bindParamS(0, "unparsable_line"); unsigned int stored = 0; - for (auto [id, line] : lineSelect->as()) { - try { - DB::TransactionScope lineTx {*dbconn}; - if (auto result = scanLogLine(line)) { - auto values = hashScanValues(result->values()); - auto valuesEntities = entities(values); - fillKnownEntities(valuesEntities); - storeNewEntities(dbconn, valuesEntities); - existingEntities()->insert_range(valuesEntities | ENTITY_IDS); - storeLogLine(dbconn, values); - - deleteLine->bindParamI(0, id); - deleteLine->execute(); - stored += 1; + while (!terminated) { + unsigned int batchSize = 0; + DB::TransactionScope batchTx {*dbconn}; + for (auto [id, line] : lineSelect->as()) { + batchSize += 1; + try { + DB::TransactionScope lineTx {*dbconn}; + if (auto result = scanLogLine(line)) { + auto values = hashScanValues(result->values()); + auto valuesEntities = entities(values); + fillKnownEntities(valuesEntities); + storeNewEntities(dbconn, valuesEntities); + existingEntities()->insert_range(valuesEntities | ENTITY_IDS); + storeLogLine(dbconn, values); + + deleteLine->bindParamI(0, id); + deleteLine->execute(); + stored += 1; + } + else { + // unparseable - was parsable previously, isn't now 🤷 + setEntityUnparsable->bindParamI(1, id); + setEntityUnparsable->execute(); + } } - else { - // unparseable - was parsable previously, isn't now 🤷 - setEntityUnparsable->bindParamI(1, id); - setEntityUnparsable->execute(); + catch (const std::exception & err) { + bindMany(markLineRetried, 0, err.what(), id); + markLineRetried->execute(); } } - catch (const std::exception & err) { - bindMany(markLineRetried, 0, err.what(), id); - markLineRetried->execute(); + if (batchSize == 0) { + break; } } return [stored]() { diff --git a/src/sql/selectUninsertableLines.sql b/src/sql/selectUninsertableLines.sql index 048726b..5c07791 100644 --- a/src/sql/selectUninsertableLines.sql +++ b/src/sql/selectUninsertableLines.sql @@ -6,3 +6,6 @@ FROM WHERE type = 'uninsertable_line' AND detail IS NULL +ORDER BY + id +LIMIT ? -- cgit v1.3