diff options
| author | Dan Goodliffe <dan@randomdan.homeip.net> | 2026-04-18 01:00:42 +0100 |
|---|---|---|
| committer | Dan Goodliffe <dan@randomdan.homeip.net> | 2026-04-18 01:00:42 +0100 |
| commit | 7b148411b127ebf6fdefdb1b0decd2886cdfc17b (patch) | |
| tree | 56385615fe86dbb5ab7ce4a90d696d97ddf21d7e | |
| parent | 1e551e618a63c869fde6a4b327566b38696a5f45 (diff) | |
| parent | fa6074eaf52be4254c17b74f20193aa96c940df8 (diff) | |
| download | webstat-0.4.tar.bz2 webstat-0.4.tar.xz webstat-0.4.zip | |
Merge remote-tracking branch 'origin/no-crc32'HEADwebstat-0.4main
| -rw-r--r-- | Jamroot.jam | 2 | ||||
| -rw-r--r-- | src/Jamfile.jam | 2 | ||||
| -rw-r--r-- | src/ingestor.cpp | 204 | ||||
| -rw-r--r-- | src/ingestor.hpp | 18 | ||||
| -rw-r--r-- | src/logTypes.hpp | 12 | ||||
| -rw-r--r-- | src/schema.sql | 71 | ||||
| -rw-r--r-- | src/sql.cpp | 4 | ||||
| -rw-r--r-- | src/sql/entityInsert.sql | 6 | ||||
| -rw-r--r-- | src/sql/hostUpsert.sql | 14 | ||||
| -rw-r--r-- | src/uaLookup.cpp | 4 | ||||
| -rw-r--r-- | src/uaLookup.hpp | 6 | ||||
| -rw-r--r-- | src/util.hpp | 2 | ||||
| -rw-r--r-- | test/test-ingest.cpp | 40 |
13 files changed, 241 insertions, 144 deletions
diff --git a/Jamroot.jam b/Jamroot.jam index 2f38c94..37f97a6 100644 --- a/Jamroot.jam +++ b/Jamroot.jam @@ -12,7 +12,7 @@ lib boost_po : : <link>shared <name>boost_program_options ; lib adhocutil : : <link>shared : : <include>/usr/include/adhocutil <library>glibmm ; lib dbppcore : : <link>shared : : <include>/usr/include/dbpp <library>adhocutil ; lib dbpp-postgresql : : <link>shared : : <include>/usr/include/dbpp-postgresql <library>dbppcore ; -lib z : : <link>shared ; +lib md : : <link>shared ; project webstat : requirements <cxxstd>26 diff --git a/src/Jamfile.jam b/src/Jamfile.jam index 9459dd8..debe757 100644 --- a/src/Jamfile.jam +++ b/src/Jamfile.jam @@ -3,7 +3,7 @@ lib webstat : [ glob *.cpp : *_main.cpp ] : <library>..//adhocutil <library>..//dbppcore <library>../thirdparty//scn - <library>..//z + <library>..//md <library>..//curl : : <include>. diff --git a/src/ingestor.cpp b/src/ingestor.cpp index c5cb8d8..33af8cf 100644 --- a/src/ingestor.cpp +++ b/src/ingestor.cpp @@ -8,6 +8,7 @@ #include <modifycommand.h> #include <ranges> #include <scn/scan.h> +#include <selectcommand.h> #include <syslog.h> #include <utility> #include <zlib.h> @@ -18,25 +19,48 @@ namespace DB { // NOLINTNEXTLINE(readability-inconsistent-declaration-parameter-name) DB::Command::bindParam(unsigned int idx, const WebStat::Entity & entity) { - bindParamI(idx, std::get<0>(entity)); + bindParamI(idx, entity.id); } } namespace WebStat { namespace { - Crc32Value - crc32(const std::string_view value) + using ByteArrayView = std::span<const uint8_t>; + + auto + bytesToHexRange(const ByteArrayView bytes) + { + constexpr auto HEXN = 16ZU; + return bytes | std::views::transform([](auto byte) { + return std::array {byte / HEXN, byte % HEXN}; + }) | std::views::join + | std::views::transform([](auto nibble) { + return "0123456789abcdef"[nibble]; + }); + } + + EntityHash + makeHash(const std::string_view value) { - // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast) - correct for crc32ing raw bytes - return static_cast<Crc32Value>(::crc32(::crc32(0, Z_NULL, 0), reinterpret_cast<const Bytef *>(value.data()), - static_cast<uInt>(value.length()))); + MD5_CTX ctx {}; + MD5Init(&ctx); + // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast) - correct for md5ing raw bytes + MD5Update(&ctx, reinterpret_cast<const uint8_t *>(value.data()), value.length()); + EntityHash hash {}; + MD5Final(hash.data(), &ctx); + return hash; } template<EntityType Type> struct ToEntity { Entity operator()(const std::string_view value) const { - return {crc32(value), Type, value}; + return { + .hash = makeHash(value), + .id = std::nullopt, + .type = Type, + .value = value, + }; } template<typename T> @@ -50,7 +74,7 @@ namespace WebStat { }; auto - crc32ScanValues(const Ingestor::ScanValues & values) + hashScanValues(const Ingestor::ScanValues & values) { static constexpr std::tuple<ToEntity<EntityType::VirtualHost>, std::identity, std::identity, std::identity, ToEntity<EntityType::Path>, ToEntity<EntityType::QueryString>, std::identity, std::identity, @@ -65,6 +89,19 @@ namespace WebStat { }(std::make_index_sequence<VALUE_COUNT>()); } + template<std::integral T = EntityId, typename... Binds> + T + insert(auto && dbconn, const std::string & sql, const DB::CommandOptionsPtr & opts, Binds &&... binds) + { + auto ins = dbconn->select(sql, opts); + bindMany(ins, 0, std::forward<Binds>(binds)...); + if (ins->fetch()) { + T out; + (*ins)[0] >> out; + return out; + } + throw DB::NoRowsAffected {}; + } } Ingestor * Ingestor::currentIngestor = nullptr; @@ -80,14 +117,10 @@ namespace WebStat { Ingestor::Ingestor(const utsname & host, DB::ConnectionPoolPtr dbpl, IngestorSettings givenSettings) : settings {std::move(givenSettings)}, dbpool {std::move(dbpl)}, ingestParkedLines {&Ingestor::jobIngestParkedLines}, purgeOldLogs {&Ingestor::jobPurgeOldLogs}, - hostnameId {crc32(host.nodename)}, curl {curl_multi_init()}, mainThread {std::this_thread::get_id()} + hostnameId {insert(dbpool->get(), SQL::HOST_UPSERT, SQL::HOST_UPSERT_OPTS, host.nodename, host.sysname, + host.release, host.version, host.machine, host.domainname)}, + curl {curl_multi_init()}, mainThread {std::this_thread::get_id()} { - auto dbconn = dbpool->get(); - auto ins = dbconn->modify(SQL::HOST_UPSERT, SQL::HOST_UPSERT_OPTS); - bindMany(ins, 0, hostnameId, host.nodename, host.sysname, host.release, host.version, host.machine, - host.domainname); - ins->execute(); - assert(!currentIngestor); currentIngestor = this; signal(SIGTERM, &sigtermHandler); @@ -210,7 +243,7 @@ namespace WebStat { } } tryIngestQueuedLogLines(); - parkQueuedLogLines(); + std::ignore = parkQueuedLogLines(); while (!curlOperations.empty() && curl_multi_poll(curl.get(), nullptr, 0, INT_MAX, nullptr) == CURLM_OK) { handleCurlOperations(); } @@ -230,34 +263,58 @@ namespace WebStat { } } + template<typename... T> + std::vector<Entity *> + Ingestor::entities(std::tuple<T...> & values) + { + std::vector<Entity *> entities; + visit( + [&entities]<typename V>(V & value) { + static_assert(!std::is_const_v<V>); + if constexpr (std::is_same_v<V, Entity>) { + entities.emplace_back(&value); + } + else if constexpr (std::is_same_v<V, std::optional<Entity>>) { + if (value) { + entities.emplace_back(&*value); + } + } + }, + values); + return entities; + } + void Ingestor::ingestLogLines(DB::Connection * dbconn, const LineBatch & lines) { - auto nonNullEntityIds = std::views::take_while(&std::optional<Crc32Value>::has_value) - | std::views::transform([](auto && value) { - return *value; - }); + auto entityIds = std::views::transform([](auto && value) { + return std::make_pair(value->hash, *value->id); + }); DB::TransactionScope batchTx {*dbconn}; for (const auto & line : lines) { if (auto result = scanLogLine(line)) { stats.linesParsed++; - const auto values = crc32ScanValues(result->values()); + auto values = hashScanValues(result->values()); + auto valuesEntities = entities(values); + fillKnownEntities(valuesEntities); try { - DB::TransactionScope dbtx {*dbconn}; - if (const auto newEnts = newEntities(values); newEnts.front()) { - existingEntities.insert_range(storeEntities(dbconn, newEnts) | nonNullEntityIds); + std::optional<DB::TransactionScope> lineTx; + if (!std::ranges::all_of(valuesEntities, &std::optional<EntityId>::has_value, &Entity::id)) { + lineTx.emplace(*dbconn); + storeNewEntities(dbconn, valuesEntities); + existingEntities.insert_range(valuesEntities | entityIds); } storeLogLine(dbconn, values); } catch (const DB::Error & originalError) { try { DB::TransactionScope dbtx {*dbconn}; - const auto uninsertableLine = ToEntity<EntityType::UninsertableLine> {}(line); - storeEntities(dbconn, {uninsertableLine}); + auto uninsertableLine = ToEntity<EntityType::UninsertableLine> {}(line); + storeNewEntity(dbconn, uninsertableLine); log(LOG_NOTICE, "Failed to store parsed line and/or associated entties, but did store raw line, %u:%s", - std::get<Crc32Value>(uninsertableLine), line.c_str()); + *uninsertableLine.id, line.c_str()); } catch (const std::exception & excp) { log(LOG_NOTICE, "Failed to store line in any form, DB connection lost? %s", excp.what()); @@ -267,22 +324,21 @@ namespace WebStat { } else { stats.linesParseFailed++; - const auto unparsableLine = ToEntity<EntityType::UnparsableLine> {}(line); - log(LOG_NOTICE, "Failed to parse line, this is a bug: %u:%s", std::get<Crc32Value>(unparsableLine), - line.c_str()); - storeEntities(dbconn, {unparsableLine}); + auto unparsableLine = ToEntity<EntityType::UnparsableLine> {}(line); + storeNewEntity(dbconn, unparsableLine); + log(LOG_NOTICE, "Failed to parse line, this is a bug: %u:%s", *unparsableLine.id, line.c_str()); } } } - void + std::expected<std::filesystem::path, int> Ingestor::parkQueuedLogLines() { if (queuedLines.empty()) { - return; + return std::unexpected(0); } - const std::filesystem::path path { - settings.fallbackDir / std::format("parked-{}.short", crc32(queuedLines.front()))}; + const std::filesystem::path path {settings.fallbackDir + / std::format("parked-{:s}.short", bytesToHexRange(makeHash(queuedLines.front())))}; if (auto parked = FilePtr(fopen(path.c_str(), "w"))) { fprintf(parked.get(), "%zu\n", queuedLines.size()); for (const auto & line : queuedLines) { @@ -290,16 +346,19 @@ namespace WebStat { } if (fflush(parked.get()) == 0) { queuedLines.clear(); - auto finalPath = auto {path}.replace_extension(".log"); + auto finalPath = std::filesystem::path {path}.replace_extension(".log"); + parked.reset(); if (rename(path.c_str(), finalPath.c_str()) == 0) { - return; + return finalPath; } } } + const int err = errno; log(LOG_ERR, "Failed to park %zu queued lines:", queuedLines.size()); for (const auto & line : queuedLines) { log(LOG_ERR, "\t%.*s", static_cast<int>(line.length()), line.data()); } + return std::unexpected(err); } void @@ -334,7 +393,7 @@ namespace WebStat { unsigned int count = 0; for (auto pathIter = std::filesystem::directory_iterator {settings.fallbackDir}; pathIter != std::filesystem::directory_iterator {}; ++pathIter) { - if (scn::scan<Crc32Value>(pathIter->path().filename().string(), "parked-{}.log")) { + if (scn::scan<std::string>(pathIter->path().filename().string(), "parked-{:[a-zA-Z0-9]}.log")) { jobIngestParkedLines(pathIter->path()); count += 1; } @@ -397,34 +456,29 @@ namespace WebStat { return purgedTotal; } - template<typename... T> - Ingestor::NewEntities - Ingestor::newEntities(const std::tuple<T...> & values) const + void + Ingestor::fillKnownEntities(const std::span<Entity *> entities) const { - Ingestor::NewEntities rtn; - auto next = rtn.begin(); - visit( - [this, &next]<typename X>(const X & entity) { - auto addNewIfReqd = [&next, this](auto && entityToAdd) mutable { - if (!existingEntities.contains(std::get<0>(entityToAdd))) { - *next++ = entityToAdd; - } - return 0; - }; + for (const auto entity : entities) { + if (auto existing = existingEntities.find(entity->hash); existing != existingEntities.end()) { + entity->id = existing->second; + } + } + } - if constexpr (std::is_same_v<X, Entity>) { - addNewIfReqd(entity); - } - else if constexpr (std::is_same_v<X, std::optional<Entity>>) { - entity.transform(addNewIfReqd); - } - }, - values); - return rtn; + void + Ingestor::storeNewEntities(DB::Connection * dbconn, const std::span<Entity *> entities) const + { + for (const auto entity : entities) { + if (!entity->id) { + storeNewEntity(dbconn, *entity); + assert(entity->id); + } + } } - Ingestor::NewEntityIds - Ingestor::storeEntities(DB::Connection * dbconn, const std::span<const std::optional<Entity>> values) const + void + Ingestor::storeNewEntity(DB::Connection * dbconn, Entity & entity) const { static constexpr std::array<std::pair<std::string_view, void (Ingestor::*)(const Entity &) const>, 9> ENTITY_TYPE_VALUES {{ @@ -439,28 +493,20 @@ namespace WebStat { {"content_type", nullptr}, }}; - auto insert = dbconn->modify(SQL::ENTITY_INSERT, SQL::ENTITY_INSERT_OPTS); - NewEntityIds ids; - std::ranges::transform(values | std::views::take_while(&std::optional<Entity>::has_value), ids.begin(), - [this, &insert](auto && entity) { - const auto & [entityId, type, value] = *entity; - const auto & [typeName, onInsert] = ENTITY_TYPE_VALUES[std::to_underlying(type)]; - bindMany(insert, 0, entityId, typeName, value); - const auto insertedEntities = insert->execute(); - if (insertedEntities > 0 && onInsert && std::this_thread::get_id() == mainThread) { - std::invoke(onInsert, this, *entity); - } - stats.entitiesInserted += insertedEntities; - return std::get<0>(*entity); - }); - return ids; + assert(!entity.id); + const auto & [typeName, onInsert] = ENTITY_TYPE_VALUES[std::to_underlying(entity.type)]; + entity.id = insert(dbconn, SQL::ENTITY_INSERT, SQL::ENTITY_INSERT_OPTS, entity.value, typeName); + if (onInsert && std::this_thread::get_id() == mainThread) { + std::invoke(onInsert, this, entity); + } + stats.entitiesInserted += 1; } void Ingestor::onNewUserAgent(const Entity & entity) const { - const auto & [entityId, type, value] = entity; - auto curlOp = curlGetUserAgentDetail(entityId, value, settings.userAgentAPI.c_str()); + const auto & [entityHash, entityId, type, value] = entity; + auto curlOp = curlGetUserAgentDetail(*entityId, value, settings.userAgentAPI.c_str()); auto added = curlOperations.emplace(curlOp->hnd.get(), std::move(curlOp)); curl_multi_add_handle(curl.get(), added.first->first); } diff --git a/src/ingestor.hpp b/src/ingestor.hpp index fcddc92..195f325 100644 --- a/src/ingestor.hpp +++ b/src/ingestor.hpp @@ -7,7 +7,8 @@ #include <connectionPool.h> #include <connection_fwd.h> #include <cstdio> -#include <flat_set> +#include <expected> +#include <flat_map> #include <future> #include <scn/scan.h> #include <span> @@ -54,7 +55,7 @@ namespace WebStat { void ingestLog(std::FILE *); void tryIngestQueuedLogLines(); void ingestLogLines(DB::Connection *, const LineBatch & lines); - void parkQueuedLogLines(); + std::expected<std::filesystem::path, int> parkQueuedLogLines(); void runJobsAsNeeded(); unsigned int jobIngestParkedLines(); @@ -78,7 +79,7 @@ namespace WebStat { DB::ConnectionPoolPtr dbpool; mutable Stats stats {}; - std::flat_set<Crc32Value> existingEntities; + std::flat_map<EntityHash, EntityId> existingEntities; LineBatch queuedLines; bool terminated = false; @@ -99,11 +100,10 @@ namespace WebStat { Job purgeOldLogs; private: - static constexpr size_t MAX_NEW_ENTITIES = 6; - using NewEntityIds = std::array<std::optional<Crc32Value>, MAX_NEW_ENTITIES>; - NewEntityIds storeEntities(DB::Connection *, std::span<const std::optional<Entity>>) const; - using NewEntities = std::array<std::optional<Entity>, MAX_NEW_ENTITIES>; - template<typename... T> NewEntities newEntities(const std::tuple<T...> &) const; + template<typename... T> static std::vector<Entity *> entities(std::tuple<T...> &); + void fillKnownEntities(std::span<Entity *>) const; + void storeNewEntities(DB::Connection *, std::span<Entity *>) const; + void storeNewEntity(DB::Connection *, Entity &) const; void onNewUserAgent(const Entity &) const; void handleCurlOperations(); void logStats() const; @@ -119,7 +119,7 @@ namespace WebStat { [[gnu::format(printf, 3, 4)]] virtual void log(int level, const char * msgfmt, ...) const = 0; using CurlOperations = std::map<CURL *, std::unique_ptr<CurlOperation>>; - uint32_t hostnameId; + EntityId hostnameId; CurlMultiPtr curl; mutable CurlOperations curlOperations; std::thread::id mainThread; diff --git a/src/logTypes.hpp b/src/logTypes.hpp index 71393b2..7f0473e 100644 --- a/src/logTypes.hpp +++ b/src/logTypes.hpp @@ -1,5 +1,6 @@ #pragma once +#include <md5.h> #include <optional> #include <scn/scan.h> #include <string> @@ -34,8 +35,15 @@ namespace WebStat { ContentType, }; - using Crc32Value = uint32_t; - using Entity = std::tuple<Crc32Value, EntityType, std::string_view>; + using EntityId = int32_t; + using EntityHash = std::array<uint8_t, MD5_DIGEST_LENGTH>; + + struct Entity { + EntityHash hash; + std::optional<EntityId> id; + EntityType type; + std::string_view value; + }; } namespace scn { diff --git a/src/schema.sql b/src/schema.sql index 789789a..b211505 100644 --- a/src/schema.sql +++ b/src/schema.sql @@ -33,7 +33,7 @@ CREATE TYPE entity AS ENUM( ); CREATE TABLE entities( - id oid NOT NULL, + id integer GENERATED ALWAYS AS IDENTITY, value text NOT NULL, type entity NOT NULL, detail jsonb, @@ -42,30 +42,69 @@ CREATE TABLE entities( CREATE UNIQUE INDEX uni_entities_value ON entities(MD5(value)); +CREATE OR REPLACE FUNCTION entity(newValue text, newType entity) + RETURNS integer + AS $$ +DECLARE + now timestamp without time zone; + recid integer; +BEGIN + IF newValue IS NULL THEN + RETURN NULL; + END IF; + INSERT INTO entities(value, type) + SELECT + newValue, + newType + WHERE + NOT EXISTS ( + SELECT + FROM + entities + WHERE + md5(value) = md5(newValue)) + ON CONFLICT + DO NOTHING + RETURNING + id INTO recid; + IF recid IS NULL THEN + SELECT + id INTO recid + FROM + entities + WHERE + md5(value) = md5(newValue); + END IF; + RETURN recid; +END; +$$ +LANGUAGE plpgSQL +RETURNS NULL ON NULL INPUT; + CREATE TABLE access_log( id bigint GENERATED ALWAYS AS IDENTITY, - hostname oid NOT NULL, - virtual_host oid NOT NULL, + hostname integer NOT NULL, + virtual_host integer NOT NULL, remoteip inet NOT NULL, request_time timestamp(6) NOT NULL, method http_verb NOT NULL, protocol protocol NOT NULL, - path oid NOT NULL, - query_string oid, + path integer NOT NULL, + query_string integer, status smallint NOT NULL, - size int NOT NULL, + size integer NOT NULL, duration interval second(6) NOT NULL, - referrer oid, - user_agent oid, - content_type oid, + referrer integer, + user_agent integer, + content_type integer, CONSTRAINT pk_access_log PRIMARY KEY (id), - CONSTRAINT fk_access_log_hostname FOREIGN KEY (hostname) REFERENCES entities(id), - CONSTRAINT fk_access_log_virtualhost FOREIGN KEY (virtual_host) REFERENCES entities(id), - CONSTRAINT fk_access_log_path FOREIGN KEY (path) REFERENCES entities(id), - CONSTRAINT fk_access_log_query_string FOREIGN KEY (query_string) REFERENCES entities(id), - CONSTRAINT fk_access_log_referrer FOREIGN KEY (referrer) REFERENCES entities(id), - CONSTRAINT fk_access_log_user_agent FOREIGN KEY (user_agent) REFERENCES entities(id), - CONSTRAINT fk_access_log_content_type FOREIGN KEY (content_type) REFERENCES entities(id) + CONSTRAINT fk_access_log_hostname FOREIGN KEY (hostname) REFERENCES entities(id) ON UPDATE CASCADE, + CONSTRAINT fk_access_log_virtualhost FOREIGN KEY (virtual_host) REFERENCES entities(id) ON UPDATE CASCADE, + CONSTRAINT fk_access_log_path FOREIGN KEY (path) REFERENCES entities(id) ON UPDATE CASCADE, + CONSTRAINT fk_access_log_query_string FOREIGN KEY (query_string) REFERENCES entities(id) ON UPDATE CASCADE, + CONSTRAINT fk_access_log_referrer FOREIGN KEY (referrer) REFERENCES entities(id) ON UPDATE CASCADE, + CONSTRAINT fk_access_log_user_agent FOREIGN KEY (user_agent) REFERENCES entities(id) ON UPDATE CASCADE, + CONSTRAINT fk_access_log_content_type FOREIGN KEY (content_type) REFERENCES entities(id) ON UPDATE CASCADE ); CREATE OR REPLACE VIEW access_log_view AS diff --git a/src/sql.cpp b/src/sql.cpp index da95f18..801a905 100644 --- a/src/sql.cpp +++ b/src/sql.cpp @@ -1,5 +1,6 @@ #include "sql.hpp" #include <command.h> +#include <dbpp-postgresql/pq-command.h> namespace WebStat::SQL { // ccache doesn't play nicely with #embed @@ -22,7 +23,8 @@ namespace WebStat::SQL { #embed "sql/hostUpsert.sql" }; #define HASH_OPTS(VAR) \ - const DB::CommandOptionsPtr VAR##_OPTS = std::make_shared<DB::CommandOptions>(std::hash<std::string> {}(VAR)) + const DB::CommandOptionsPtr VAR##_OPTS \ + = std::make_shared<PQ::CommandOptions>(std::hash<std::string> {}(VAR), 35, false) HASH_OPTS(ACCESS_LOG_INSERT); HASH_OPTS(ACCESS_LOG_PURGE_OLD); HASH_OPTS(ENTITY_INSERT); diff --git a/src/sql/entityInsert.sql b/src/sql/entityInsert.sql index 8e25810..f518617 100644 --- a/src/sql/entityInsert.sql +++ b/src/sql/entityInsert.sql @@ -1,4 +1,2 @@ -INSERT INTO entities(id, type, value) - VALUES (?, ?, ?) -ON CONFLICT - DO NOTHING +SELECT + entity(?, ?) diff --git a/src/sql/hostUpsert.sql b/src/sql/hostUpsert.sql index 18e8df8..d5ee11d 100644 --- a/src/sql/hostUpsert.sql +++ b/src/sql/hostUpsert.sql @@ -1,7 +1,9 @@ -INSERT INTO entities(id, type, value, detail) - VALUES ($1, 'host', $2, jsonb_build_object('sysname', $3::text, 'release', $4::text, - 'version', $5::text, 'machine', $6::text, 'domainname', $7::text)) -ON CONFLICT ON CONSTRAINT pk_entities +INSERT INTO entities(type, value, detail) + VALUES ('host', $1, jsonb_build_object('sysname', $2::text, 'release', $3::text, + 'version', $4::text, 'machine', $5::text, 'domainname', $6::text)) +ON CONFLICT (md5(value)) DO UPDATE SET - detail = jsonb_build_object('sysname', $3::text, 'release', $4::text, 'version', - $5::text, 'machine', $6::text, 'domainname', $7::text) + detail = jsonb_build_object('sysname', $2::text, 'release', $3::text, 'version', + $4::text, 'machine', $5::text, 'domainname', $6::text) + RETURNING + id diff --git a/src/uaLookup.cpp b/src/uaLookup.cpp index 8cd1cb2..0fee3f0 100644 --- a/src/uaLookup.cpp +++ b/src/uaLookup.cpp @@ -5,7 +5,7 @@ #include <modifycommand.h> namespace WebStat { - UserAgentLookupOperation::UserAgentLookupOperation(Crc32Value userAgentEntityId) : entityId {userAgentEntityId} { } + UserAgentLookupOperation::UserAgentLookupOperation(EntityId userAgentEntityId) : entityId {userAgentEntityId} { } void UserAgentLookupOperation::whenComplete(DB::Connection * dbconn) const @@ -16,7 +16,7 @@ namespace WebStat { } std::unique_ptr<CurlOperation> - curlGetUserAgentDetail(Crc32Value entityId, const std::string_view uas, const char * baseUrl) + curlGetUserAgentDetail(EntityId entityId, const std::string_view uas, const char * baseUrl) { auto request = std::make_unique<UserAgentLookupOperation>(entityId); diff --git a/src/uaLookup.hpp b/src/uaLookup.hpp index 9714253..4de883f 100644 --- a/src/uaLookup.hpp +++ b/src/uaLookup.hpp @@ -9,13 +9,13 @@ namespace WebStat { class UserAgentLookupOperation : public CurlOperation { public: - UserAgentLookupOperation(Crc32Value entityId); + UserAgentLookupOperation(EntityId entityId); void whenComplete(DB::Connection *) const override; - Crc32Value entityId; + EntityId entityId; }; std::unique_ptr<CurlOperation> curlGetUserAgentDetail( - Crc32Value entityId, std::string_view uas, const char * baseUrl); + EntityId entityId, std::string_view uas, const char * baseUrl); } diff --git a/src/util.hpp b/src/util.hpp index 28bcebd..f7254e8 100644 --- a/src/util.hpp +++ b/src/util.hpp @@ -16,7 +16,7 @@ namespace WebStat { template<typename... T> auto - visit(auto && visitor, const std::tuple<T...> & values) + visit(auto && visitor, std::tuple<T...> & values) { std::apply( [&](auto &&... value) { diff --git a/test/test-ingest.cpp b/test/test-ingest.cpp index 5fc8195..88e80e8 100644 --- a/test/test-ingest.cpp +++ b/test/test-ingest.cpp @@ -155,7 +155,6 @@ BOOST_DATA_TEST_CASE(CLFStringsBad, constexpr std::string_view LOGLINE1 = R"LOG(git.randomdan.homeip.net 98.82.40.168 1755561576768318 GET "/repo/gentoobrowse-api/commit/gentoobrowse-api/unittests/fixtures/756569aa764177340726dd3d40b41d89b11b20c7/app-crypt/pdfcrack/Manifest" "?h=gentoobrowse-api-0.9.1&id=a2ed3fd30333721accd4b697bfcb6cc4165c7714" HTTP/1.1 200 1884 107791 "-" "Mozilla/5.0 AppleWebKit/537.36 (KHTML, like Gecko; compatible; Amazonbot/0.1; +https://developer.amazon.com/support/amazonbot) Chrome/119.0.6045.214 Safari/537.36" "test/plain")LOG"; -constexpr std::string_view LOGLINE1_PARKED = "parked-237093379.log"; constexpr std::string_view LOGLINE2 = R"LOG(www.randomdan.homeip.net 43.128.84.166 1755561575973204 GET "/app-dicts/myspell-et/Manifest" "" HTTP/1.1 200 312 10369 "https://google.com" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.0.0 Safari/537.36" "image/png")LOG"; @@ -298,11 +297,11 @@ BOOST_AUTO_TEST_CASE(ParkLogLine) { queuedLines.emplace_back(LOGLINE1); queuedLines.emplace_back(LOGLINE2); - parkQueuedLogLines(); - const auto path = settings.fallbackDir / LOGLINE1_PARKED; - BOOST_TEST_INFO(path); - BOOST_REQUIRE(std::filesystem::exists(path)); - BOOST_CHECK_EQUAL(std::filesystem::file_size(path), LOGLINE1.length() + LOGLINE2.length() + 4); + const auto path = parkQueuedLogLines(); + BOOST_REQUIRE(path); + BOOST_TEST_INFO(*path); + BOOST_REQUIRE(std::filesystem::exists(*path)); + BOOST_CHECK_EQUAL(std::filesystem::file_size(*path), LOGLINE1.length() + LOGLINE2.length() + 4); } BOOST_AUTO_TEST_CASE(ParkLogLineOnError, *boost::unit_test::depends_on("I/ParkLogLine")) @@ -318,11 +317,12 @@ BOOST_AUTO_TEST_CASE(IngestParked, *boost::unit_test::depends_on("I/ParkLogLine" { queuedLines.emplace_back(LOGLINE1); queuedLines.emplace_back(LOGLINE2); - parkQueuedLogLines(); + BOOST_REQUIRE(parkQueuedLogLines()); + BOOST_CHECK(!std::filesystem::is_empty(settings.fallbackDir)); BOOST_REQUIRE(queuedLines.empty()); jobIngestParkedLines(); BOOST_CHECK_EQUAL(queuedLines.size(), 2); - BOOST_CHECK(!std::filesystem::exists(settings.fallbackDir / LOGLINE1_PARKED)); + BOOST_CHECK(std::filesystem::is_empty(settings.fallbackDir)); } BOOST_AUTO_TEST_CASE(DefaultLaunchNoJobs) @@ -338,21 +338,22 @@ BOOST_AUTO_TEST_CASE(IngestParkedJob, const auto now = Job::LastRunTime::clock::now(); ingestParkedLines.lastRun = now - 1s; queuedLines.emplace_back(LOGLINE1); - parkQueuedLogLines(); + const auto path = parkQueuedLogLines(); + BOOST_REQUIRE(path); BOOST_REQUIRE(queuedLines.empty()); - BOOST_REQUIRE(std::filesystem::exists(settings.fallbackDir / LOGLINE1_PARKED)); + BOOST_REQUIRE(std::filesystem::exists(*path)); runJobsAsNeeded(); BOOST_REQUIRE(!ingestParkedLines.currentRun); BOOST_CHECK(queuedLines.empty()); - BOOST_CHECK(std::filesystem::exists(settings.fallbackDir / LOGLINE1_PARKED)); + BOOST_CHECK(std::filesystem::exists(*path)); BOOST_CHECK_EQUAL(ingestParkedLines.lastRun, now - 1s); ingestParkedLines.lastRun = now - settings.freqIngestParkedLines + 2s; runJobsAsNeeded(); BOOST_REQUIRE(!ingestParkedLines.currentRun); BOOST_CHECK(queuedLines.empty()); - BOOST_CHECK(std::filesystem::exists(settings.fallbackDir / LOGLINE1_PARKED)); + BOOST_CHECK(std::filesystem::exists(*path)); BOOST_CHECK_EQUAL(ingestParkedLines.lastRun, now - settings.freqIngestParkedLines + 2s); ingestParkedLines.lastRun = now - settings.freqIngestParkedLines - 1s; @@ -363,7 +364,7 @@ BOOST_AUTO_TEST_CASE(IngestParkedJob, BOOST_REQUIRE(!ingestParkedLines.currentRun); BOOST_CHECK_EQUAL(queuedLines.size(), 1); BOOST_CHECK_GE(ingestParkedLines.lastRun, now); - BOOST_CHECK(!std::filesystem::exists(settings.fallbackDir / LOGLINE1_PARKED)); + BOOST_CHECK(!std::filesystem::exists(*path)); } BOOST_AUTO_TEST_CASE(JobErrorRescheduler, *boost::unit_test::depends_on("I/IngestParkedJob")) @@ -371,14 +372,15 @@ BOOST_AUTO_TEST_CASE(JobErrorRescheduler, *boost::unit_test::depends_on("I/Inges const auto now = Job::LastRunTime::clock::now(); ingestParkedLines.lastRun = now - settings.freqIngestParkedLines - 1s; queuedLines.emplace_back(LOGLINE1); - parkQueuedLogLines(); - std::filesystem::permissions(settings.fallbackDir / LOGLINE1_PARKED, std::filesystem::perms::owner_write); + const auto path = parkQueuedLogLines(); + BOOST_REQUIRE(path); + std::filesystem::permissions(*path, std::filesystem::perms::owner_write); runJobsAsNeeded(); BOOST_REQUIRE(ingestParkedLines.currentRun); ingestParkedLines.currentRun->wait(); runJobsAsNeeded(); BOOST_REQUIRE(!ingestParkedLines.currentRun); - BOOST_CHECK(std::filesystem::exists(settings.fallbackDir / LOGLINE1_PARKED)); + BOOST_CHECK(std::filesystem::exists(*path)); BOOST_CHECK_GE(ingestParkedLines.lastRun, now - (settings.freqIngestParkedLines / 2) - 1s); BOOST_CHECK_LE(ingestParkedLines.lastRun, now - (settings.freqIngestParkedLines / 2) + 1s); } @@ -404,10 +406,10 @@ BOOST_AUTO_TEST_CASE(DiscardUnparsable) BOOST_REQUIRE_NO_THROW(tryIngestQueuedLogLines()); auto dbconn = dbpool->get(); auto select = dbconn->select("SELECT id::bigint, value FROM entities WHERE type = 'unparsable_line'"); - constexpr std::array<std::tuple<Crc32Value, std::string_view>, 1> EXPECTED {{ - {1664299262, "does not parse"}, + constexpr std::array<std::tuple<EntityId, std::string_view>, 1> EXPECTED {{ + {18, "does not parse"}, }}; - auto rows = select->as<Crc32Value, std::string_view>(); + auto rows = select->as<EntityId, std::string_view>(); BOOST_CHECK_EQUAL_COLLECTIONS(rows.begin(), rows.end(), EXPECTED.begin(), EXPECTED.end()); BOOST_CHECK_EQUAL(stats.linesParseFailed, 1); BOOST_CHECK_EQUAL(stats.entitiesInserted, 1); |
