diff options
Diffstat (limited to 'src/ingestor.cpp')
| -rw-r--r-- | src/ingestor.cpp | 204 |
1 files changed, 125 insertions, 79 deletions
diff --git a/src/ingestor.cpp b/src/ingestor.cpp index c5cb8d8..33af8cf 100644 --- a/src/ingestor.cpp +++ b/src/ingestor.cpp @@ -8,6 +8,7 @@ #include <modifycommand.h> #include <ranges> #include <scn/scan.h> +#include <selectcommand.h> #include <syslog.h> #include <utility> #include <zlib.h> @@ -18,25 +19,48 @@ namespace DB { // NOLINTNEXTLINE(readability-inconsistent-declaration-parameter-name) DB::Command::bindParam(unsigned int idx, const WebStat::Entity & entity) { - bindParamI(idx, std::get<0>(entity)); + bindParamI(idx, entity.id); } } namespace WebStat { namespace { - Crc32Value - crc32(const std::string_view value) + using ByteArrayView = std::span<const uint8_t>; + + auto + bytesToHexRange(const ByteArrayView bytes) + { + constexpr auto HEXN = 16ZU; + return bytes | std::views::transform([](auto byte) { + return std::array {byte / HEXN, byte % HEXN}; + }) | std::views::join + | std::views::transform([](auto nibble) { + return "0123456789abcdef"[nibble]; + }); + } + + EntityHash + makeHash(const std::string_view value) { - // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast) - correct for crc32ing raw bytes - return static_cast<Crc32Value>(::crc32(::crc32(0, Z_NULL, 0), reinterpret_cast<const Bytef *>(value.data()), - static_cast<uInt>(value.length()))); + MD5_CTX ctx {}; + MD5Init(&ctx); + // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast) - correct for md5ing raw bytes + MD5Update(&ctx, reinterpret_cast<const uint8_t *>(value.data()), value.length()); + EntityHash hash {}; + MD5Final(hash.data(), &ctx); + return hash; } template<EntityType Type> struct ToEntity { Entity operator()(const std::string_view value) const { - return {crc32(value), Type, value}; + return { + .hash = makeHash(value), + .id = std::nullopt, + .type = Type, + .value = value, + }; } template<typename T> @@ -50,7 +74,7 @@ namespace WebStat { }; auto - crc32ScanValues(const Ingestor::ScanValues & values) + hashScanValues(const Ingestor::ScanValues & values) { static constexpr std::tuple<ToEntity<EntityType::VirtualHost>, std::identity, std::identity, std::identity, ToEntity<EntityType::Path>, ToEntity<EntityType::QueryString>, std::identity, std::identity, @@ -65,6 +89,19 @@ namespace WebStat { }(std::make_index_sequence<VALUE_COUNT>()); } + template<std::integral T = EntityId, typename... Binds> + T + insert(auto && dbconn, const std::string & sql, const DB::CommandOptionsPtr & opts, Binds &&... binds) + { + auto ins = dbconn->select(sql, opts); + bindMany(ins, 0, std::forward<Binds>(binds)...); + if (ins->fetch()) { + T out; + (*ins)[0] >> out; + return out; + } + throw DB::NoRowsAffected {}; + } } Ingestor * Ingestor::currentIngestor = nullptr; @@ -80,14 +117,10 @@ namespace WebStat { Ingestor::Ingestor(const utsname & host, DB::ConnectionPoolPtr dbpl, IngestorSettings givenSettings) : settings {std::move(givenSettings)}, dbpool {std::move(dbpl)}, ingestParkedLines {&Ingestor::jobIngestParkedLines}, purgeOldLogs {&Ingestor::jobPurgeOldLogs}, - hostnameId {crc32(host.nodename)}, curl {curl_multi_init()}, mainThread {std::this_thread::get_id()} + hostnameId {insert(dbpool->get(), SQL::HOST_UPSERT, SQL::HOST_UPSERT_OPTS, host.nodename, host.sysname, + host.release, host.version, host.machine, host.domainname)}, + curl {curl_multi_init()}, mainThread {std::this_thread::get_id()} { - auto dbconn = dbpool->get(); - auto ins = dbconn->modify(SQL::HOST_UPSERT, SQL::HOST_UPSERT_OPTS); - bindMany(ins, 0, hostnameId, host.nodename, host.sysname, host.release, host.version, host.machine, - host.domainname); - ins->execute(); - assert(!currentIngestor); currentIngestor = this; signal(SIGTERM, &sigtermHandler); @@ -210,7 +243,7 @@ namespace WebStat { } } tryIngestQueuedLogLines(); - parkQueuedLogLines(); + std::ignore = parkQueuedLogLines(); while (!curlOperations.empty() && curl_multi_poll(curl.get(), nullptr, 0, INT_MAX, nullptr) == CURLM_OK) { handleCurlOperations(); } @@ -230,34 +263,58 @@ namespace WebStat { } } + template<typename... T> + std::vector<Entity *> + Ingestor::entities(std::tuple<T...> & values) + { + std::vector<Entity *> entities; + visit( + [&entities]<typename V>(V & value) { + static_assert(!std::is_const_v<V>); + if constexpr (std::is_same_v<V, Entity>) { + entities.emplace_back(&value); + } + else if constexpr (std::is_same_v<V, std::optional<Entity>>) { + if (value) { + entities.emplace_back(&*value); + } + } + }, + values); + return entities; + } + void Ingestor::ingestLogLines(DB::Connection * dbconn, const LineBatch & lines) { - auto nonNullEntityIds = std::views::take_while(&std::optional<Crc32Value>::has_value) - | std::views::transform([](auto && value) { - return *value; - }); + auto entityIds = std::views::transform([](auto && value) { + return std::make_pair(value->hash, *value->id); + }); DB::TransactionScope batchTx {*dbconn}; for (const auto & line : lines) { if (auto result = scanLogLine(line)) { stats.linesParsed++; - const auto values = crc32ScanValues(result->values()); + auto values = hashScanValues(result->values()); + auto valuesEntities = entities(values); + fillKnownEntities(valuesEntities); try { - DB::TransactionScope dbtx {*dbconn}; - if (const auto newEnts = newEntities(values); newEnts.front()) { - existingEntities.insert_range(storeEntities(dbconn, newEnts) | nonNullEntityIds); + std::optional<DB::TransactionScope> lineTx; + if (!std::ranges::all_of(valuesEntities, &std::optional<EntityId>::has_value, &Entity::id)) { + lineTx.emplace(*dbconn); + storeNewEntities(dbconn, valuesEntities); + existingEntities.insert_range(valuesEntities | entityIds); } storeLogLine(dbconn, values); } catch (const DB::Error & originalError) { try { DB::TransactionScope dbtx {*dbconn}; - const auto uninsertableLine = ToEntity<EntityType::UninsertableLine> {}(line); - storeEntities(dbconn, {uninsertableLine}); + auto uninsertableLine = ToEntity<EntityType::UninsertableLine> {}(line); + storeNewEntity(dbconn, uninsertableLine); log(LOG_NOTICE, "Failed to store parsed line and/or associated entties, but did store raw line, %u:%s", - std::get<Crc32Value>(uninsertableLine), line.c_str()); + *uninsertableLine.id, line.c_str()); } catch (const std::exception & excp) { log(LOG_NOTICE, "Failed to store line in any form, DB connection lost? %s", excp.what()); @@ -267,22 +324,21 @@ namespace WebStat { } else { stats.linesParseFailed++; - const auto unparsableLine = ToEntity<EntityType::UnparsableLine> {}(line); - log(LOG_NOTICE, "Failed to parse line, this is a bug: %u:%s", std::get<Crc32Value>(unparsableLine), - line.c_str()); - storeEntities(dbconn, {unparsableLine}); + auto unparsableLine = ToEntity<EntityType::UnparsableLine> {}(line); + storeNewEntity(dbconn, unparsableLine); + log(LOG_NOTICE, "Failed to parse line, this is a bug: %u:%s", *unparsableLine.id, line.c_str()); } } } - void + std::expected<std::filesystem::path, int> Ingestor::parkQueuedLogLines() { if (queuedLines.empty()) { - return; + return std::unexpected(0); } - const std::filesystem::path path { - settings.fallbackDir / std::format("parked-{}.short", crc32(queuedLines.front()))}; + const std::filesystem::path path {settings.fallbackDir + / std::format("parked-{:s}.short", bytesToHexRange(makeHash(queuedLines.front())))}; if (auto parked = FilePtr(fopen(path.c_str(), "w"))) { fprintf(parked.get(), "%zu\n", queuedLines.size()); for (const auto & line : queuedLines) { @@ -290,16 +346,19 @@ namespace WebStat { } if (fflush(parked.get()) == 0) { queuedLines.clear(); - auto finalPath = auto {path}.replace_extension(".log"); + auto finalPath = std::filesystem::path {path}.replace_extension(".log"); + parked.reset(); if (rename(path.c_str(), finalPath.c_str()) == 0) { - return; + return finalPath; } } } + const int err = errno; log(LOG_ERR, "Failed to park %zu queued lines:", queuedLines.size()); for (const auto & line : queuedLines) { log(LOG_ERR, "\t%.*s", static_cast<int>(line.length()), line.data()); } + return std::unexpected(err); } void @@ -334,7 +393,7 @@ namespace WebStat { unsigned int count = 0; for (auto pathIter = std::filesystem::directory_iterator {settings.fallbackDir}; pathIter != std::filesystem::directory_iterator {}; ++pathIter) { - if (scn::scan<Crc32Value>(pathIter->path().filename().string(), "parked-{}.log")) { + if (scn::scan<std::string>(pathIter->path().filename().string(), "parked-{:[a-zA-Z0-9]}.log")) { jobIngestParkedLines(pathIter->path()); count += 1; } @@ -397,34 +456,29 @@ namespace WebStat { return purgedTotal; } - template<typename... T> - Ingestor::NewEntities - Ingestor::newEntities(const std::tuple<T...> & values) const + void + Ingestor::fillKnownEntities(const std::span<Entity *> entities) const { - Ingestor::NewEntities rtn; - auto next = rtn.begin(); - visit( - [this, &next]<typename X>(const X & entity) { - auto addNewIfReqd = [&next, this](auto && entityToAdd) mutable { - if (!existingEntities.contains(std::get<0>(entityToAdd))) { - *next++ = entityToAdd; - } - return 0; - }; + for (const auto entity : entities) { + if (auto existing = existingEntities.find(entity->hash); existing != existingEntities.end()) { + entity->id = existing->second; + } + } + } - if constexpr (std::is_same_v<X, Entity>) { - addNewIfReqd(entity); - } - else if constexpr (std::is_same_v<X, std::optional<Entity>>) { - entity.transform(addNewIfReqd); - } - }, - values); - return rtn; + void + Ingestor::storeNewEntities(DB::Connection * dbconn, const std::span<Entity *> entities) const + { + for (const auto entity : entities) { + if (!entity->id) { + storeNewEntity(dbconn, *entity); + assert(entity->id); + } + } } - Ingestor::NewEntityIds - Ingestor::storeEntities(DB::Connection * dbconn, const std::span<const std::optional<Entity>> values) const + void + Ingestor::storeNewEntity(DB::Connection * dbconn, Entity & entity) const { static constexpr std::array<std::pair<std::string_view, void (Ingestor::*)(const Entity &) const>, 9> ENTITY_TYPE_VALUES {{ @@ -439,28 +493,20 @@ namespace WebStat { {"content_type", nullptr}, }}; - auto insert = dbconn->modify(SQL::ENTITY_INSERT, SQL::ENTITY_INSERT_OPTS); - NewEntityIds ids; - std::ranges::transform(values | std::views::take_while(&std::optional<Entity>::has_value), ids.begin(), - [this, &insert](auto && entity) { - const auto & [entityId, type, value] = *entity; - const auto & [typeName, onInsert] = ENTITY_TYPE_VALUES[std::to_underlying(type)]; - bindMany(insert, 0, entityId, typeName, value); - const auto insertedEntities = insert->execute(); - if (insertedEntities > 0 && onInsert && std::this_thread::get_id() == mainThread) { - std::invoke(onInsert, this, *entity); - } - stats.entitiesInserted += insertedEntities; - return std::get<0>(*entity); - }); - return ids; + assert(!entity.id); + const auto & [typeName, onInsert] = ENTITY_TYPE_VALUES[std::to_underlying(entity.type)]; + entity.id = insert(dbconn, SQL::ENTITY_INSERT, SQL::ENTITY_INSERT_OPTS, entity.value, typeName); + if (onInsert && std::this_thread::get_id() == mainThread) { + std::invoke(onInsert, this, entity); + } + stats.entitiesInserted += 1; } void Ingestor::onNewUserAgent(const Entity & entity) const { - const auto & [entityId, type, value] = entity; - auto curlOp = curlGetUserAgentDetail(entityId, value, settings.userAgentAPI.c_str()); + const auto & [entityHash, entityId, type, value] = entity; + auto curlOp = curlGetUserAgentDetail(*entityId, value, settings.userAgentAPI.c_str()); auto added = curlOperations.emplace(curlOp->hnd.get(), std::move(curlOp)); curl_multi_add_handle(curl.get(), added.first->first); } |
