diff options
Diffstat (limited to 'src/ingestor.cpp')
| -rw-r--r-- | src/ingestor.cpp | 158 |
1 files changed, 84 insertions, 74 deletions
diff --git a/src/ingestor.cpp b/src/ingestor.cpp index 5ae2487..5454087 100644 --- a/src/ingestor.cpp +++ b/src/ingestor.cpp @@ -8,6 +8,7 @@ #include <modifycommand.h> #include <ranges> #include <scn/scan.h> +#include <selectcommand.h> #include <syslog.h> #include <utility> #include <zlib.h> @@ -18,7 +19,7 @@ namespace DB { // NOLINTNEXTLINE(readability-inconsistent-declaration-parameter-name) DB::Command::bindParam(unsigned int idx, const WebStat::Entity & entity) { - bindParamI(idx, std::get<0>(entity)); + bindParamI(idx, std::get<1>(entity)); } } @@ -50,19 +51,11 @@ namespace WebStat { return hash; } - Crc32Value - crc32(const std::string_view value) - { - // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast) - correct for crc32ing raw bytes - return static_cast<Crc32Value>(::crc32(::crc32(0, Z_NULL, 0), reinterpret_cast<const Bytef *>(value.data()), - static_cast<uInt>(value.length()))); - } - template<EntityType Type> struct ToEntity { Entity operator()(const std::string_view value) const { - return {crc32(value), Type, value}; + return {makeHash(value), std::nullopt, Type, value}; } template<typename T> @@ -76,7 +69,7 @@ namespace WebStat { }; auto - crc32ScanValues(const Ingestor::ScanValues & values) + hashScanValues(const Ingestor::ScanValues & values) { static constexpr std::tuple<ToEntity<EntityType::VirtualHost>, std::identity, std::identity, std::identity, ToEntity<EntityType::Path>, ToEntity<EntityType::QueryString>, std::identity, std::identity, @@ -91,6 +84,19 @@ namespace WebStat { }(std::make_index_sequence<VALUE_COUNT>()); } + template<std::integral T = EntityId, typename... Binds> + T + insert(auto && dbconn, const std::string & sql, const DB::CommandOptionsPtr & opts, Binds &&... binds) + { + auto ins = dbconn->select(sql, opts); + bindMany(ins, 0, std::forward<Binds>(binds)...); + if (ins->fetch()) { + T out; + (*ins)[0] >> out; + return out; + } + throw DB::NoRowsAffected {}; + } } Ingestor * Ingestor::currentIngestor = nullptr; @@ -106,14 +112,10 @@ namespace WebStat { Ingestor::Ingestor(const utsname & host, DB::ConnectionPoolPtr dbpl, IngestorSettings givenSettings) : settings {std::move(givenSettings)}, dbpool {std::move(dbpl)}, ingestParkedLines {&Ingestor::jobIngestParkedLines}, purgeOldLogs {&Ingestor::jobPurgeOldLogs}, - hostnameId {crc32(host.nodename)}, curl {curl_multi_init()}, mainThread {std::this_thread::get_id()} + hostnameId {insert(dbpool->get(), SQL::HOST_UPSERT, SQL::HOST_UPSERT_OPTS, host.nodename, host.sysname, + host.release, host.version, host.machine, host.domainname)}, + curl {curl_multi_init()}, mainThread {std::this_thread::get_id()} { - auto dbconn = dbpool->get(); - auto ins = dbconn->modify(SQL::HOST_UPSERT, SQL::HOST_UPSERT_OPTS); - bindMany(ins, 0, hostnameId, host.nodename, host.sysname, host.release, host.version, host.machine, - host.domainname); - ins->execute(); - assert(!currentIngestor); currentIngestor = this; signal(SIGTERM, &sigtermHandler); @@ -236,7 +238,7 @@ namespace WebStat { } } tryIngestQueuedLogLines(); - parkQueuedLogLines(); + std::ignore = parkQueuedLogLines(); while (!curlOperations.empty() && curl_multi_poll(curl.get(), nullptr, 0, INT_MAX, nullptr) == CURLM_OK) { handleCurlOperations(); } @@ -256,34 +258,55 @@ namespace WebStat { } } + template<typename... T> + std::vector<Entity *> + Ingestor::entities(std::tuple<T...> & values) + { + std::vector<Entity *> entities; + visit( + [&entities]<typename V>(V & value) { + static_assert(!std::is_const_v<V>); + if constexpr (std::is_same_v<V, Entity>) { + entities.emplace_back(&value); + } + else if constexpr (std::is_same_v<V, std::optional<Entity>>) { + if (value) { + entities.emplace_back(&*value); + } + } + }, + values); + return entities; + } + void Ingestor::ingestLogLines(DB::Connection * dbconn, const LineBatch & lines) { - auto nonNullEntityIds = std::views::take_while(&std::optional<Crc32Value>::has_value) - | std::views::transform([](auto && value) { - return *value; - }); + auto entityIds = std::views::transform([](auto && value) { + return std::make_pair(std::get<0>(*value), *std::get<1>(*value)); + }); DB::TransactionScope batchTx {*dbconn}; for (const auto & line : lines) { if (auto result = scanLogLine(line)) { stats.linesParsed++; - const auto values = crc32ScanValues(result->values()); + auto values = hashScanValues(result->values()); + auto valuesEntities = entities(values); + fillKnownEntities(valuesEntities); try { DB::TransactionScope dbtx {*dbconn}; - if (const auto newEnts = newEntities(values); newEnts.front()) { - existingEntities.insert_range(storeEntities(dbconn, newEnts) | nonNullEntityIds); - } + storeNewEntities(dbconn, valuesEntities); + existingEntities.insert_range(valuesEntities | entityIds); storeLogLine(dbconn, values); } catch (const DB::Error & originalError) { try { DB::TransactionScope dbtx {*dbconn}; - const auto uninsertableLine = ToEntity<EntityType::UninsertableLine> {}(line); - storeEntities(dbconn, {uninsertableLine}); + auto uninsertableLine = ToEntity<EntityType::UninsertableLine> {}(line); + storeNewEntity(dbconn, uninsertableLine); log(LOG_NOTICE, "Failed to store parsed line and/or associated entties, but did store raw line, %u:%s", - std::get<Crc32Value>(uninsertableLine), line.c_str()); + *std::get<1>(uninsertableLine), line.c_str()); } catch (const std::exception & excp) { log(LOG_NOTICE, "Failed to store line in any form, DB connection lost? %s", excp.what()); @@ -293,10 +316,10 @@ namespace WebStat { } else { stats.linesParseFailed++; - const auto unparsableLine = ToEntity<EntityType::UnparsableLine> {}(line); - log(LOG_NOTICE, "Failed to parse line, this is a bug: %u:%s", std::get<Crc32Value>(unparsableLine), + auto unparsableLine = ToEntity<EntityType::UnparsableLine> {}(line); + storeNewEntity(dbconn, unparsableLine); + log(LOG_NOTICE, "Failed to parse line, this is a bug: %u:%s", *std::get<1>(unparsableLine), line.c_str()); - storeEntities(dbconn, {unparsableLine}); } } } @@ -426,34 +449,28 @@ namespace WebStat { return purgedTotal; } - template<typename... T> - Ingestor::NewEntities - Ingestor::newEntities(const std::tuple<T...> & values) const + void + Ingestor::fillKnownEntities(const std::span<Entity *> entities) const { - Ingestor::NewEntities rtn; - auto next = rtn.begin(); - visit( - [this, &next]<typename X>(const X & entity) { - auto addNewIfReqd = [&next, this](auto && entityToAdd) mutable { - if (!existingEntities.contains(std::get<0>(entityToAdd))) { - *next++ = entityToAdd; - } - return 0; - }; + for (const auto entity : entities) { + if (auto existing = existingEntities.find(std::get<0>(*entity)); existing != existingEntities.end()) { + std::get<1>(*entity) = existing->second; + } + } + } - if constexpr (std::is_same_v<X, Entity>) { - addNewIfReqd(entity); - } - else if constexpr (std::is_same_v<X, std::optional<Entity>>) { - entity.transform(addNewIfReqd); - } - }, - values); - return rtn; + void + Ingestor::storeNewEntities(DB::Connection * dbconn, const std::span<Entity *> entities) const + { + for (const auto entity : entities) { + if (!std::get<1>(*entity)) { + storeNewEntity(dbconn, *entity); + } + } } - Ingestor::NewEntityIds - Ingestor::storeEntities(DB::Connection * dbconn, const std::span<const std::optional<Entity>> values) const + void + Ingestor::storeNewEntity(DB::Connection * dbconn, Entity & entity) const { static constexpr std::array<std::pair<std::string_view, void (Ingestor::*)(const Entity &) const>, 9> ENTITY_TYPE_VALUES {{ @@ -468,28 +485,21 @@ namespace WebStat { {"content_type", nullptr}, }}; - auto insert = dbconn->modify(SQL::ENTITY_INSERT, SQL::ENTITY_INSERT_OPTS); - NewEntityIds ids; - std::ranges::transform(values | std::views::take_while(&std::optional<Entity>::has_value), ids.begin(), - [this, &insert](auto && entity) { - const auto & [entityId, type, value] = *entity; - const auto & [typeName, onInsert] = ENTITY_TYPE_VALUES[std::to_underlying(type)]; - bindMany(insert, 0, entityId, typeName, value); - const auto insertedEntities = insert->execute(); - if (insertedEntities > 0 && onInsert && std::this_thread::get_id() == mainThread) { - std::invoke(onInsert, this, *entity); - } - stats.entitiesInserted += insertedEntities; - return std::get<0>(*entity); - }); - return ids; + auto & [entityHash, entityId, type, value] = entity; + assert(!entityId); + const auto & [typeName, onInsert] = ENTITY_TYPE_VALUES[std::to_underlying(type)]; + entityId = insert(dbconn, SQL::ENTITY_INSERT, SQL::ENTITY_INSERT_OPTS, value, typeName); + if (onInsert && std::this_thread::get_id() == mainThread) { + std::invoke(onInsert, this, entity); + } + stats.entitiesInserted += 1; } void Ingestor::onNewUserAgent(const Entity & entity) const { - const auto & [entityId, type, value] = entity; - auto curlOp = curlGetUserAgentDetail(entityId, value, settings.userAgentAPI.c_str()); + const auto & [entityHash, entityId, type, value] = entity; + auto curlOp = curlGetUserAgentDetail(*entityId, value, settings.userAgentAPI.c_str()); auto added = curlOperations.emplace(curlOp->hnd.get(), std::move(curlOp)); curl_multi_add_handle(curl.get(), added.first->first); } |
