diff options
| author | Dan Goodliffe <dan@randomdan.homeip.net> | 2026-05-18 20:43:51 +0100 |
|---|---|---|
| committer | Dan Goodliffe <dan@randomdan.homeip.net> | 2026-05-18 20:43:51 +0100 |
| commit | e1a6654bd5e284842ffbc3b93bd390f3bad7a187 (patch) | |
| tree | a06a68ec6741e3311b74ee347ed8eb2002dba9fa | |
| parent | e8a0fb7b9c61d5603a86b52e0f2144a91fd1d84e (diff) | |
| download | webstat-e1a6654bd5e284842ffbc3b93bd390f3bad7a187.tar.bz2 webstat-e1a6654bd5e284842ffbc3b93bd390f3bad7a187.tar.xz webstat-e1a6654bd5e284842ffbc3b93bd390f3bad7a187.zip | |
Add job to retry insertion of log lines which had previously failed
Entities are reparsed and reinserted, removed on success.
Failure to parse updates the entity type to UnparsableLine.
Failure to insert again updates the detail with the reason.
| -rw-r--r-- | src/ingestor.cpp | 46 | ||||
| -rw-r--r-- | src/ingestor.hpp | 4 | ||||
| -rw-r--r-- | src/sql.cpp | 17 | ||||
| -rw-r--r-- | src/sql.hpp | 4 | ||||
| -rw-r--r-- | src/sql/deleteEntity.sql | 2 | ||||
| -rw-r--r-- | src/sql/markEntityRetried.sql | 6 | ||||
| -rw-r--r-- | src/sql/selectUninsertableLines.sql | 8 | ||||
| -rw-r--r-- | src/sql/setEntityType.sql | 6 | ||||
| -rw-r--r-- | src/webstat_logger_main.cpp | 2 | ||||
| -rw-r--r-- | test/test-ingest.cpp | 97 |
10 files changed, 190 insertions, 2 deletions
diff --git a/src/ingestor.cpp b/src/ingestor.cpp
index 9f30263..d9437c9 100644
--- a/src/ingestor.cpp
+++ b/src/ingestor.cpp
@@ -134,7 +134,7 @@ namespace WebStat {
 		settings {std::move(givenSettings)}, dbpool {std::move(dbpl)},
 		handleCompleteCurlOps {&Ingestor::jobHandleCompleteCurlOps, &Ingestor::haveCurlOperations},
 		ingestParkedLines {&Ingestor::jobReadParkedLines}, purgeOldLogs {&Ingestor::jobPurgeOldLogs},
-		storeQueueLines {&Ingestor::jobStoreQueuedLines},
+		storeQueueLines {&Ingestor::jobStoreQueuedLines}, retryUninsertableLines {&Ingestor::jobRetryUninsertableLines},
 		hostnameId {insert(dbpool->get(), SQL::HOST_UPSERT, SQL::HOST_UPSERT_OPTS, host.nodename, host.sysname,
 				host.release, host.version, host.machine, host.domainname)},
 		curl {curl_multi_init()}
@@ -338,6 +338,49 @@ namespace WebStat {
 				return std::make_pair(value->hash, *value->id);
 			});
 
+	Ingestor::Job::Result
+	Ingestor::jobRetryUninsertableLines()
+	{
+		auto dbh = dbpool->get();
+		auto dbconn = dbh.get();
+		auto lineSelect = dbconn->select(SQL::SELECT_UNINSERTABLE, SQL::SELECT_UNINSERTABLE_OPTS);
+		auto markLineRetried = dbconn->modify(SQL::MARK_ENTITY_RETRIED, SQL::MARK_ENTITY_RETRIED_OPTS);
+		auto deleteLine = dbconn->modify(SQL::DELETE_ENTITY, SQL::DELETE_ENTITY_OPTS);
+		auto setEntityUnparsable = dbconn->modify(SQL::SET_ENTITY_TYPE, SQL::SET_ENTITY_TYPE_OPTS);
+		setEntityUnparsable->bindParamS(0, "unparsable_line"); // target type is fixed; only the id is rebound per row
+
+		unsigned int stored = 0; // number of lines successfully re-inserted; returned via the result callable
+		for (auto [id, line] : lineSelect->as<EntityId, std::string>()) {
+			try {
+				DB::TransactionScope lineTx {*dbconn}; // scoped to this one line's retry
+				if (auto result = scanLogLine(line)) {
+					auto values = hashScanValues(result->values());
+					auto valuesEntities = entities(values);
+					fillKnownEntities(valuesEntities);
+					storeNewEntities(dbconn, valuesEntities);
+					existingEntities()->insert_range(valuesEntities | ENTITY_IDS); // NOTE(review): runs before lineTx ends - confirm a later failure cannot leave stale ids cached
+					storeLogLine(dbconn, values);
+
+					deleteLine->bindParamI(0, id);
+					deleteLine->execute(); // line is stored now, so drop its uninsertable_line entity
+					stored += 1;
+				}
+				else {
+					// unparseable - was parsable previously, isn't now 🤷
+					setEntityUnparsable->bindParamI(1, id);
+					setEntityUnparsable->execute();
+				}
+			}
+			catch (const std::exception & err) {
+				bindMany(markLineRetried, 0, err.what(), id); // record the failure reason against the entity
+				markLineRetried->execute();
+			}
+		}
+		return [stored]() {
+			return stored;
+		};
+	}
+
 	template<typename... T>
 	std::vector<Entity *>
 	Ingestor::entities(std::tuple<T...> & values)
@@ -462,6 +505,7 @@ namespace WebStat {
 		runJobAsNeeded(handleCompleteCurlOps, std::chrono::minutes {1});
 		runJobAsNeeded(ingestParkedLines, settings.freqIngestParkedLines);
 		runJobAsNeeded(purgeOldLogs, settings.freqPurgeOldLogs);
+		runJobAsNeeded(retryUninsertableLines, settings.freqRetryUninsertableLines); // its own setting, not the purge frequency
 	}
 
 	void
diff --git a/src/ingestor.hpp b/src/ingestor.hpp
index c2a47a4..2050b7c 100644
--- a/src/ingestor.hpp
+++ b/src/ingestor.hpp
@@ -28,6 +28,7 @@ namespace WebStat {
 		size_t maxBatchSize = 1;
 		minutes checkJobsAfter = 1min;
 		minutes freqIngestParkedLines = 30min;
+		minutes freqRetryUninsertableLines = 4h;
 		minutes freqPurgeOldLogs = 6h;
 		unsigned int purgeDaysToKeep = 61; // ~2 months
 		unsigned int purgeDeleteMax = 10'000;
@@ -78,6 +79,7 @@ namespace WebStat {
 		Job::Result jobReadParkedLines();
 		Job::Result jobPurgeOldLogs();
 		Job::Result jobStoreQueuedLines();
+		Job::Result jobRetryUninsertableLines();
 
 		template<typename... T> void storeLogLine(DB::Connection *, const std::tuple<T...> &) const;
 
@@ -109,8 +111,8 @@ namespace WebStat {
 		Job ingestParkedLines;
 		Job purgeOldLogs;
 		Job storeQueueLines;
+		Job retryUninsertableLines;
 
-	private:
 		template<typename... T> static std::vector<Entity *> entities(std::tuple<T...> &);
 		void fillKnownEntities(std::span<Entity *>) const;
 		void storeNewEntities(DB::Connection *, std::span<Entity *>) const;
diff --git a/src/sql.cpp b/src/sql.cpp
index 801a905..a2dac02 100644
--- a/src/sql.cpp
+++ b/src/sql.cpp
@@ -22,6 +22,18 @@ namespace WebStat::SQL {
 	const std::string HOST_UPSERT {
 #embed "sql/hostUpsert.sql"
 	};
+	const std::string SELECT_UNINSERTABLE {
+#embed "sql/selectUninsertableLines.sql"
+	};
+	const std::string DELETE_ENTITY {
+#embed "sql/deleteEntity.sql"
+	};
+	const std::string MARK_ENTITY_RETRIED {
+#embed "sql/markEntityRetried.sql"
+	};
+	const std::string SET_ENTITY_TYPE {
+#embed "sql/setEntityType.sql"
+	};
 #define HASH_OPTS(VAR) \
 	const DB::CommandOptionsPtr VAR##_OPTS \
 			= std::make_shared<PQ::CommandOptions>(std::hash<std::string> {}(VAR), 35, false)
@@ -30,5 +42,10 @@ namespace WebStat::SQL {
 	HASH_OPTS(ENTITY_INSERT);
 	HASH_OPTS(ENTITY_UPDATE_DETAIL);
 	HASH_OPTS(HOST_UPSERT);
+	const DB::CommandOptionsPtr SELECT_UNINSERTABLE_OPTS
+			= std::make_shared<PQ::CommandOptions>(std::hash<std::string> {}(SELECT_UNINSERTABLE), 35, true);
+	HASH_OPTS(DELETE_ENTITY);
+	HASH_OPTS(MARK_ENTITY_RETRIED);
+	HASH_OPTS(SET_ENTITY_TYPE);
 #undef HASH_OPTS
 }
diff --git a/src/sql.hpp b/src/sql.hpp
index 1a12823..ae3559a 100644
--- a/src/sql.hpp
+++ b/src/sql.hpp
@@ -13,5 +13,9 @@ namespace WebStat::SQL {
 	EMBED_DECLARE(ENTITY_INSERT);
 	EMBED_DECLARE(ENTITY_UPDATE_DETAIL);
 	EMBED_DECLARE(HOST_UPSERT);
+	EMBED_DECLARE(SELECT_UNINSERTABLE);
+	EMBED_DECLARE(DELETE_ENTITY);
+	EMBED_DECLARE(MARK_ENTITY_RETRIED);
+	EMBED_DECLARE(SET_ENTITY_TYPE);
 #undef EMBED_DECLARE
 }
diff --git a/src/sql/deleteEntity.sql b/src/sql/deleteEntity.sql
new file mode 100644
index 0000000..e201384
--- /dev/null
+++ b/src/sql/deleteEntity.sql
@@ -0,0 +1,2 @@
+DELETE FROM entities
+WHERE id = ?
diff --git a/src/sql/markEntityRetried.sql b/src/sql/markEntityRetried.sql
new file mode 100644
index 0000000..6ec2263
--- /dev/null
+++ b/src/sql/markEntityRetried.sql
@@ -0,0 +1,6 @@
+UPDATE
+	entities
+SET
+	detail = jsonb_build_object('retriedAt', CURRENT_TIMESTAMP at time zone 'utc', 'error', ?::text)
+WHERE
+	id = ?
diff --git a/src/sql/selectUninsertableLines.sql b/src/sql/selectUninsertableLines.sql
new file mode 100644
index 0000000..048726b
--- /dev/null
+++ b/src/sql/selectUninsertableLines.sql
@@ -0,0 +1,8 @@
+SELECT
+	id,
+	value
+FROM
+	entities
+WHERE
+	type = 'uninsertable_line'
+	AND detail IS NULL
diff --git a/src/sql/setEntityType.sql b/src/sql/setEntityType.sql
new file mode 100644
index 0000000..5c981b9
--- /dev/null
+++ b/src/sql/setEntityType.sql
@@ -0,0 +1,6 @@
+UPDATE
+	entities
+SET
+	type = ?::entity
+WHERE
+	id = ?
diff --git a/src/webstat_logger_main.cpp b/src/webstat_logger_main.cpp
index 1d14532..8dd9f52 100644
--- a/src/webstat_logger_main.cpp
+++ b/src/webstat_logger_main.cpp
@@ -77,6 +77,8 @@ main(int argc, char ** argv)
 			"How often to check for and import parked log lines")
 		("job.purge.freq", po::value(&settings.freqPurgeOldLogs)->default_value(settings.freqPurgeOldLogs),
 			"How often to purge old access log entries from the database")
+		("job.retryUninsertable.freq", po::value(&settings.freqRetryUninsertableLines)->default_value(settings.freqRetryUninsertableLines),
+			"How often to retry inserting log lines which previously could not be inserted")
 		("job.purge.days", po::value(&settings.purgeDaysToKeep)->default_value(settings.purgeDaysToKeep),
 			"How many days of access log entries to keep")
 		("job.purge.max", po::value(&settings.purgeDeleteMax)->default_value(settings.purgeDeleteMax),
diff --git a/test/test-ingest.cpp b/test/test-ingest.cpp
index c2ac4b3..408e720 100644
--- a/test/test-ingest.cpp
+++ b/test/test-ingest.cpp
@@ -12,9 +12,52 @@ namespace {
 	using namespace WebStat;
 
 	BOOST_GLOBAL_FIXTURE(MockDB);
+
+	constexpr std::array<std::string_view, 9> ENTITY_TYPE_NAMES {
+			"host",
+			"virtual_host",
+			"path",
+			"query_string",
+			"referrer",
+			"user_agent",
+			"unparsable_line",
+			"uninsertable_line",
+			"content_type",
+	};
+
+	EntityType
+	toEntityType(const std::string_view typeStr)
+	{
+		auto iter = std::ranges::find(ENTITY_TYPE_NAMES, typeStr);
+		if (iter == ENTITY_TYPE_NAMES.end()) {
+			throw std::domain_error {std::format("Unknown entity type {}", typeStr)};
+		}
+		return static_cast<EntityType>(iter - ENTITY_TYPE_NAMES.begin()); // assumes table order matches the EntityType enumerators
+	}
+
+	using EntityWithDetail = std::tuple<EntityId, EntityType, std::string, std::optional<std::string>>;
+
+	std::optional<EntityWithDetail>
+	getEntityById(DB::Connection * dbconn, EntityId id)
+	{
+		auto select = dbconn->select("SELECT type, value, detail FROM entities WHERE id = ?");
+		select->bindParam(0, id);
+		for (auto [typeStr, value, detail] : select->as<std::string, std::string, std::optional<std::string>>()) {
+			return std::make_optional<EntityWithDetail>(id, toEntityType(typeStr), std::move(value), std::move(detail)); // first row only
+		}
+		return std::nullopt;
+	}
 }
 
 namespace std {
+	ostream &
+	operator<<(ostream & strm, const EntityType value)
+	{
+		const auto valueNum = static_cast<size_t>(value);
+		std::print(strm, "EntityType: {} ({})", ENTITY_TYPE_NAMES.at(valueNum), valueNum); // .at() throws rather than reading past the table
+		return strm;
+	}
+
 	template<typename T>
 	ostream &
 	operator<<(ostream & strm, const std::optional<T> & value)
@@ -426,6 +469,60 @@ BOOST_AUTO_TEST_CASE(PurgeOldJob)
 	BOOST_CHECK_EQUAL(2, jobPurgeOldLogs()());
 }
 
+BOOST_AUTO_TEST_CASE(RetryUninsertableNone)
+{
+	BOOST_CHECK_EQUAL(0, jobRetryUninsertableLines()());
+}
+
+BOOST_AUTO_TEST_CASE(RetryUninsertableSuccess)
+{
+	auto dbconn = dbpool->get();
+	Entity uninsertable {{}, {}, EntityType::UninsertableLine, LOGLINE1};
+	storeNewEntity(dbconn.get(), uninsertable);
+	BOOST_REQUIRE(uninsertable.id);
+	BOOST_REQUIRE(getEntityById(dbconn.get(), *uninsertable.id));
+
+	BOOST_CHECK_EQUAL(1, jobRetryUninsertableLines()());
+	BOOST_REQUIRE(!getEntityById(dbconn.get(), *uninsertable.id)); // entity is removed once its line is stored
+}
+
+BOOST_AUTO_TEST_CASE(RetryUninsertableNowUnparsable)
+{
+	auto dbconn = dbpool->get();
+	Entity uninsertable {{}, {}, EntityType::UninsertableLine, "blah"};
+	storeNewEntity(dbconn.get(), uninsertable);
+	BOOST_REQUIRE(uninsertable.id);
+
+	BOOST_CHECK_EQUAL(0, jobRetryUninsertableLines()());
+	auto updatedEntity = getEntityById(dbconn.get(), *uninsertable.id);
+	BOOST_REQUIRE(updatedEntity);
+	BOOST_CHECK_EQUAL(std::get<1>(*updatedEntity), EntityType::UnparsableLine);
+}
+
+BOOST_AUTO_TEST_CASE(RetryUninsertableStillUninsertable)
+{
+	auto dbconn = dbpool->get();
+	constexpr std::string_view LOGLINE_UNINSERTABLE
+			= R"LOG(git.randomdan.homeip.net 98.82.40.168 1755561576768318 CAUSEPARSEFAIL "/repo/gentoobrowse-api/commit/gentoobrowse-api/unittests/fixtures/756569aa764177340726dd3d40b41d89b11b20c7/app-crypt/pdfcrack/Manifest" "?h=gentoobrowse-api-0.9.1&id=a2ed3fd30333721accd4b697bfcb6cc4165c7714" HTTP/1.1 200 1884 107791 "-" "Mozilla/5.0 AppleWebKit/537.36 (KHTML, like Gecko; compatible; Amazonbot/0.1; +https://developer.amazon.com/support/amazonbot) Chrome/119.0.6045.214 Safari/537.36" "text/plain")LOG";
+	Entity uninsertable {{}, {}, EntityType::UninsertableLine, LOGLINE_UNINSERTABLE};
+	storeNewEntity(dbconn.get(), uninsertable);
+	BOOST_REQUIRE(uninsertable.id);
+
+	BOOST_CHECK_EQUAL(0, jobRetryUninsertableLines()());
+	auto updatedEntity = getEntityById(dbconn.get(), *uninsertable.id);
+	BOOST_REQUIRE(updatedEntity);
+	BOOST_CHECK_EQUAL(std::get<1>(*updatedEntity), EntityType::UninsertableLine);
+	const auto & detail = std::get<3>(*updatedEntity);
+	BOOST_REQUIRE(detail);
+
+	BOOST_TEST_CONTEXT(*detail) {
+		BOOST_CHECK(detail->starts_with("{"));
+		BOOST_CHECK(detail->contains("invalid input value for enum http_verb"));
+		BOOST_CHECK(detail->contains("retriedAt"));
+		BOOST_CHECK(detail->ends_with("}"));
+	}
+}
+
 BOOST_AUTO_TEST_CASE(LogStatsSignal)
 {
 	BOOST_REQUIRE_EQUAL(logsWritten, 0);
|
