diff options
| author | Dan Goodliffe <dan.goodliffe@octal.co.uk> | 2026-05-19 12:12:28 +0100 |
|---|---|---|
| committer | Dan Goodliffe <dan.goodliffe@octal.co.uk> | 2026-05-19 12:12:28 +0100 |
| commit | a3d868ca57591a6d977da8a7e9ec5c14976e73ec (patch) | |
| tree | 803085077c0a4ad148359738b985e117dff75c42 | |
| parent | 29f458117184af5b1507cac01b48b41bfbad568a (diff) | |
| parent | cabd894779c2d2e453c8ed7771fdbd17c881bc2c (diff) | |
| download | webstat-a3d868ca57591a6d977da8a7e9ec5c14976e73ec.tar.bz2 webstat-a3d868ca57591a6d977da8a7e9ec5c14976e73ec.tar.xz webstat-a3d868ca57591a6d977da8a7e9ec5c14976e73ec.zip | |
| -rw-r--r-- | src/ingestor.cpp | 72 | ||||
| -rw-r--r-- | src/ingestor.hpp | 6 | ||||
| -rw-r--r-- | src/schema.sql | 4 | ||||
| -rw-r--r-- | src/sql.cpp | 17 | ||||
| -rw-r--r-- | src/sql.hpp | 4 | ||||
| -rw-r--r-- | src/sql/deleteEntity.sql | 2 | ||||
| -rw-r--r-- | src/sql/markEntityRetried.sql | 6 | ||||
| -rw-r--r-- | src/sql/selectUninsertableLines.sql | 11 | ||||
| -rw-r--r-- | src/sql/setEntityType.sql | 6 | ||||
| -rw-r--r-- | src/util.hpp | 49 | ||||
| -rw-r--r-- | src/webstat_logger_main.cpp | 2 | ||||
| -rw-r--r-- | test/test-ingest.cpp | 105 |
12 files changed, 269 insertions, 15 deletions
diff --git a/src/ingestor.cpp b/src/ingestor.cpp index 3e6e307..954f872 100644 --- a/src/ingestor.cpp +++ b/src/ingestor.cpp @@ -134,7 +134,7 @@ namespace WebStat { settings {std::move(givenSettings)}, dbpool {std::move(dbpl)}, handleCompleteCurlOps {&Ingestor::jobHandleCompleteCurlOps, &Ingestor::haveCurlOperations}, ingestParkedLines {&Ingestor::jobReadParkedLines}, purgeOldLogs {&Ingestor::jobPurgeOldLogs}, - storeQueueLines {&Ingestor::jobStoreQueuedLines}, + storeQueueLines {&Ingestor::jobStoreQueuedLines}, retryUninsertableLines {&Ingestor::jobRetryUninsertableLines}, hostnameId {insert(dbpool->get(), SQL::HOST_UPSERT, SQL::HOST_UPSERT_OPTS, host.nodename, host.sysname, host.release, host.version, host.machine, host.domainname)}, curl {curl_multi_init()} @@ -325,7 +325,7 @@ namespace WebStat { } catch (const std::exception & excp) { log(LOG_ERR, "Unhandled exception: %s, clearing known entity list", excp.what()); - existingEntities.clear(); + existingEntities()->clear(); } auto count = std::distance(processingLines.begin(), storedEnd); processingLines.erase(processingLines.begin(), storedEnd); @@ -334,6 +334,62 @@ namespace WebStat { }; } + constexpr auto ENTITY_IDS = std::views::transform([](auto && value) { + return std::make_pair(value->hash, *value->id); + }); + + Ingestor::Job::Result + Ingestor::jobRetryUninsertableLines() + { + auto dbh = dbpool->get(); + auto dbconn = dbh.get(); + auto lineSelect = dbconn->select(SQL::SELECT_UNINSERTABLE, SQL::SELECT_UNINSERTABLE_OPTS); + lineSelect->bindParamI(0, settings.maxBatchSize); + auto markLineRetried = dbconn->modify(SQL::MARK_ENTITY_RETRIED, SQL::MARK_ENTITY_RETRIED_OPTS); + auto deleteLine = dbconn->modify(SQL::DELETE_ENTITY, SQL::DELETE_ENTITY_OPTS); + auto setEntityUnparsable = dbconn->modify(SQL::SET_ENTITY_TYPE, SQL::SET_ENTITY_TYPE_OPTS); + setEntityUnparsable->bindParamS(0, "unparsable_line"); + + unsigned int stored = 0; + while (!terminated) { + unsigned int batchSize = 0; + DB::TransactionScope 
batchTx {*dbconn}; + for (auto [id, line] : lineSelect->as<EntityId, std::string>()) { + batchSize += 1; + try { + DB::TransactionScope lineTx {*dbconn}; + if (auto result = scanLogLine(line)) { + auto values = hashScanValues(result->values()); + auto valuesEntities = entities(values); + fillKnownEntities(valuesEntities); + storeNewEntities(dbconn, valuesEntities); + existingEntities()->insert_range(valuesEntities | ENTITY_IDS); + storeLogLine(dbconn, values); + + deleteLine->bindParamI(0, id); + deleteLine->execute(); + stored += 1; + } + else { + // unparseable - was parsable previously, isn't now 🤷 + setEntityUnparsable->bindParamI(1, id); + setEntityUnparsable->execute(); + } + } + catch (const std::exception & err) { + bindMany(markLineRetried, 0, err.what(), id); + markLineRetried->execute(); + } + } + if (batchSize == 0) { + break; + } + } + return [stored]() { + return stored; + }; + } + template<typename... T> std::vector<Entity *> Ingestor::entities(std::tuple<T...> & values) @@ -358,10 +414,6 @@ namespace WebStat { void Ingestor::ingestLogLines(DB::Connection * dbconn, const LinesView lines) { - auto entityIds = std::views::transform([](auto && value) { - return std::make_pair(value->hash, *value->id); - }); - DB::TransactionScope batchTx {*dbconn}; for (const auto & line : lines) { if (auto result = scanLogLine(line)) { @@ -372,7 +424,7 @@ namespace WebStat { try { DB::TransactionScope lineTx {*dbconn}; storeNewEntities(dbconn, valuesEntities); - existingEntities.insert_range(valuesEntities | entityIds); + existingEntities()->insert_range(valuesEntities | ENTITY_IDS); storeLogLine(dbconn, values); } catch (const DB::Error & originalError) { @@ -462,6 +514,7 @@ namespace WebStat { runJobAsNeeded(handleCompleteCurlOps, std::chrono::minutes {1}); runJobAsNeeded(ingestParkedLines, settings.freqIngestParkedLines); runJobAsNeeded(purgeOldLogs, settings.freqPurgeOldLogs); + runJobAsNeeded(retryUninsertableLines, settings.freqRetryUninsertableLines); } void @@ -558,8 
+611,9 @@ namespace WebStat { void Ingestor::fillKnownEntities(const std::span<Entity *> entities) const { + auto lockedEntities = existingEntities.shared(); for (const auto entity : entities) { - if (auto existing = existingEntities.find(entity->hash); existing != existingEntities.end()) { + if (auto existing = lockedEntities->find(entity->hash); existing != lockedEntities->end()) { entity->id = existing->second; } } @@ -639,7 +693,7 @@ namespace WebStat { "Statistics: linesQueued %zu, linesRead %zu, linesParsed %zu, linesParseFailed %zu, logsInserted %zu, " "entitiesInserted %zu, entitiesKnown %zu", queuedLines.size(), stats.linesRead, stats.linesParsed, stats.linesParseFailed, stats.logsInserted, - stats.entitiesInserted, existingEntities.size()); + stats.entitiesInserted, existingEntities->size()); } void diff --git a/src/ingestor.hpp b/src/ingestor.hpp index 738357b..2050b7c 100644 --- a/src/ingestor.hpp +++ b/src/ingestor.hpp @@ -28,6 +28,7 @@ namespace WebStat { size_t maxBatchSize = 1; minutes checkJobsAfter = 1min; minutes freqIngestParkedLines = 30min; + minutes freqRetryUninsertableLines = 4h; minutes freqPurgeOldLogs = 6h; unsigned int purgeDaysToKeep = 61; // ~2 months unsigned int purgeDeleteMax = 10'000; @@ -78,6 +79,7 @@ namespace WebStat { Job::Result jobReadParkedLines(); Job::Result jobPurgeOldLogs(); Job::Result jobStoreQueuedLines(); + Job::Result jobRetryUninsertableLines(); template<typename... T> void storeLogLine(DB::Connection *, const std::tuple<T...> &) const; @@ -99,7 +101,7 @@ namespace WebStat { DB::ConnectionPoolPtr dbpool; mutable Stats stats {}; - std::map<EntityHash, EntityId> existingEntities; + ThreadSafeT<std::map<EntityHash, EntityId>> existingEntities; LineBatch queuedLines, processingLines; bool terminated = false; @@ -109,8 +111,8 @@ namespace WebStat { Job ingestParkedLines; Job purgeOldLogs; Job storeQueueLines; + Job retryUninsertableLines; - private: template<typename... 
T> static std::vector<Entity *> entities(std::tuple<T...> &); void fillKnownEntities(std::span<Entity *>) const; void storeNewEntities(DB::Connection *, std::span<Entity *>) const; diff --git a/src/schema.sql b/src/schema.sql index e09be28..0d9fcb4 100644 --- a/src/schema.sql +++ b/src/schema.sql @@ -42,6 +42,10 @@ CREATE TABLE entities( CREATE UNIQUE INDEX uni_entities_value ON entities(MD5(value)); +CREATE INDEX idx_entities_retryinsert ON entities(id) +WHERE + type = 'uninsertable_line' AND detail IS NULL; + CREATE OR REPLACE FUNCTION entity(newValue text, newType entity) RETURNS TABLE( id integer, diff --git a/src/sql.cpp b/src/sql.cpp index 801a905..a2dac02 100644 --- a/src/sql.cpp +++ b/src/sql.cpp @@ -22,6 +22,18 @@ namespace WebStat::SQL { const std::string HOST_UPSERT { #embed "sql/hostUpsert.sql" }; + const std::string SELECT_UNINSERTABLE { +#embed "sql/selectUninsertableLines.sql" + }; + const std::string DELETE_ENTITY { +#embed "sql/deleteEntity.sql" + }; + const std::string MARK_ENTITY_RETRIED { +#embed "sql/markEntityRetried.sql" + }; + const std::string SET_ENTITY_TYPE { +#embed "sql/setEntityType.sql" + }; #define HASH_OPTS(VAR) \ const DB::CommandOptionsPtr VAR##_OPTS \ = std::make_shared<PQ::CommandOptions>(std::hash<std::string> {}(VAR), 35, false) @@ -30,5 +42,10 @@ namespace WebStat::SQL { HASH_OPTS(ENTITY_INSERT); HASH_OPTS(ENTITY_UPDATE_DETAIL); HASH_OPTS(HOST_UPSERT); + const DB::CommandOptionsPtr SELECT_UNINSERTABLE_OPTS + = std::make_shared<PQ::CommandOptions>(std::hash<std::string> {}(SELECT_UNINSERTABLE), 35, true); + HASH_OPTS(DELETE_ENTITY); + HASH_OPTS(MARK_ENTITY_RETRIED); + HASH_OPTS(SET_ENTITY_TYPE); #undef HASH_OPTS } diff --git a/src/sql.hpp b/src/sql.hpp index 1a12823..ae3559a 100644 --- a/src/sql.hpp +++ b/src/sql.hpp @@ -13,5 +13,9 @@ namespace WebStat::SQL { EMBED_DECLARE(ENTITY_INSERT); EMBED_DECLARE(ENTITY_UPDATE_DETAIL); EMBED_DECLARE(HOST_UPSERT); + EMBED_DECLARE(SELECT_UNINSERTABLE); + EMBED_DECLARE(DELETE_ENTITY); + 
EMBED_DECLARE(MARK_ENTITY_RETRIED); + EMBED_DECLARE(SET_ENTITY_TYPE); #undef EMBED_DECLARE } diff --git a/src/sql/deleteEntity.sql b/src/sql/deleteEntity.sql new file mode 100644 index 0000000..e201384 --- /dev/null +++ b/src/sql/deleteEntity.sql @@ -0,0 +1,2 @@ +DELETE FROM entities +WHERE id = ? diff --git a/src/sql/markEntityRetried.sql b/src/sql/markEntityRetried.sql new file mode 100644 index 0000000..6ec2263 --- /dev/null +++ b/src/sql/markEntityRetried.sql @@ -0,0 +1,6 @@ +UPDATE + entities +SET + detail = jsonb_build_object('retriedAt', CURRENT_TIMESTAMP at time zone 'utc', 'error', ?::text) +WHERE + id = ? diff --git a/src/sql/selectUninsertableLines.sql b/src/sql/selectUninsertableLines.sql new file mode 100644 index 0000000..5c07791 --- /dev/null +++ b/src/sql/selectUninsertableLines.sql @@ -0,0 +1,11 @@ +SELECT + id, + value +FROM + entities +WHERE + type = 'uninsertable_line' + AND detail IS NULL +ORDER BY + id +LIMIT ? diff --git a/src/sql/setEntityType.sql b/src/sql/setEntityType.sql new file mode 100644 index 0000000..5c981b9 --- /dev/null +++ b/src/sql/setEntityType.sql @@ -0,0 +1,6 @@ +UPDATE + entities +SET + type = ?::entity +WHERE + id = ? diff --git a/src/util.hpp b/src/util.hpp index f7254e8..5cac5a3 100644 --- a/src/util.hpp +++ b/src/util.hpp @@ -3,6 +3,7 @@ #include <chrono> #include <command.h> #include <scn/scan.h> +#include <shared_mutex> #include <tuple> namespace WebStat { @@ -95,4 +96,52 @@ namespace WebStat { } return false; } + + template<typename ValueType, typename MutexType = std::shared_mutex> class ThreadSafeT { + public: + template<typename... P> ThreadSafeT(P &&... 
params) : value {std::forward<P>(params)...} { } + + template<typename LockedValueType, typename LockType> class Locked { + public: + Locked(LockedValueType & valueRef, MutexType & mutex) : value {valueRef}, lock {mutex} { } + + LockedValueType * + operator->() + { + return &value; + } + + private: + LockedValueType & value; + LockType lock; + }; + + Locked<const ValueType, std::shared_lock<MutexType>> + shared() const + { + return {value, mutex}; + } + + Locked<ValueType, std::lock_guard<MutexType>> + unique() + { + return {value, mutex}; + } + + Locked<const ValueType, std::shared_lock<MutexType>> + operator->() const + { + return shared(); + } + + Locked<ValueType, std::lock_guard<MutexType>> + operator()() + { + return unique(); + } + + private: + ValueType value; + mutable MutexType mutex; + }; } diff --git a/src/webstat_logger_main.cpp b/src/webstat_logger_main.cpp index 1d14532..8dd9f52 100644 --- a/src/webstat_logger_main.cpp +++ b/src/webstat_logger_main.cpp @@ -77,6 +77,8 @@ main(int argc, char ** argv) "How often to check for and import parked log lines") ("job.purge.freq", po::value(&settings.freqPurgeOldLogs)->default_value(settings.freqPurgeOldLogs), "How often to purge old access log entries from the database") + ("job.retryUninsertable.freq", po::value(&settings.freqRetryUninsertableLines)->default_value(settings.freqRetryUninsertableLines), + "After how long to retry inserting log lines which previously could not be inserted") ("job.purge.days", po::value(&settings.purgeDaysToKeep)->default_value(settings.purgeDaysToKeep), "How many days of access log entries to keep") ("job.purge.max", po::value(&settings.purgeDeleteMax)->default_value(settings.purgeDeleteMax), diff --git a/test/test-ingest.cpp b/test/test-ingest.cpp index 0dad6e6..408e720 100644 --- a/test/test-ingest.cpp +++ b/test/test-ingest.cpp @@ -12,9 +12,52 @@ namespace { using namespace WebStat; BOOST_GLOBAL_FIXTURE(MockDB); + + constexpr std::array<std::string_view, 9> ENTITY_TYPE_NAMES 
{ + "host", + "virtual_host", + "path", + "query_string", + "referrer", + "user_agent", + "unparsable_line", + "uninsertable_line", + "content_type", + }; + + EntityType + toEntityType(const std::string_view typeStr) + { + auto iter = std::ranges::find(ENTITY_TYPE_NAMES, typeStr); + if (iter == ENTITY_TYPE_NAMES.end()) { + throw std::domain_error {std::format("Unknown entity type {}", typeStr)}; + } + return static_cast<EntityType>(iter - ENTITY_TYPE_NAMES.begin()); + } + + using EntityWithDetail = std::tuple<EntityId, EntityType, std::string, std::optional<std::string>>; + + std::optional<EntityWithDetail> + getEntityById(DB::Connection * dbconn, EntityId id) + { + auto select = dbconn->select("SELECT type, value, detail FROM entities WHERE id = ?"); + select->bindParam(0, id); + for (auto [typeStr, value, detail] : select->as<std::string, std::string, std::optional<std::string>>()) { + return std::make_optional<EntityWithDetail>(id, toEntityType(typeStr), std::move(value), std::move(detail)); + } + return std::nullopt; + } } namespace std { + ostream & + operator<<(ostream & strm, const EntityType value) + { + const auto valueNum = static_cast<size_t>(value); + std::print(strm, "EntityType: {} ({})", ENTITY_TYPE_NAMES[valueNum], valueNum); + return strm; + } + template<typename T> ostream & operator<<(ostream & strm, const std::optional<T> & value) @@ -264,7 +307,7 @@ BOOST_DATA_TEST_CASE(StoreLogLine, BOOST_CHECK_EQUAL(stats.linesParseFailed, 0); BOOST_CHECK_EQUAL(stats.logsInserted, 1); BOOST_CHECK_EQUAL(stats.entitiesInserted, 5); - BOOST_CHECK_EQUAL(existingEntities.size(), 5); + BOOST_CHECK_EQUAL(existingEntities->size(), 5); } BOOST_AUTO_TEST_CASE(StoreLog, *boost::unit_test::depends_on("I/StoreLogLine")) @@ -277,7 +320,7 @@ BOOST_AUTO_TEST_CASE(StoreLog, *boost::unit_test::depends_on("I/StoreLogLine")) BOOST_CHECK_EQUAL(stats.linesParsed, 10); BOOST_CHECK_EQUAL(stats.linesParseFailed, 0); BOOST_CHECK_GE(stats.entitiesInserted, 1); - 
BOOST_CHECK_EQUAL(stats.entitiesInserted, existingEntities.size()); + BOOST_CHECK_EQUAL(stats.entitiesInserted, existingEntities->size()); } BOOST_AUTO_TEST_CASE(TerminateHandler, *boost::unit_test::timeout(5)) @@ -306,7 +349,7 @@ BOOST_AUTO_TEST_CASE(ParkLogLine) BOOST_AUTO_TEST_CASE(ParkLogLineOnError, *boost::unit_test::depends_on("I/ParkLogLine")) { - BOOST_REQUIRE(existingEntities.empty()); + BOOST_REQUIRE(existingEntities->empty()); constexpr std::string_view LOGLINE_BAD_VERB = R"LOG(git.randomdan.homeip.net 98.82.40.168 1755561576768318 CAUSEPARSEFAIL "/repo/gentoobrowse-api/commit/gentoobrowse-api/unittests/fixtures/756569aa764177340726dd3d40b41d89b11b20c7/app-crypt/pdfcrack/Manifest" "?h=gentoobrowse-api-0.9.1&id=a2ed3fd30333721accd4b697bfcb6cc4165c7714" HTTP/1.1 200 1884 107791 "-" "Mozilla/5.0 AppleWebKit/537.36 (KHTML, like Gecko; compatible; Amazonbot/0.1; +https://developer.amazon.com/support/amazonbot) Chrome/119.0.6045.214 Safari/537.36")LOG"; BOOST_REQUIRE_NO_THROW(ingestLogLines(dbpool->get().get(), {std::string {LOGLINE_BAD_VERB}})); @@ -418,7 +461,7 @@ BOOST_AUTO_TEST_CASE(DiscardUnparsable) BOOST_CHECK_EQUAL_COLLECTIONS(rows.begin(), rows.end(), EXPECTED.begin(), EXPECTED.end()); BOOST_CHECK_EQUAL(stats.linesParseFailed, 1); BOOST_CHECK_EQUAL(stats.entitiesInserted, 1); - BOOST_CHECK(existingEntities.empty()); // Don't clutter existing entities with junk logs + BOOST_CHECK(existingEntities->empty()); // Don't clutter existing entities with junk logs } BOOST_AUTO_TEST_CASE(PurgeOldJob) @@ -426,6 +469,60 @@ BOOST_AUTO_TEST_CASE(PurgeOldJob) BOOST_CHECK_EQUAL(2, jobPurgeOldLogs()()); } +BOOST_AUTO_TEST_CASE(RetryUninsertableNone) +{ + BOOST_CHECK_EQUAL(0, jobRetryUninsertableLines()()); +} + +BOOST_AUTO_TEST_CASE(RetryUninsertableSuccess) +{ + auto dbconn = dbpool->get(); + Entity uninsertable {{}, {}, EntityType::UninsertableLine, LOGLINE1}; + storeNewEntity(dbconn.get(), uninsertable); + BOOST_REQUIRE(uninsertable.id); + 
BOOST_REQUIRE(getEntityById(dbconn.get(), *uninsertable.id)); + + BOOST_CHECK_EQUAL(1, jobRetryUninsertableLines()()); + BOOST_REQUIRE(!getEntityById(dbconn.get(), *uninsertable.id)); +} + +BOOST_AUTO_TEST_CASE(RetryUninsertableNowUnparsable) +{ + auto dbconn = dbpool->get(); + Entity uninsertable {{}, {}, EntityType::UninsertableLine, "blah"}; + storeNewEntity(dbconn.get(), uninsertable); + BOOST_REQUIRE(uninsertable.id); + + BOOST_CHECK_EQUAL(0, jobRetryUninsertableLines()()); + auto updatedEntity = getEntityById(dbconn.get(), *uninsertable.id); + BOOST_REQUIRE(updatedEntity); + BOOST_CHECK_EQUAL(std::get<1>(*updatedEntity), EntityType::UnparsableLine); +} + +BOOST_AUTO_TEST_CASE(RetryUninsertableStillUninsertable) +{ + auto dbconn = dbpool->get(); + constexpr std::string_view LOGLINE_UNINSERTABLE + = R"LOG(git.randomdan.homeip.net 98.82.40.168 1755561576768318 CAUSEPARSEFAIL "/repo/gentoobrowse-api/commit/gentoobrowse-api/unittests/fixtures/756569aa764177340726dd3d40b41d89b11b20c7/app-crypt/pdfcrack/Manifest" "?h=gentoobrowse-api-0.9.1&id=a2ed3fd30333721accd4b697bfcb6cc4165c7714" HTTP/1.1 200 1884 107791 "-" "Mozilla/5.0 AppleWebKit/537.36 (KHTML, like Gecko; compatible; Amazonbot/0.1; +https://developer.amazon.com/support/amazonbot) Chrome/119.0.6045.214 Safari/537.36" "text/plain")LOG"; + Entity uninsertable {{}, {}, EntityType::UninsertableLine, LOGLINE_UNINSERTABLE}; + storeNewEntity(dbconn.get(), uninsertable); + BOOST_REQUIRE(uninsertable.id); + + BOOST_CHECK_EQUAL(0, jobRetryUninsertableLines()()); + auto updatedEntity = getEntityById(dbconn.get(), *uninsertable.id); + BOOST_REQUIRE(updatedEntity); + BOOST_CHECK_EQUAL(std::get<1>(*updatedEntity), EntityType::UninsertableLine); + const auto & detail = std::get<3>(*updatedEntity); + BOOST_REQUIRE(detail); + + BOOST_TEST_CONTEXT(*detail) { + BOOST_CHECK(detail->starts_with("{")); + BOOST_CHECK(detail->contains("invalid input value for enum http_verb")); + BOOST_CHECK(detail->contains("retriedAt")); + 
BOOST_CHECK(detail->ends_with("}")); + } +} + BOOST_AUTO_TEST_CASE(LogStatsSignal) { BOOST_REQUIRE_EQUAL(logsWritten, 0); |
