From 5b2166496e5f3ff2c4276e0b5b28f109c70673d5 Mon Sep 17 00:00:00 2001 From: Dan Goodliffe Date: Wed, 15 Apr 2026 12:03:21 +0100 Subject: Replace use of crc32 for entity id Entity value is MD5 hashed same as DB unique key, but the id itself is now taken from the DB primary key which is sequence generated. --- src/ingestor.cpp | 158 +++++++++++++++++++++++++++++-------------------------- 1 file changed, 84 insertions(+), 74 deletions(-) (limited to 'src/ingestor.cpp') diff --git a/src/ingestor.cpp b/src/ingestor.cpp index 5ae2487..5454087 100644 --- a/src/ingestor.cpp +++ b/src/ingestor.cpp @@ -8,6 +8,7 @@ #include #include #include +#include #include #include #include @@ -18,7 +19,7 @@ namespace DB { // NOLINTNEXTLINE(readability-inconsistent-declaration-parameter-name) DB::Command::bindParam(unsigned int idx, const WebStat::Entity & entity) { - bindParamI(idx, std::get<0>(entity)); + bindParamI(idx, std::get<1>(entity)); } } @@ -50,19 +51,11 @@ namespace WebStat { return hash; } - Crc32Value - crc32(const std::string_view value) - { - // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast) - correct for crc32ing raw bytes - return static_cast(::crc32(::crc32(0, Z_NULL, 0), reinterpret_cast(value.data()), - static_cast(value.length()))); - } - template struct ToEntity { Entity operator()(const std::string_view value) const { - return {crc32(value), Type, value}; + return {makeHash(value), std::nullopt, Type, value}; } template @@ -76,7 +69,7 @@ namespace WebStat { }; auto - crc32ScanValues(const Ingestor::ScanValues & values) + hashScanValues(const Ingestor::ScanValues & values) { static constexpr std::tuple, std::identity, std::identity, std::identity, ToEntity, ToEntity, std::identity, std::identity, @@ -91,6 +84,19 @@ namespace WebStat { }(std::make_index_sequence()); } + template + T + insert(auto && dbconn, const std::string & sql, const DB::CommandOptionsPtr & opts, Binds &&... binds) + { + auto ins = dbconn->select(sql, opts); + bindMany(ins, 0, std::forward(binds)...); + if (ins->fetch()) { + T out; + (*ins)[0] >> out; + return out; + } + throw DB::NoRowsAffected {}; + } } Ingestor * Ingestor::currentIngestor = nullptr; @@ -106,14 +112,10 @@ namespace WebStat { Ingestor::Ingestor(const utsname & host, DB::ConnectionPoolPtr dbpl, IngestorSettings givenSettings) : settings {std::move(givenSettings)}, dbpool {std::move(dbpl)}, ingestParkedLines {&Ingestor::jobIngestParkedLines}, purgeOldLogs {&Ingestor::jobPurgeOldLogs}, - hostnameId {crc32(host.nodename)}, curl {curl_multi_init()}, mainThread {std::this_thread::get_id()} + hostnameId {insert(dbpool->get(), SQL::HOST_UPSERT, SQL::HOST_UPSERT_OPTS, host.nodename, host.sysname, + host.release, host.version, host.machine, host.domainname)}, + curl {curl_multi_init()}, mainThread {std::this_thread::get_id()} { - auto dbconn = dbpool->get(); - auto ins = dbconn->modify(SQL::HOST_UPSERT, SQL::HOST_UPSERT_OPTS); - bindMany(ins, 0, hostnameId, host.nodename, host.sysname, host.release, host.version, host.machine, - host.domainname); - ins->execute(); - assert(!currentIngestor); currentIngestor = this; signal(SIGTERM, &sigtermHandler); @@ -236,7 +238,7 @@ namespace WebStat { } } tryIngestQueuedLogLines(); - parkQueuedLogLines(); + std::ignore = parkQueuedLogLines(); while (!curlOperations.empty() && curl_multi_poll(curl.get(), nullptr, 0, INT_MAX, nullptr) == CURLM_OK) { handleCurlOperations(); } @@ -256,34 +258,55 @@ namespace WebStat { } } + template + std::vector + Ingestor::entities(std::tuple & values) + { + std::vector entities; + visit( + [&entities](V & value) { + static_assert(!std::is_const_v); + if constexpr (std::is_same_v) { + entities.emplace_back(&value); + } + else if constexpr (std::is_same_v>) { + if (value) { + entities.emplace_back(&*value); + } + } + }, + values); + return entities; + } + void Ingestor::ingestLogLines(DB::Connection * dbconn, const LineBatch & lines) { - auto nonNullEntityIds = std::views::take_while(&std::optional::has_value) - | std::views::transform([](auto && value) { - return *value; - }); + auto entityIds = std::views::transform([](auto && value) { + return std::make_pair(std::get<0>(*value), *std::get<1>(*value)); + }); DB::TransactionScope batchTx {*dbconn}; for (const auto & line : lines) { if (auto result = scanLogLine(line)) { stats.linesParsed++; - const auto values = crc32ScanValues(result->values()); + auto values = hashScanValues(result->values()); + auto valuesEntities = entities(values); + fillKnownEntities(valuesEntities); try { DB::TransactionScope dbtx {*dbconn}; - if (const auto newEnts = newEntities(values); newEnts.front()) { - existingEntities.insert_range(storeEntities(dbconn, newEnts) | nonNullEntityIds); - } + storeNewEntities(dbconn, valuesEntities); + existingEntities.insert_range(valuesEntities | entityIds); storeLogLine(dbconn, values); } catch (const DB::Error & originalError) { try { DB::TransactionScope dbtx {*dbconn}; - const auto uninsertableLine = ToEntity {}(line); - storeEntities(dbconn, {uninsertableLine}); + auto uninsertableLine = ToEntity {}(line); + storeNewEntity(dbconn, uninsertableLine); log(LOG_NOTICE, "Failed to store parsed line and/or associated entties, but did store raw line, %u:%s", - std::get(uninsertableLine), line.c_str()); + *std::get<1>(uninsertableLine), line.c_str()); } catch (const std::exception & excp) { log(LOG_NOTICE, "Failed to store line in any form, DB connection lost? %s", excp.what()); @@ -293,10 +316,10 @@ namespace WebStat { } else { stats.linesParseFailed++; - const auto unparsableLine = ToEntity {}(line); - log(LOG_NOTICE, "Failed to parse line, this is a bug: %u:%s", std::get(unparsableLine), + auto unparsableLine = ToEntity {}(line); + storeNewEntity(dbconn, unparsableLine); + log(LOG_NOTICE, "Failed to parse line, this is a bug: %u:%s", *std::get<1>(unparsableLine), line.c_str()); - storeEntities(dbconn, {unparsableLine}); } } } @@ -426,34 +449,28 @@ namespace WebStat { return purgedTotal; } - template - Ingestor::NewEntities - Ingestor::newEntities(const std::tuple & values) const + void + Ingestor::fillKnownEntities(const std::span entities) const { - Ingestor::NewEntities rtn; - auto next = rtn.begin(); - visit( - [this, &next](const X & entity) { - auto addNewIfReqd = [&next, this](auto && entityToAdd) mutable { - if (!existingEntities.contains(std::get<0>(entityToAdd))) { - *next++ = entityToAdd; - } - return 0; - }; + for (const auto entity : entities) { + if (auto existing = existingEntities.find(std::get<0>(*entity)); existing != existingEntities.end()) { + std::get<1>(*entity) = existing->second; + } + } + } - if constexpr (std::is_same_v) { - addNewIfReqd(entity); - } - else if constexpr (std::is_same_v>) { - entity.transform(addNewIfReqd); - } - }, - values); - return rtn; + void + Ingestor::storeNewEntities(DB::Connection * dbconn, const std::span entities) const + { + for (const auto entity : entities) { + if (!std::get<1>(*entity)) { + storeNewEntity(dbconn, *entity); + } + } } - Ingestor::NewEntityIds - Ingestor::storeEntities(DB::Connection * dbconn, const std::span> values) const + void + Ingestor::storeNewEntity(DB::Connection * dbconn, Entity & entity) const { static constexpr std::array, 9> ENTITY_TYPE_VALUES {{ @@ -468,28 +485,21 @@ namespace WebStat { {"content_type", nullptr}, }}; - auto insert = dbconn->modify(SQL::ENTITY_INSERT, SQL::ENTITY_INSERT_OPTS); - NewEntityIds ids; - std::ranges::transform(values | std::views::take_while(&std::optional::has_value), ids.begin(), - [this, &insert](auto && entity) { - const auto & [entityId, type, value] = *entity; - const auto & [typeName, onInsert] = ENTITY_TYPE_VALUES[std::to_underlying(type)]; - bindMany(insert, 0, entityId, typeName, value); - const auto insertedEntities = insert->execute(); - if (insertedEntities > 0 && onInsert && std::this_thread::get_id() == mainThread) { - std::invoke(onInsert, this, *entity); - } - stats.entitiesInserted += insertedEntities; - return std::get<0>(*entity); - }); - return ids; + auto & [entityHash, entityId, type, value] = entity; + assert(!entityId); + const auto & [typeName, onInsert] = ENTITY_TYPE_VALUES[std::to_underlying(type)]; + entityId = insert(dbconn, SQL::ENTITY_INSERT, SQL::ENTITY_INSERT_OPTS, value, typeName); + if (onInsert && std::this_thread::get_id() == mainThread) { + std::invoke(onInsert, this, entity); + } + stats.entitiesInserted += 1; } void Ingestor::onNewUserAgent(const Entity & entity) const { - const auto & [entityId, type, value] = entity; - auto curlOp = curlGetUserAgentDetail(entityId, value, settings.userAgentAPI.c_str()); + const auto & [entityHash, entityId, type, value] = entity; + auto curlOp = curlGetUserAgentDetail(*entityId, value, settings.userAgentAPI.c_str()); auto added = curlOperations.emplace(curlOp->hnd.get(), std::move(curlOp)); curl_multi_add_handle(curl.get(), added.first->first); } -- cgit v1.3