diff options
| author | Dan Goodliffe <dan.goodliffe@octal.co.uk> | 2026-04-15 12:03:21 +0100 |
|---|---|---|
| committer | Dan Goodliffe <dan.goodliffe@octal.co.uk> | 2026-04-15 12:05:20 +0100 |
| commit | 5b2166496e5f3ff2c4276e0b5b28f109c70673d5 (patch) | |
| tree | 93c351f6ef7d3d60d60247d5dbf531496f14a06c /src/ingestor.cpp | |
| parent | 3ce6cf305572709332d7329674ec45c987a093ad (diff) | |
| download | webstat-5b2166496e5f3ff2c4276e0b5b28f109c70673d5.tar.bz2 webstat-5b2166496e5f3ff2c4276e0b5b28f109c70673d5.tar.xz webstat-5b2166496e5f3ff2c4276e0b5b28f109c70673d5.zip | |
Replace use of crc32 for entity id
The entity value is MD5-hashed, the same as the DB unique key, but the id itself is
now taken from the DB primary key, which is sequence-generated.
Diffstat (limited to 'src/ingestor.cpp')
| -rw-r--r-- | src/ingestor.cpp | 158 |
1 file changed, 84 insertions, 74 deletions
diff --git a/src/ingestor.cpp b/src/ingestor.cpp index 5ae2487..5454087 100644 --- a/src/ingestor.cpp +++ b/src/ingestor.cpp @@ -8,6 +8,7 @@ #include <modifycommand.h> #include <ranges> #include <scn/scan.h> +#include <selectcommand.h> #include <syslog.h> #include <utility> #include <zlib.h> @@ -18,7 +19,7 @@ namespace DB { // NOLINTNEXTLINE(readability-inconsistent-declaration-parameter-name) DB::Command::bindParam(unsigned int idx, const WebStat::Entity & entity) { - bindParamI(idx, std::get<0>(entity)); + bindParamI(idx, std::get<1>(entity)); } } @@ -50,19 +51,11 @@ namespace WebStat { return hash; } - Crc32Value - crc32(const std::string_view value) - { - // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast) - correct for crc32ing raw bytes - return static_cast<Crc32Value>(::crc32(::crc32(0, Z_NULL, 0), reinterpret_cast<const Bytef *>(value.data()), - static_cast<uInt>(value.length()))); - } - template<EntityType Type> struct ToEntity { Entity operator()(const std::string_view value) const { - return {crc32(value), Type, value}; + return {makeHash(value), std::nullopt, Type, value}; } template<typename T> @@ -76,7 +69,7 @@ namespace WebStat { }; auto - crc32ScanValues(const Ingestor::ScanValues & values) + hashScanValues(const Ingestor::ScanValues & values) { static constexpr std::tuple<ToEntity<EntityType::VirtualHost>, std::identity, std::identity, std::identity, ToEntity<EntityType::Path>, ToEntity<EntityType::QueryString>, std::identity, std::identity, @@ -91,6 +84,19 @@ namespace WebStat { }(std::make_index_sequence<VALUE_COUNT>()); } + template<std::integral T = EntityId, typename... Binds> + T + insert(auto && dbconn, const std::string & sql, const DB::CommandOptionsPtr & opts, Binds &&... 
binds) + { + auto ins = dbconn->select(sql, opts); + bindMany(ins, 0, std::forward<Binds>(binds)...); + if (ins->fetch()) { + T out; + (*ins)[0] >> out; + return out; + } + throw DB::NoRowsAffected {}; + } } Ingestor * Ingestor::currentIngestor = nullptr; @@ -106,14 +112,10 @@ namespace WebStat { Ingestor::Ingestor(const utsname & host, DB::ConnectionPoolPtr dbpl, IngestorSettings givenSettings) : settings {std::move(givenSettings)}, dbpool {std::move(dbpl)}, ingestParkedLines {&Ingestor::jobIngestParkedLines}, purgeOldLogs {&Ingestor::jobPurgeOldLogs}, - hostnameId {crc32(host.nodename)}, curl {curl_multi_init()}, mainThread {std::this_thread::get_id()} + hostnameId {insert(dbpool->get(), SQL::HOST_UPSERT, SQL::HOST_UPSERT_OPTS, host.nodename, host.sysname, + host.release, host.version, host.machine, host.domainname)}, + curl {curl_multi_init()}, mainThread {std::this_thread::get_id()} { - auto dbconn = dbpool->get(); - auto ins = dbconn->modify(SQL::HOST_UPSERT, SQL::HOST_UPSERT_OPTS); - bindMany(ins, 0, hostnameId, host.nodename, host.sysname, host.release, host.version, host.machine, - host.domainname); - ins->execute(); - assert(!currentIngestor); currentIngestor = this; signal(SIGTERM, &sigtermHandler); @@ -236,7 +238,7 @@ namespace WebStat { } } tryIngestQueuedLogLines(); - parkQueuedLogLines(); + std::ignore = parkQueuedLogLines(); while (!curlOperations.empty() && curl_multi_poll(curl.get(), nullptr, 0, INT_MAX, nullptr) == CURLM_OK) { handleCurlOperations(); } @@ -256,34 +258,55 @@ namespace WebStat { } } + template<typename... 
T> + std::vector<Entity *> + Ingestor::entities(std::tuple<T...> & values) + { + std::vector<Entity *> entities; + visit( + [&entities]<typename V>(V & value) { + static_assert(!std::is_const_v<V>); + if constexpr (std::is_same_v<V, Entity>) { + entities.emplace_back(&value); + } + else if constexpr (std::is_same_v<V, std::optional<Entity>>) { + if (value) { + entities.emplace_back(&*value); + } + } + }, + values); + return entities; + } + void Ingestor::ingestLogLines(DB::Connection * dbconn, const LineBatch & lines) { - auto nonNullEntityIds = std::views::take_while(&std::optional<Crc32Value>::has_value) - | std::views::transform([](auto && value) { - return *value; - }); + auto entityIds = std::views::transform([](auto && value) { + return std::make_pair(std::get<0>(*value), *std::get<1>(*value)); + }); DB::TransactionScope batchTx {*dbconn}; for (const auto & line : lines) { if (auto result = scanLogLine(line)) { stats.linesParsed++; - const auto values = crc32ScanValues(result->values()); + auto values = hashScanValues(result->values()); + auto valuesEntities = entities(values); + fillKnownEntities(valuesEntities); try { DB::TransactionScope dbtx {*dbconn}; - if (const auto newEnts = newEntities(values); newEnts.front()) { - existingEntities.insert_range(storeEntities(dbconn, newEnts) | nonNullEntityIds); - } + storeNewEntities(dbconn, valuesEntities); + existingEntities.insert_range(valuesEntities | entityIds); storeLogLine(dbconn, values); } catch (const DB::Error & originalError) { try { DB::TransactionScope dbtx {*dbconn}; - const auto uninsertableLine = ToEntity<EntityType::UninsertableLine> {}(line); - storeEntities(dbconn, {uninsertableLine}); + auto uninsertableLine = ToEntity<EntityType::UninsertableLine> {}(line); + storeNewEntity(dbconn, uninsertableLine); log(LOG_NOTICE, "Failed to store parsed line and/or associated entties, but did store raw line, %u:%s", - std::get<Crc32Value>(uninsertableLine), line.c_str()); + *std::get<1>(uninsertableLine), 
line.c_str()); } catch (const std::exception & excp) { log(LOG_NOTICE, "Failed to store line in any form, DB connection lost? %s", excp.what()); @@ -293,10 +316,10 @@ namespace WebStat { } else { stats.linesParseFailed++; - const auto unparsableLine = ToEntity<EntityType::UnparsableLine> {}(line); - log(LOG_NOTICE, "Failed to parse line, this is a bug: %u:%s", std::get<Crc32Value>(unparsableLine), + auto unparsableLine = ToEntity<EntityType::UnparsableLine> {}(line); + storeNewEntity(dbconn, unparsableLine); + log(LOG_NOTICE, "Failed to parse line, this is a bug: %u:%s", *std::get<1>(unparsableLine), line.c_str()); - storeEntities(dbconn, {unparsableLine}); } } } @@ -426,34 +449,28 @@ namespace WebStat { return purgedTotal; } - template<typename... T> - Ingestor::NewEntities - Ingestor::newEntities(const std::tuple<T...> & values) const + void + Ingestor::fillKnownEntities(const std::span<Entity *> entities) const { - Ingestor::NewEntities rtn; - auto next = rtn.begin(); - visit( - [this, &next]<typename X>(const X & entity) { - auto addNewIfReqd = [&next, this](auto && entityToAdd) mutable { - if (!existingEntities.contains(std::get<0>(entityToAdd))) { - *next++ = entityToAdd; - } - return 0; - }; + for (const auto entity : entities) { + if (auto existing = existingEntities.find(std::get<0>(*entity)); existing != existingEntities.end()) { + std::get<1>(*entity) = existing->second; + } + } + } - if constexpr (std::is_same_v<X, Entity>) { - addNewIfReqd(entity); - } - else if constexpr (std::is_same_v<X, std::optional<Entity>>) { - entity.transform(addNewIfReqd); - } - }, - values); - return rtn; + void + Ingestor::storeNewEntities(DB::Connection * dbconn, const std::span<Entity *> entities) const + { + for (const auto entity : entities) { + if (!std::get<1>(*entity)) { + storeNewEntity(dbconn, *entity); + } + } } - Ingestor::NewEntityIds - Ingestor::storeEntities(DB::Connection * dbconn, const std::span<const std::optional<Entity>> values) const + void + 
Ingestor::storeNewEntity(DB::Connection * dbconn, Entity & entity) const { static constexpr std::array<std::pair<std::string_view, void (Ingestor::*)(const Entity &) const>, 9> ENTITY_TYPE_VALUES {{ @@ -468,28 +485,21 @@ namespace WebStat { {"content_type", nullptr}, }}; - auto insert = dbconn->modify(SQL::ENTITY_INSERT, SQL::ENTITY_INSERT_OPTS); - NewEntityIds ids; - std::ranges::transform(values | std::views::take_while(&std::optional<Entity>::has_value), ids.begin(), - [this, &insert](auto && entity) { - const auto & [entityId, type, value] = *entity; - const auto & [typeName, onInsert] = ENTITY_TYPE_VALUES[std::to_underlying(type)]; - bindMany(insert, 0, entityId, typeName, value); - const auto insertedEntities = insert->execute(); - if (insertedEntities > 0 && onInsert && std::this_thread::get_id() == mainThread) { - std::invoke(onInsert, this, *entity); - } - stats.entitiesInserted += insertedEntities; - return std::get<0>(*entity); - }); - return ids; + auto & [entityHash, entityId, type, value] = entity; + assert(!entityId); + const auto & [typeName, onInsert] = ENTITY_TYPE_VALUES[std::to_underlying(type)]; + entityId = insert(dbconn, SQL::ENTITY_INSERT, SQL::ENTITY_INSERT_OPTS, value, typeName); + if (onInsert && std::this_thread::get_id() == mainThread) { + std::invoke(onInsert, this, entity); + } + stats.entitiesInserted += 1; } void Ingestor::onNewUserAgent(const Entity & entity) const { - const auto & [entityId, type, value] = entity; - auto curlOp = curlGetUserAgentDetail(entityId, value, settings.userAgentAPI.c_str()); + const auto & [entityHash, entityId, type, value] = entity; + auto curlOp = curlGetUserAgentDetail(*entityId, value, settings.userAgentAPI.c_str()); auto added = curlOperations.emplace(curlOp->hnd.get(), std::move(curlOp)); curl_multi_add_handle(curl.get(), added.first->first); } |
