From 72bfa9dd305258789b0d2e80f8af13962e5aac42 Mon Sep 17 00:00:00 2001 From: Dan Goodliffe Date: Fri, 10 Apr 2026 20:22:35 +0100 Subject: Return path of parked lines log file from parkQueuedLogLines Or the last errno on failure. --- src/ingestor.cpp | 13 ++++++++----- src/ingestor.hpp | 3 ++- test/test-ingest.cpp | 34 ++++++++++++++++++---------------- 3 files changed, 28 insertions(+), 22 deletions(-) diff --git a/src/ingestor.cpp b/src/ingestor.cpp index c5cb8d8..0659f1f 100644 --- a/src/ingestor.cpp +++ b/src/ingestor.cpp @@ -275,11 +275,11 @@ namespace WebStat { } } - void + std::expected Ingestor::parkQueuedLogLines() { if (queuedLines.empty()) { - return; + return std::unexpected(0); } const std::filesystem::path path { settings.fallbackDir / std::format("parked-{}.short", crc32(queuedLines.front()))}; @@ -290,16 +290,19 @@ namespace WebStat { } if (fflush(parked.get()) == 0) { queuedLines.clear(); - auto finalPath = auto {path}.replace_extension(".log"); + auto finalPath = std::filesystem::path {path}.replace_extension(".log"); + parked.reset(); if (rename(path.c_str(), finalPath.c_str()) == 0) { - return; + return finalPath; } } } + const int err = errno; log(LOG_ERR, "Failed to park %zu queued lines:", queuedLines.size()); for (const auto & line : queuedLines) { log(LOG_ERR, "\t%.*s", static_cast(line.length()), line.data()); } + return std::unexpected(err); } void @@ -334,7 +337,7 @@ namespace WebStat { unsigned int count = 0; for (auto pathIter = std::filesystem::directory_iterator {settings.fallbackDir}; pathIter != std::filesystem::directory_iterator {}; ++pathIter) { - if (scn::scan(pathIter->path().filename().string(), "parked-{}.log")) { + if (scn::scan(pathIter->path().filename().string(), "parked-{:[a-zA-Z0-9]}.log")) { jobIngestParkedLines(pathIter->path()); count += 1; } diff --git a/src/ingestor.hpp b/src/ingestor.hpp index fcddc92..b2e0fed 100644 --- a/src/ingestor.hpp +++ b/src/ingestor.hpp @@ -7,6 +7,7 @@ #include #include #include +#include #include #include #include @@ -54,7 +55,7 @@ namespace WebStat { void ingestLog(std::FILE *); void tryIngestQueuedLogLines(); void ingestLogLines(DB::Connection *, const LineBatch & lines); - void parkQueuedLogLines(); + std::expected parkQueuedLogLines(); void runJobsAsNeeded(); unsigned int jobIngestParkedLines(); diff --git a/test/test-ingest.cpp b/test/test-ingest.cpp index 5fc8195..d399288 100644 --- a/test/test-ingest.cpp +++ b/test/test-ingest.cpp @@ -155,7 +155,6 @@ BOOST_DATA_TEST_CASE(CLFStringsBad, constexpr std::string_view LOGLINE1 = R"LOG(git.randomdan.homeip.net 98.82.40.168 1755561576768318 GET "/repo/gentoobrowse-api/commit/gentoobrowse-api/unittests/fixtures/756569aa764177340726dd3d40b41d89b11b20c7/app-crypt/pdfcrack/Manifest" "?h=gentoobrowse-api-0.9.1&id=a2ed3fd30333721accd4b697bfcb6cc4165c7714" HTTP/1.1 200 1884 107791 "-" "Mozilla/5.0 AppleWebKit/537.36 (KHTML, like Gecko; compatible; Amazonbot/0.1; +https://developer.amazon.com/support/amazonbot) Chrome/119.0.6045.214 Safari/537.36" "test/plain")LOG"; -constexpr std::string_view LOGLINE1_PARKED = "parked-237093379.log"; constexpr std::string_view LOGLINE2 = R"LOG(www.randomdan.homeip.net 43.128.84.166 1755561575973204 GET "/app-dicts/myspell-et/Manifest" "" HTTP/1.1 200 312 10369 "https://google.com" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.0.0 Safari/537.36" "image/png")LOG"; @@ -298,11 +297,11 @@ BOOST_AUTO_TEST_CASE(ParkLogLine) { queuedLines.emplace_back(LOGLINE1); queuedLines.emplace_back(LOGLINE2); - parkQueuedLogLines(); - const auto path = settings.fallbackDir / LOGLINE1_PARKED; - BOOST_TEST_INFO(path); - BOOST_REQUIRE(std::filesystem::exists(path)); - BOOST_CHECK_EQUAL(std::filesystem::file_size(path), LOGLINE1.length() + LOGLINE2.length() + 4); + const auto path = parkQueuedLogLines(); + BOOST_REQUIRE(path); + BOOST_TEST_INFO(*path); + BOOST_REQUIRE(std::filesystem::exists(*path)); + BOOST_CHECK_EQUAL(std::filesystem::file_size(*path), LOGLINE1.length() + LOGLINE2.length() + 4); } BOOST_AUTO_TEST_CASE(ParkLogLineOnError, *boost::unit_test::depends_on("I/ParkLogLine")) @@ -318,11 +317,12 @@ BOOST_AUTO_TEST_CASE(IngestParked, *boost::unit_test::depends_on("I/ParkLogLine" { queuedLines.emplace_back(LOGLINE1); queuedLines.emplace_back(LOGLINE2); - parkQueuedLogLines(); + BOOST_REQUIRE(parkQueuedLogLines()); + BOOST_CHECK(!std::filesystem::is_empty(settings.fallbackDir)); BOOST_REQUIRE(queuedLines.empty()); jobIngestParkedLines(); BOOST_CHECK_EQUAL(queuedLines.size(), 2); - BOOST_CHECK(!std::filesystem::exists(settings.fallbackDir / LOGLINE1_PARKED)); + BOOST_CHECK(std::filesystem::is_empty(settings.fallbackDir)); } BOOST_AUTO_TEST_CASE(DefaultLaunchNoJobs) @@ -338,21 +338,22 @@ BOOST_AUTO_TEST_CASE(IngestParkedJob, const auto now = Job::LastRunTime::clock::now(); ingestParkedLines.lastRun = now - 1s; queuedLines.emplace_back(LOGLINE1); - parkQueuedLogLines(); + const auto path = parkQueuedLogLines(); + BOOST_REQUIRE(path); BOOST_REQUIRE(queuedLines.empty()); - BOOST_REQUIRE(std::filesystem::exists(settings.fallbackDir / LOGLINE1_PARKED)); + BOOST_REQUIRE(std::filesystem::exists(*path)); runJobsAsNeeded(); BOOST_REQUIRE(!ingestParkedLines.currentRun); BOOST_CHECK(queuedLines.empty()); - BOOST_CHECK(std::filesystem::exists(settings.fallbackDir / LOGLINE1_PARKED)); + BOOST_CHECK(std::filesystem::exists(*path)); BOOST_CHECK_EQUAL(ingestParkedLines.lastRun, now - 1s); ingestParkedLines.lastRun = now - settings.freqIngestParkedLines + 2s; runJobsAsNeeded(); BOOST_REQUIRE(!ingestParkedLines.currentRun); BOOST_CHECK(queuedLines.empty()); - BOOST_CHECK(std::filesystem::exists(settings.fallbackDir / LOGLINE1_PARKED)); + BOOST_CHECK(std::filesystem::exists(*path)); BOOST_CHECK_EQUAL(ingestParkedLines.lastRun, now - settings.freqIngestParkedLines + 2s); ingestParkedLines.lastRun = now - settings.freqIngestParkedLines - 1s; @@ -363,7 +364,7 @@ BOOST_AUTO_TEST_CASE(IngestParkedJob, BOOST_REQUIRE(!ingestParkedLines.currentRun); BOOST_CHECK_EQUAL(queuedLines.size(), 1); BOOST_CHECK_GE(ingestParkedLines.lastRun, now); - BOOST_CHECK(!std::filesystem::exists(settings.fallbackDir / LOGLINE1_PARKED)); + BOOST_CHECK(!std::filesystem::exists(*path)); } BOOST_AUTO_TEST_CASE(JobErrorRescheduler, *boost::unit_test::depends_on("I/IngestParkedJob")) @@ -371,14 +372,15 @@ BOOST_AUTO_TEST_CASE(JobErrorRescheduler, *boost::unit_test::depends_on("I/Inges const auto now = Job::LastRunTime::clock::now(); ingestParkedLines.lastRun = now - settings.freqIngestParkedLines - 1s; queuedLines.emplace_back(LOGLINE1); - parkQueuedLogLines(); - std::filesystem::permissions(settings.fallbackDir / LOGLINE1_PARKED, std::filesystem::perms::owner_write); + const auto path = parkQueuedLogLines(); + BOOST_REQUIRE(path); + std::filesystem::permissions(*path, std::filesystem::perms::owner_write); runJobsAsNeeded(); BOOST_REQUIRE(ingestParkedLines.currentRun); ingestParkedLines.currentRun->wait(); runJobsAsNeeded(); BOOST_REQUIRE(!ingestParkedLines.currentRun); - BOOST_CHECK(std::filesystem::exists(settings.fallbackDir / LOGLINE1_PARKED)); + BOOST_CHECK(std::filesystem::exists(*path)); BOOST_CHECK_GE(ingestParkedLines.lastRun, now - (settings.freqIngestParkedLines / 2) - 1s); BOOST_CHECK_LE(ingestParkedLines.lastRun, now - (settings.freqIngestParkedLines / 2) + 1s); } -- cgit v1.3 From 3ce6cf305572709332d7329674ec45c987a093ad Mon Sep 17 00:00:00 2001 From: Dan Goodliffe Date: Sat, 11 Apr 2026 18:12:51 +0100 Subject: Introduce MD5 from libmd, use it for hashing queuedLines for park path --- Jamroot.jam | 1 + src/Jamfile.jam | 1 + src/ingestor.cpp | 30 ++++++++++++++++++++++++++++-- src/logTypes.hpp | 2 ++ 4 files changed, 32 insertions(+), 2 deletions(-) diff --git a/Jamroot.jam b/Jamroot.jam index 2f38c94..37e4a0e 100644 --- a/Jamroot.jam +++ b/Jamroot.jam @@ -13,6 +13,7 @@ lib adhocutil : : shared : : /usr/include/adhocutil glib lib dbppcore : : shared : : /usr/include/dbpp adhocutil ; lib dbpp-postgresql : : shared : : /usr/include/dbpp-postgresql dbppcore ; lib z : : shared ; +lib md : : shared ; project webstat : requirements 26 diff --git a/src/Jamfile.jam b/src/Jamfile.jam index 9459dd8..eca0c24 100644 --- a/src/Jamfile.jam +++ b/src/Jamfile.jam @@ -4,6 +4,7 @@ lib webstat : [ glob *.cpp : *_main.cpp ] : ..//dbppcore ../thirdparty//scn ..//z + ..//md ..//curl : : . diff --git a/src/ingestor.cpp b/src/ingestor.cpp index 0659f1f..5ae2487 100644 --- a/src/ingestor.cpp +++ b/src/ingestor.cpp @@ -24,6 +24,32 @@ namespace DB { namespace WebStat { namespace { + using ByteArrayView = std::span; + + auto + bytesToHexRange(const ByteArrayView bytes) + { + constexpr auto HEXN = 16ZU; + return bytes | std::views::transform([](auto byte) { + return std::array {byte / HEXN, byte % HEXN}; + }) | std::views::join + | std::views::transform([](auto nibble) { + return "0123456789abcdef"[nibble]; + }); + } + + EntityHash + makeHash(const std::string_view value) + { + MD5_CTX ctx {}; + MD5Init(&ctx); + // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast) - correct for md5ing raw bytes + MD5Update(&ctx, reinterpret_cast(value.data()), value.length()); + EntityHash hash {}; + MD5Final(hash.data(), &ctx); + return hash; + } + Crc32Value crc32(const std::string_view value) { @@ -281,8 +307,8 @@ namespace WebStat { if (queuedLines.empty()) { return std::unexpected(0); } - const std::filesystem::path path { - settings.fallbackDir / std::format("parked-{}.short", crc32(queuedLines.front()))}; + const std::filesystem::path path {settings.fallbackDir + / std::format("parked-{:s}.short", bytesToHexRange(makeHash(queuedLines.front())))}; if (auto parked = FilePtr(fopen(path.c_str(), "w"))) { fprintf(parked.get(), "%zu\n", queuedLines.size()); for (const auto & line : queuedLines) { diff --git a/src/logTypes.hpp b/src/logTypes.hpp index 71393b2..6556d5c 100644 --- a/src/logTypes.hpp +++ b/src/logTypes.hpp @@ -1,5 +1,6 @@ #pragma once +#include #include #include #include @@ -35,6 +36,7 @@ namespace WebStat { }; using Crc32Value = uint32_t; + using EntityHash = std::array; using Entity = std::tuple; } -- cgit v1.3 From 5b2166496e5f3ff2c4276e0b5b28f109c70673d5 Mon Sep 17 00:00:00 2001 From: Dan Goodliffe Date: Wed, 15 Apr 2026 12:03:21 +0100 Subject: Replace use of crc32 for entity id Entity value is MD5 hashed same as DB unique key, but the id itself is now taken from the DB primary key which is sequence generated. --- Jamroot.jam | 1 - src/Jamfile.jam | 1 - src/ingestor.cpp | 158 +++++++++++++++++++++++++---------------------- src/ingestor.hpp | 15 +++-- src/logTypes.hpp | 4 +- src/schema.sql | 69 ++++++++++++++++----- src/sql.cpp | 4 +- src/sql/entityInsert.sql | 6 +- src/sql/hostUpsert.sql | 14 +++-- src/uaLookup.cpp | 4 +- src/uaLookup.hpp | 6 +- src/util.hpp | 2 +- test/test-ingest.cpp | 6 +- 13 files changed, 169 insertions(+), 121 deletions(-) diff --git a/Jamroot.jam b/Jamroot.jam index 37e4a0e..37f97a6 100644 --- a/Jamroot.jam +++ b/Jamroot.jam @@ -12,7 +12,6 @@ lib boost_po : : shared boost_program_options ; lib adhocutil : : shared : : /usr/include/adhocutil glibmm ; lib dbppcore : : shared : : /usr/include/dbpp adhocutil ; lib dbpp-postgresql : : shared : : /usr/include/dbpp-postgresql dbppcore ; -lib z : : shared ; lib md : : shared ; project webstat : requirements diff --git a/src/Jamfile.jam b/src/Jamfile.jam index eca0c24..debe757 100644 --- a/src/Jamfile.jam +++ b/src/Jamfile.jam @@ -3,7 +3,6 @@ lib webstat : [ glob *.cpp : *_main.cpp ] : ..//adhocutil ..//dbppcore ../thirdparty//scn - ..//z ..//md ..//curl : : diff --git a/src/ingestor.cpp b/src/ingestor.cpp index 5ae2487..5454087 100644 --- a/src/ingestor.cpp +++ b/src/ingestor.cpp @@ -8,6 +8,7 @@ #include #include #include +#include #include #include #include @@ -18,7 +19,7 @@ namespace DB { // NOLINTNEXTLINE(readability-inconsistent-declaration-parameter-name) DB::Command::bindParam(unsigned int idx, const WebStat::Entity & entity) { - bindParamI(idx, std::get<0>(entity)); + bindParamI(idx, std::get<1>(entity)); } } @@ -50,19 +51,11 @@ namespace WebStat { return hash; } - Crc32Value - crc32(const std::string_view value) - { - // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast) - correct for crc32ing raw bytes - return static_cast(::crc32(::crc32(0, Z_NULL, 0), reinterpret_cast(value.data()), - static_cast(value.length()))); - } - template struct ToEntity { Entity operator()(const std::string_view value) const { - return {crc32(value), Type, value}; + return {makeHash(value), std::nullopt, Type, value}; } template @@ -76,7 +69,7 @@ namespace WebStat { }; auto - crc32ScanValues(const Ingestor::ScanValues & values) + hashScanValues(const Ingestor::ScanValues & values) { static constexpr std::tuple, std::identity, std::identity, std::identity, ToEntity, ToEntity, std::identity, std::identity, @@ -91,6 +84,19 @@ namespace WebStat { }(std::make_index_sequence()); } + template + T + insert(auto && dbconn, const std::string & sql, const DB::CommandOptionsPtr & opts, Binds &&... binds) + { + auto ins = dbconn->select(sql, opts); + bindMany(ins, 0, std::forward(binds)...); + if (ins->fetch()) { + T out; + (*ins)[0] >> out; + return out; + } + throw DB::NoRowsAffected {}; + } } Ingestor * Ingestor::currentIngestor = nullptr; @@ -106,14 +112,10 @@ namespace WebStat { Ingestor::Ingestor(const utsname & host, DB::ConnectionPoolPtr dbpl, IngestorSettings givenSettings) : settings {std::move(givenSettings)}, dbpool {std::move(dbpl)}, ingestParkedLines {&Ingestor::jobIngestParkedLines}, purgeOldLogs {&Ingestor::jobPurgeOldLogs}, - hostnameId {crc32(host.nodename)}, curl {curl_multi_init()}, mainThread {std::this_thread::get_id()} + hostnameId {insert(dbpool->get(), SQL::HOST_UPSERT, SQL::HOST_UPSERT_OPTS, host.nodename, host.sysname, + host.release, host.version, host.machine, host.domainname)}, + curl {curl_multi_init()}, mainThread {std::this_thread::get_id()} { - auto dbconn = dbpool->get(); - auto ins = dbconn->modify(SQL::HOST_UPSERT, SQL::HOST_UPSERT_OPTS); - bindMany(ins, 0, hostnameId, host.nodename, host.sysname, host.release, host.version, host.machine, - host.domainname); - ins->execute(); - assert(!currentIngestor); currentIngestor = this; signal(SIGTERM, &sigtermHandler); @@ -236,7 +238,7 @@ namespace WebStat { } } tryIngestQueuedLogLines(); - parkQueuedLogLines(); + std::ignore = parkQueuedLogLines(); while (!curlOperations.empty() && curl_multi_poll(curl.get(), nullptr, 0, INT_MAX, nullptr) == CURLM_OK) { handleCurlOperations(); } @@ -256,34 +258,55 @@ namespace WebStat { } } + template + std::vector + Ingestor::entities(std::tuple & values) + { + std::vector entities; + visit( + [&entities](V & value) { + static_assert(!std::is_const_v); + if constexpr (std::is_same_v) { + entities.emplace_back(&value); + } + else if constexpr (std::is_same_v>) { + if (value) { + entities.emplace_back(&*value); + } + } + }, + values); + return entities; + } + void Ingestor::ingestLogLines(DB::Connection * dbconn, const LineBatch & lines) { - auto nonNullEntityIds = std::views::take_while(&std::optional::has_value) - | std::views::transform([](auto && value) { - return *value; - }); + auto entityIds = std::views::transform([](auto && value) { + return std::make_pair(std::get<0>(*value), *std::get<1>(*value)); + }); DB::TransactionScope batchTx {*dbconn}; for (const auto & line : lines) { if (auto result = scanLogLine(line)) { stats.linesParsed++; - const auto values = crc32ScanValues(result->values()); + auto values = hashScanValues(result->values()); + auto valuesEntities = entities(values); + fillKnownEntities(valuesEntities); try { DB::TransactionScope dbtx {*dbconn}; - if (const auto newEnts = newEntities(values); newEnts.front()) { - existingEntities.insert_range(storeEntities(dbconn, newEnts) | nonNullEntityIds); - } + storeNewEntities(dbconn, valuesEntities); + existingEntities.insert_range(valuesEntities | entityIds); storeLogLine(dbconn, values); } catch (const DB::Error & originalError) { try { DB::TransactionScope dbtx {*dbconn}; - const auto uninsertableLine = ToEntity {}(line); - storeEntities(dbconn, {uninsertableLine}); + auto uninsertableLine = ToEntity {}(line); + storeNewEntity(dbconn, uninsertableLine); log(LOG_NOTICE, "Failed to store parsed line and/or associated entties, but did store raw line, %u:%s", - std::get(uninsertableLine), line.c_str()); + *std::get<1>(uninsertableLine), line.c_str()); } catch (const std::exception & excp) { log(LOG_NOTICE, "Failed to store line in any form, DB connection lost? %s", excp.what()); @@ -293,10 +316,10 @@ namespace WebStat { } else { stats.linesParseFailed++; - const auto unparsableLine = ToEntity {}(line); - log(LOG_NOTICE, "Failed to parse line, this is a bug: %u:%s", std::get(unparsableLine), + auto unparsableLine = ToEntity {}(line); + storeNewEntity(dbconn, unparsableLine); + log(LOG_NOTICE, "Failed to parse line, this is a bug: %u:%s", *std::get<1>(unparsableLine), line.c_str()); - storeEntities(dbconn, {unparsableLine}); } } } @@ -426,34 +449,28 @@ namespace WebStat { return purgedTotal; } - template - Ingestor::NewEntities - Ingestor::newEntities(const std::tuple & values) const + void + Ingestor::fillKnownEntities(const std::span entities) const { - Ingestor::NewEntities rtn; - auto next = rtn.begin(); - visit( - [this, &next](const X & entity) { - auto addNewIfReqd = [&next, this](auto && entityToAdd) mutable { - if (!existingEntities.contains(std::get<0>(entityToAdd))) { - *next++ = entityToAdd; - } - return 0; - }; + for (const auto entity : entities) { + if (auto existing = existingEntities.find(std::get<0>(*entity)); existing != existingEntities.end()) { + std::get<1>(*entity) = existing->second; + } + } + } - if constexpr (std::is_same_v) { - addNewIfReqd(entity); - } - else if constexpr (std::is_same_v>) { - entity.transform(addNewIfReqd); - } - }, - values); - return rtn; + void + Ingestor::storeNewEntities(DB::Connection * dbconn, const std::span entities) const + { + for (const auto entity : entities) { + if (!std::get<1>(*entity)) { + storeNewEntity(dbconn, *entity); + } + } } - Ingestor::NewEntityIds - Ingestor::storeEntities(DB::Connection * dbconn, const std::span> values) const + void + Ingestor::storeNewEntity(DB::Connection * dbconn, Entity & entity) const { static constexpr std::array, 9> ENTITY_TYPE_VALUES {{ @@ -468,28 +485,21 @@ namespace WebStat { {"content_type", nullptr}, }}; - auto insert = dbconn->modify(SQL::ENTITY_INSERT, SQL::ENTITY_INSERT_OPTS); - NewEntityIds ids; - std::ranges::transform(values | std::views::take_while(&std::optional::has_value), ids.begin(), - [this, &insert](auto && entity) { - const auto & [entityId, type, value] = *entity; - const auto & [typeName, onInsert] = ENTITY_TYPE_VALUES[std::to_underlying(type)]; - bindMany(insert, 0, entityId, typeName, value); - const auto insertedEntities = insert->execute(); - if (insertedEntities > 0 && onInsert && std::this_thread::get_id() == mainThread) { - std::invoke(onInsert, this, *entity); - } - stats.entitiesInserted += insertedEntities; - return std::get<0>(*entity); - }); - return ids; + auto & [entityHash, entityId, type, value] = entity; + assert(!entityId); + const auto & [typeName, onInsert] = ENTITY_TYPE_VALUES[std::to_underlying(type)]; + entityId = insert(dbconn, SQL::ENTITY_INSERT, SQL::ENTITY_INSERT_OPTS, value, typeName); + if (onInsert && std::this_thread::get_id() == mainThread) { + std::invoke(onInsert, this, entity); + } + stats.entitiesInserted += 1; } void Ingestor::onNewUserAgent(const Entity & entity) const { - const auto & [entityId, type, value] = entity; - auto curlOp = curlGetUserAgentDetail(entityId, value, settings.userAgentAPI.c_str()); + const auto & [entityHash, entityId, type, value] = entity; + auto curlOp = curlGetUserAgentDetail(*entityId, value, settings.userAgentAPI.c_str()); auto added = curlOperations.emplace(curlOp->hnd.get(), std::move(curlOp)); curl_multi_add_handle(curl.get(), added.first->first); } diff --git a/src/ingestor.hpp b/src/ingestor.hpp index b2e0fed..195f325 100644 --- a/src/ingestor.hpp +++ b/src/ingestor.hpp @@ -8,7 +8,7 @@ #include #include #include -#include +#include #include #include #include @@ -79,7 +79,7 @@ namespace WebStat { DB::ConnectionPoolPtr dbpool; mutable Stats stats {}; - std::flat_set existingEntities; + std::flat_map existingEntities; LineBatch queuedLines; bool terminated = false; @@ -100,11 +100,10 @@ namespace WebStat { Job purgeOldLogs; private: - static constexpr size_t MAX_NEW_ENTITIES = 6; - using NewEntityIds = std::array, MAX_NEW_ENTITIES>; - NewEntityIds storeEntities(DB::Connection *, std::span>) const; - using NewEntities = std::array, MAX_NEW_ENTITIES>; - template NewEntities newEntities(const std::tuple &) const; + template static std::vector entities(std::tuple &); + void fillKnownEntities(std::span) const; + void storeNewEntities(DB::Connection *, std::span) const; + void storeNewEntity(DB::Connection *, Entity &) const; void onNewUserAgent(const Entity &) const; void handleCurlOperations(); void logStats() const; @@ -120,7 +119,7 @@ namespace WebStat { [[gnu::format(printf, 3, 4)]] virtual void log(int level, const char * msgfmt, ...) const = 0; using CurlOperations = std::map>; - uint32_t hostnameId; + EntityId hostnameId; CurlMultiPtr curl; mutable CurlOperations curlOperations; std::thread::id mainThread; diff --git a/src/logTypes.hpp b/src/logTypes.hpp index 6556d5c..fb66867 100644 --- a/src/logTypes.hpp +++ b/src/logTypes.hpp @@ -35,9 +35,9 @@ namespace WebStat { ContentType, }; - using Crc32Value = uint32_t; + using EntityId = int32_t; using EntityHash = std::array; - using Entity = std::tuple; + using Entity = std::tuple, EntityType, std::string_view>; } namespace scn { diff --git a/src/schema.sql b/src/schema.sql index 789789a..fd9eb4f 100644 --- a/src/schema.sql +++ b/src/schema.sql @@ -33,7 +33,7 @@ CREATE TYPE entity AS ENUM( ); CREATE TABLE entities( - id oid NOT NULL, + id int GENERATED ALWAYS AS IDENTITY, value text NOT NULL, type entity NOT NULL, detail jsonb, @@ -42,30 +42,69 @@ CREATE TABLE entities( CREATE UNIQUE INDEX uni_entities_value ON entities(MD5(value)); +CREATE OR REPLACE FUNCTION entity(newValue text, newType entity) + RETURNS int + AS $$ +DECLARE + now timestamp without time zone; + recid int; +BEGIN + IF newValue IS NULL THEN + RETURN NULL; + END IF; + INSERT INTO entities(value, type) + SELECT + newValue, + newType + WHERE + NOT EXISTS ( + SELECT + FROM + entities + WHERE + md5(value) = md5(newValue)) + ON CONFLICT + DO NOTHING + RETURNING + id INTO recid; + IF recid IS NULL THEN + SELECT + id INTO recid + FROM + entities + WHERE + md5(value) = md5(newValue); + END IF; + RETURN recid; +END; +$$ +LANGUAGE plpgSQL +RETURNS NULL ON NULL INPUT; + CREATE TABLE access_log( id bigint GENERATED ALWAYS AS IDENTITY, - hostname oid NOT NULL, - virtual_host oid NOT NULL, + hostname int NOT NULL, + virtual_host int NOT NULL, remoteip inet NOT NULL, request_time timestamp(6) NOT NULL, method http_verb NOT NULL, protocol protocol NOT NULL, - path oid NOT NULL, - query_string oid, + path int NOT NULL, + query_string int, status smallint NOT NULL, size int NOT NULL, duration interval second(6) NOT NULL, - referrer oid, - user_agent oid, - content_type oid, + referrer int, + user_agent int, + content_type int, CONSTRAINT pk_access_log PRIMARY KEY (id), - CONSTRAINT fk_access_log_hostname FOREIGN KEY (hostname) REFERENCES entities(id), - CONSTRAINT fk_access_log_virtualhost FOREIGN KEY (virtual_host) REFERENCES entities(id), - CONSTRAINT fk_access_log_path FOREIGN KEY (path) REFERENCES entities(id), - CONSTRAINT fk_access_log_query_string FOREIGN KEY (query_string) REFERENCES entities(id), - CONSTRAINT fk_access_log_referrer FOREIGN KEY (referrer) REFERENCES entities(id), - CONSTRAINT fk_access_log_user_agent FOREIGN KEY (user_agent) REFERENCES entities(id), - CONSTRAINT fk_access_log_content_type FOREIGN KEY (content_type) REFERENCES entities(id) + CONSTRAINT fk_access_log_hostname FOREIGN KEY (hostname) REFERENCES entities(id) ON UPDATE CASCADE, + CONSTRAINT fk_access_log_virtualhost FOREIGN KEY (virtual_host) REFERENCES entities(id) ON UPDATE CASCADE, + CONSTRAINT fk_access_log_path FOREIGN KEY (path) REFERENCES entities(id) ON UPDATE CASCADE, + CONSTRAINT fk_access_log_query_string FOREIGN KEY (query_string) REFERENCES entities(id) ON UPDATE CASCADE, + CONSTRAINT fk_access_log_referrer FOREIGN KEY (referrer) REFERENCES entities(id) ON UPDATE CASCADE, + CONSTRAINT fk_access_log_user_agent FOREIGN KEY (user_agent) REFERENCES entities(id) ON UPDATE CASCADE, + CONSTRAINT fk_access_log_content_type FOREIGN KEY (content_type) REFERENCES entities(id) ON UPDATE CASCADE ); CREATE OR REPLACE VIEW access_log_view AS diff --git a/src/sql.cpp b/src/sql.cpp index da95f18..801a905 100644 --- a/src/sql.cpp +++ b/src/sql.cpp @@ -1,5 +1,6 @@ #include "sql.hpp" #include +#include namespace WebStat::SQL { // ccache doesn't play nicely with #embed @@ -22,7 +23,8 @@ namespace WebStat::SQL { #embed "sql/hostUpsert.sql" }; #define HASH_OPTS(VAR) \ - const DB::CommandOptionsPtr VAR##_OPTS = std::make_shared(std::hash {}(VAR)) + const DB::CommandOptionsPtr VAR##_OPTS \ + = std::make_shared(std::hash {}(VAR), 35, false) HASH_OPTS(ACCESS_LOG_INSERT); HASH_OPTS(ACCESS_LOG_PURGE_OLD); HASH_OPTS(ENTITY_INSERT); diff --git a/src/sql/entityInsert.sql b/src/sql/entityInsert.sql index 8e25810..f518617 100644 --- a/src/sql/entityInsert.sql +++ b/src/sql/entityInsert.sql @@ -1,4 +1,2 @@ -INSERT INTO entities(id, type, value) - VALUES (?, ?, ?) -ON CONFLICT - DO NOTHING +SELECT + entity(?, ?) diff --git a/src/sql/hostUpsert.sql b/src/sql/hostUpsert.sql index 18e8df8..d5ee11d 100644 --- a/src/sql/hostUpsert.sql +++ b/src/sql/hostUpsert.sql @@ -1,7 +1,9 @@ -INSERT INTO entities(id, type, value, detail) - VALUES ($1, 'host', $2, jsonb_build_object('sysname', $3::text, 'release', $4::text, - 'version', $5::text, 'machine', $6::text, 'domainname', $7::text)) -ON CONFLICT ON CONSTRAINT pk_entities +INSERT INTO entities(type, value, detail) + VALUES ('host', $1, jsonb_build_object('sysname', $2::text, 'release', $3::text, + 'version', $4::text, 'machine', $5::text, 'domainname', $6::text)) +ON CONFLICT (md5(value)) DO UPDATE SET - detail = jsonb_build_object('sysname', $3::text, 'release', $4::text, 'version', - $5::text, 'machine', $6::text, 'domainname', $7::text) + detail = jsonb_build_object('sysname', $2::text, 'release', $3::text, 'version', + $4::text, 'machine', $5::text, 'domainname', $6::text) + RETURNING + id diff --git a/src/uaLookup.cpp b/src/uaLookup.cpp index 8cd1cb2..0fee3f0 100644 --- a/src/uaLookup.cpp +++ b/src/uaLookup.cpp @@ -5,7 +5,7 @@ #include namespace WebStat { - UserAgentLookupOperation::UserAgentLookupOperation(Crc32Value userAgentEntityId) : entityId {userAgentEntityId} { } + UserAgentLookupOperation::UserAgentLookupOperation(EntityId userAgentEntityId) : entityId {userAgentEntityId} { } void UserAgentLookupOperation::whenComplete(DB::Connection * dbconn) const @@ -16,7 +16,7 @@ namespace WebStat { } std::unique_ptr - curlGetUserAgentDetail(Crc32Value entityId, const std::string_view uas, const char * baseUrl) + curlGetUserAgentDetail(EntityId entityId, const std::string_view uas, const char * baseUrl) { auto request = std::make_unique(entityId); diff --git a/src/uaLookup.hpp b/src/uaLookup.hpp index 9714253..4de883f 100644 --- a/src/uaLookup.hpp +++ b/src/uaLookup.hpp @@ -9,13 +9,13 @@ namespace WebStat { class UserAgentLookupOperation : public CurlOperation { public: - UserAgentLookupOperation(Crc32Value entityId); + UserAgentLookupOperation(EntityId entityId); void whenComplete(DB::Connection *) const override; - Crc32Value entityId; + EntityId entityId; }; std::unique_ptr curlGetUserAgentDetail( - Crc32Value entityId, std::string_view uas, const char * baseUrl); + EntityId entityId, std::string_view uas, const char * baseUrl); } diff --git a/src/util.hpp b/src/util.hpp index 28bcebd..f7254e8 100644 --- a/src/util.hpp +++ b/src/util.hpp @@ -16,7 +16,7 @@ namespace WebStat { template auto - visit(auto && visitor, const std::tuple & values) + visit(auto && visitor, std::tuple & values) { std::apply( [&](auto &&... value) { diff --git a/test/test-ingest.cpp b/test/test-ingest.cpp index d399288..88e80e8 100644 --- a/test/test-ingest.cpp +++ b/test/test-ingest.cpp @@ -406,10 +406,10 @@ BOOST_AUTO_TEST_CASE(DiscardUnparsable) BOOST_REQUIRE_NO_THROW(tryIngestQueuedLogLines()); auto dbconn = dbpool->get(); auto select = dbconn->select("SELECT id::bigint, value FROM entities WHERE type = 'unparsable_line'"); - constexpr std::array, 1> EXPECTED {{ - {1664299262, "does not parse"}, + constexpr std::array, 1> EXPECTED {{ + {18, "does not parse"}, }}; - auto rows = select->as(); + auto rows = select->as(); BOOST_CHECK_EQUAL_COLLECTIONS(rows.begin(), rows.end(), EXPECTED.begin(), EXPECTED.end()); BOOST_CHECK_EQUAL(stats.linesParseFailed, 1); BOOST_CHECK_EQUAL(stats.entitiesInserted, 1); -- cgit v1.3 From 0f7854926477eb8d0971de6c1ea88dd21071e028 Mon Sep 17 00:00:00 2001 From: Dan Goodliffe Date: Wed, 15 Apr 2026 12:27:28 +0100 Subject: 4 fields is more than enough for Entity to be a fully-fledged type --- src/ingestor.cpp | 30 +++++++++++++++++------------- src/logTypes.hpp | 8 +++++++- 2 files changed, 24 insertions(+), 14 deletions(-) diff --git a/src/ingestor.cpp b/src/ingestor.cpp index 5454087..04dd378 100644 --- a/src/ingestor.cpp +++ b/src/ingestor.cpp @@ -19,7 +19,7 @@ namespace DB { // NOLINTNEXTLINE(readability-inconsistent-declaration-parameter-name) DB::Command::bindParam(unsigned int idx, const WebStat::Entity & entity) { - bindParamI(idx, std::get<1>(entity)); + bindParamI(idx, entity.id); } } @@ -55,7 +55,12 @@ namespace WebStat { Entity operator()(const std::string_view value) const { - return {makeHash(value), std::nullopt, Type, value}; + return { + .hash = makeHash(value), + .id = std::nullopt, + .type = Type, + .value = value, + }; } template @@ -283,7 +288,7 @@ namespace WebStat { Ingestor::ingestLogLines(DB::Connection * dbconn, const LineBatch & lines) { auto entityIds = std::views::transform([](auto && value) { - return std::make_pair(std::get<0>(*value), *std::get<1>(*value)); + return std::make_pair(value->hash, *value->id); }); DB::TransactionScope batchTx {*dbconn}; @@ -306,7 +311,7 @@ namespace WebStat { storeNewEntity(dbconn, uninsertableLine); log(LOG_NOTICE, "Failed to store parsed line and/or associated entties, but did store raw line, %u:%s", - *std::get<1>(uninsertableLine), line.c_str()); + *uninsertableLine.id, line.c_str()); } catch (const std::exception & excp) { log(LOG_NOTICE, "Failed to store line in any form, DB connection lost? %s", excp.what()); @@ -318,8 +323,7 @@ namespace WebStat { stats.linesParseFailed++; auto unparsableLine = ToEntity {}(line); storeNewEntity(dbconn, unparsableLine); - log(LOG_NOTICE, "Failed to parse line, this is a bug: %u:%s", *std::get<1>(unparsableLine), - line.c_str()); + log(LOG_NOTICE, "Failed to parse line, this is a bug: %u:%s", *unparsableLine.id, line.c_str()); } } } @@ -453,8 +457,8 @@ namespace WebStat { Ingestor::fillKnownEntities(const std::span entities) const { for (const auto entity : entities) { - if (auto existing = existingEntities.find(std::get<0>(*entity)); existing != existingEntities.end()) { - std::get<1>(*entity) = existing->second; + if (auto existing = existingEntities.find(entity->hash); existing != existingEntities.end()) { + entity->id = existing->second; } } } @@ -463,8 +467,9 @@ namespace WebStat { Ingestor::storeNewEntities(DB::Connection * dbconn, const std::span entities) const { for (const auto entity : entities) { - if (!std::get<1>(*entity)) { + if (!entity->id) { storeNewEntity(dbconn, *entity); + assert(entity->id); } } } @@ -485,10 +490,9 @@ namespace WebStat { {"content_type", nullptr}, }}; - auto & [entityHash, entityId, type, value] = entity; - assert(!entityId); - const auto & [typeName, onInsert] = ENTITY_TYPE_VALUES[std::to_underlying(type)]; - entityId = insert(dbconn, SQL::ENTITY_INSERT, SQL::ENTITY_INSERT_OPTS, value, typeName); + assert(!entity.id); + const auto & [typeName, onInsert] = ENTITY_TYPE_VALUES[std::to_underlying(entity.type)]; + entity.id = insert(dbconn, SQL::ENTITY_INSERT, SQL::ENTITY_INSERT_OPTS, entity.value, typeName); if (onInsert && std::this_thread::get_id() == mainThread) { std::invoke(onInsert, this, entity); } diff --git a/src/logTypes.hpp b/src/logTypes.hpp index fb66867..7f0473e 100644 --- a/src/logTypes.hpp +++ b/src/logTypes.hpp @@ -37,7 +37,13 @@ namespace WebStat { using EntityId = int32_t; using EntityHash = std::array; - using Entity = std::tuple, EntityType, std::string_view>; + + struct Entity { + EntityHash hash; + std::optional id; + EntityType type; + std::string_view value; + }; } namespace scn { -- cgit v1.3 From a6d31ff1d8703eae9375b7ec1cd01b323d7e8e6e Mon Sep 17 00:00:00 2001 From: Dan Goodliffe Date: Wed, 15 Apr 2026 13:57:25 +0100 Subject: Save point only if there are new entities Line insert is only a single operation with no new entities. --- src/ingestor.cpp | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/ingestor.cpp b/src/ingestor.cpp index 04dd378..33af8cf 100644 --- a/src/ingestor.cpp +++ b/src/ingestor.cpp @@ -299,9 +299,12 @@ namespace WebStat { auto valuesEntities = entities(values); fillKnownEntities(valuesEntities); try { - DB::TransactionScope dbtx {*dbconn}; - storeNewEntities(dbconn, valuesEntities); - existingEntities.insert_range(valuesEntities | entityIds); + std::optional lineTx; + if (!std::ranges::all_of(valuesEntities, &std::optional::has_value, &Entity::id)) { + lineTx.emplace(*dbconn); + storeNewEntities(dbconn, valuesEntities); + existingEntities.insert_range(valuesEntities | entityIds); + } storeLogLine(dbconn, values); } catch (const DB::Error & originalError) { -- cgit v1.3 From fa6074eaf52be4254c17b74f20193aa96c940df8 Mon Sep 17 00:00:00 2001 From: Dan Goodliffe Date: Sat, 18 Apr 2026 00:53:11 +0100 Subject: Swap int for integer in schema Plays better with apgdiff --- src/schema.sql | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/src/schema.sql b/src/schema.sql index fd9eb4f..b211505 100644 --- a/src/schema.sql +++ b/src/schema.sql @@ -33,7 +33,7 @@ CREATE TYPE entity AS ENUM( ); CREATE TABLE entities( - id int GENERATED ALWAYS AS IDENTITY, + id integer GENERATED ALWAYS AS IDENTITY, value text NOT NULL, type entity NOT NULL, detail jsonb, @@ -43,11 +43,11 @@ CREATE TABLE entities( CREATE UNIQUE INDEX uni_entities_value ON entities(MD5(value)); CREATE OR REPLACE FUNCTION entity(newValue text, newType entity) - RETURNS int + RETURNS integer AS $$ DECLARE now timestamp without time zone; - recid int; + recid integer; BEGIN IF newValue IS NULL THEN RETURN NULL; @@ -83,20 +83,20 @@ RETURNS NULL ON NULL INPUT; CREATE TABLE access_log( id bigint GENERATED ALWAYS AS IDENTITY, - hostname int NOT NULL, - virtual_host int NOT NULL, + hostname integer NOT NULL, + virtual_host integer NOT NULL, remoteip inet NOT NULL, request_time timestamp(6) NOT NULL, method http_verb NOT NULL, protocol protocol NOT NULL, - path int NOT NULL, - query_string int, + path integer NOT NULL, + query_string integer, status smallint NOT NULL, - size int NOT NULL, + size integer NOT NULL, duration interval second(6) NOT NULL, - referrer int, - user_agent int, - content_type int, + referrer integer, + user_agent integer, + content_type integer, CONSTRAINT pk_access_log PRIMARY KEY (id), CONSTRAINT fk_access_log_hostname FOREIGN KEY (hostname) REFERENCES entities(id) ON UPDATE CASCADE, CONSTRAINT fk_access_log_virtualhost FOREIGN KEY (virtual_host) REFERENCES entities(id) ON UPDATE CASCADE, -- cgit v1.3