diff options
| -rw-r--r-- | Jamroot.jam | 1 | ||||
| -rw-r--r-- | src/Jamfile.jam | 1 | ||||
| -rw-r--r-- | src/ingestor.cpp | 158 | ||||
| -rw-r--r-- | src/ingestor.hpp | 15 | ||||
| -rw-r--r-- | src/logTypes.hpp | 4 | ||||
| -rw-r--r-- | src/schema.sql | 69 | ||||
| -rw-r--r-- | src/sql.cpp | 4 | ||||
| -rw-r--r-- | src/sql/entityInsert.sql | 6 | ||||
| -rw-r--r-- | src/sql/hostUpsert.sql | 14 | ||||
| -rw-r--r-- | src/uaLookup.cpp | 4 | ||||
| -rw-r--r-- | src/uaLookup.hpp | 6 | ||||
| -rw-r--r-- | src/util.hpp | 2 | ||||
| -rw-r--r-- | test/test-ingest.cpp | 6 |
13 files changed, 169 insertions, 121 deletions
diff --git a/Jamroot.jam b/Jamroot.jam index 37e4a0e..37f97a6 100644 --- a/Jamroot.jam +++ b/Jamroot.jam @@ -12,7 +12,6 @@ lib boost_po : : <link>shared <name>boost_program_options ; lib adhocutil : : <link>shared : : <include>/usr/include/adhocutil <library>glibmm ; lib dbppcore : : <link>shared : : <include>/usr/include/dbpp <library>adhocutil ; lib dbpp-postgresql : : <link>shared : : <include>/usr/include/dbpp-postgresql <library>dbppcore ; -lib z : : <link>shared ; lib md : : <link>shared ; project webstat : requirements diff --git a/src/Jamfile.jam b/src/Jamfile.jam index eca0c24..debe757 100644 --- a/src/Jamfile.jam +++ b/src/Jamfile.jam @@ -3,7 +3,6 @@ lib webstat : [ glob *.cpp : *_main.cpp ] : <library>..//adhocutil <library>..//dbppcore <library>../thirdparty//scn - <library>..//z <library>..//md <library>..//curl : : diff --git a/src/ingestor.cpp b/src/ingestor.cpp index 5ae2487..5454087 100644 --- a/src/ingestor.cpp +++ b/src/ingestor.cpp @@ -8,6 +8,7 @@ #include <modifycommand.h> #include <ranges> #include <scn/scan.h> +#include <selectcommand.h> #include <syslog.h> #include <utility> #include <zlib.h> @@ -18,7 +19,7 @@ namespace DB { // NOLINTNEXTLINE(readability-inconsistent-declaration-parameter-name) DB::Command::bindParam(unsigned int idx, const WebStat::Entity & entity) { - bindParamI(idx, std::get<0>(entity)); + bindParamI(idx, std::get<1>(entity)); } } @@ -50,19 +51,11 @@ namespace WebStat { return hash; } - Crc32Value - crc32(const std::string_view value) - { - // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast) - correct for crc32ing raw bytes - return static_cast<Crc32Value>(::crc32(::crc32(0, Z_NULL, 0), reinterpret_cast<const Bytef *>(value.data()), - static_cast<uInt>(value.length()))); - } - template<EntityType Type> struct ToEntity { Entity operator()(const std::string_view value) const { - return {crc32(value), Type, value}; + return {makeHash(value), std::nullopt, Type, value}; } template<typename T> @@ -76,7 +69,7 @@ namespace WebStat { }; auto - crc32ScanValues(const Ingestor::ScanValues & values) + hashScanValues(const Ingestor::ScanValues & values) { static constexpr std::tuple<ToEntity<EntityType::VirtualHost>, std::identity, std::identity, std::identity, ToEntity<EntityType::Path>, ToEntity<EntityType::QueryString>, std::identity, std::identity, @@ -91,6 +84,19 @@ namespace WebStat { }(std::make_index_sequence<VALUE_COUNT>()); } + template<std::integral T = EntityId, typename... Binds> + T + insert(auto && dbconn, const std::string & sql, const DB::CommandOptionsPtr & opts, Binds &&... binds) + { + auto ins = dbconn->select(sql, opts); + bindMany(ins, 0, std::forward<Binds>(binds)...); + if (ins->fetch()) { + T out; + (*ins)[0] >> out; + return out; + } + throw DB::NoRowsAffected {}; + } } Ingestor * Ingestor::currentIngestor = nullptr; @@ -106,14 +112,10 @@ namespace WebStat { Ingestor::Ingestor(const utsname & host, DB::ConnectionPoolPtr dbpl, IngestorSettings givenSettings) : settings {std::move(givenSettings)}, dbpool {std::move(dbpl)}, ingestParkedLines {&Ingestor::jobIngestParkedLines}, purgeOldLogs {&Ingestor::jobPurgeOldLogs}, - hostnameId {crc32(host.nodename)}, curl {curl_multi_init()}, mainThread {std::this_thread::get_id()} + hostnameId {insert(dbpool->get(), SQL::HOST_UPSERT, SQL::HOST_UPSERT_OPTS, host.nodename, host.sysname, + host.release, host.version, host.machine, host.domainname)}, + curl {curl_multi_init()}, mainThread {std::this_thread::get_id()} { - auto dbconn = dbpool->get(); - auto ins = dbconn->modify(SQL::HOST_UPSERT, SQL::HOST_UPSERT_OPTS); - bindMany(ins, 0, hostnameId, host.nodename, host.sysname, host.release, host.version, host.machine, - host.domainname); - ins->execute(); - assert(!currentIngestor); currentIngestor = this; signal(SIGTERM, &sigtermHandler); @@ -236,7 +238,7 @@ namespace WebStat { } } tryIngestQueuedLogLines(); - parkQueuedLogLines(); + std::ignore = parkQueuedLogLines(); while (!curlOperations.empty() && curl_multi_poll(curl.get(), nullptr, 0, INT_MAX, nullptr) == CURLM_OK) { handleCurlOperations(); } @@ -256,34 +258,55 @@ namespace WebStat { } } + template<typename... T> + std::vector<Entity *> + Ingestor::entities(std::tuple<T...> & values) + { + std::vector<Entity *> entities; + visit( + [&entities]<typename V>(V & value) { + static_assert(!std::is_const_v<V>); + if constexpr (std::is_same_v<V, Entity>) { + entities.emplace_back(&value); + } + else if constexpr (std::is_same_v<V, std::optional<Entity>>) { + if (value) { + entities.emplace_back(&*value); + } + } + }, + values); + return entities; + } + void Ingestor::ingestLogLines(DB::Connection * dbconn, const LineBatch & lines) { - auto nonNullEntityIds = std::views::take_while(&std::optional<Crc32Value>::has_value) - | std::views::transform([](auto && value) { - return *value; - }); + auto entityIds = std::views::transform([](auto && value) { + return std::make_pair(std::get<0>(*value), *std::get<1>(*value)); + }); DB::TransactionScope batchTx {*dbconn}; for (const auto & line : lines) { if (auto result = scanLogLine(line)) { stats.linesParsed++; - const auto values = crc32ScanValues(result->values()); + auto values = hashScanValues(result->values()); + auto valuesEntities = entities(values); + fillKnownEntities(valuesEntities); try { DB::TransactionScope dbtx {*dbconn}; - if (const auto newEnts = newEntities(values); newEnts.front()) { - existingEntities.insert_range(storeEntities(dbconn, newEnts) | nonNullEntityIds); - } + storeNewEntities(dbconn, valuesEntities); + existingEntities.insert_range(valuesEntities | entityIds); storeLogLine(dbconn, values); } catch (const DB::Error & originalError) { try { DB::TransactionScope dbtx {*dbconn}; - const auto uninsertableLine = ToEntity<EntityType::UninsertableLine> {}(line); - storeEntities(dbconn, {uninsertableLine}); + auto uninsertableLine = ToEntity<EntityType::UninsertableLine> {}(line); + storeNewEntity(dbconn, uninsertableLine); log(LOG_NOTICE, "Failed to store parsed line and/or associated entties, but did store raw line, %u:%s", - std::get<Crc32Value>(uninsertableLine), line.c_str()); + *std::get<1>(uninsertableLine), line.c_str()); } catch (const std::exception & excp) { log(LOG_NOTICE, "Failed to store line in any form, DB connection lost? %s", excp.what()); @@ -293,10 +316,10 @@ namespace WebStat { } else { stats.linesParseFailed++; - const auto unparsableLine = ToEntity<EntityType::UnparsableLine> {}(line); - log(LOG_NOTICE, "Failed to parse line, this is a bug: %u:%s", std::get<Crc32Value>(unparsableLine), + auto unparsableLine = ToEntity<EntityType::UnparsableLine> {}(line); + storeNewEntity(dbconn, unparsableLine); + log(LOG_NOTICE, "Failed to parse line, this is a bug: %u:%s", *std::get<1>(unparsableLine), line.c_str()); - storeEntities(dbconn, {unparsableLine}); } } } @@ -426,34 +449,28 @@ namespace WebStat { return purgedTotal; } - template<typename... T> - Ingestor::NewEntities - Ingestor::newEntities(const std::tuple<T...> & values) const + void + Ingestor::fillKnownEntities(const std::span<Entity *> entities) const { - Ingestor::NewEntities rtn; - auto next = rtn.begin(); - visit( - [this, &next]<typename X>(const X & entity) { - auto addNewIfReqd = [&next, this](auto && entityToAdd) mutable { - if (!existingEntities.contains(std::get<0>(entityToAdd))) { - *next++ = entityToAdd; - } - return 0; - }; + for (const auto entity : entities) { + if (auto existing = existingEntities.find(std::get<0>(*entity)); existing != existingEntities.end()) { + std::get<1>(*entity) = existing->second; + } + } + } - if constexpr (std::is_same_v<X, Entity>) { - addNewIfReqd(entity); - } - else if constexpr (std::is_same_v<X, std::optional<Entity>>) { - entity.transform(addNewIfReqd); - } - }, - values); - return rtn; + void + Ingestor::storeNewEntities(DB::Connection * dbconn, const std::span<Entity *> entities) const + { + for (const auto entity : entities) { + if (!std::get<1>(*entity)) { + storeNewEntity(dbconn, *entity); + } + } } - Ingestor::NewEntityIds - Ingestor::storeEntities(DB::Connection * dbconn, const std::span<const std::optional<Entity>> values) const + void + Ingestor::storeNewEntity(DB::Connection * dbconn, Entity & entity) const { static constexpr std::array<std::pair<std::string_view, void (Ingestor::*)(const Entity &) const>, 9> ENTITY_TYPE_VALUES {{ @@ -468,28 +485,21 @@ namespace WebStat { {"content_type", nullptr}, }}; - auto insert = dbconn->modify(SQL::ENTITY_INSERT, SQL::ENTITY_INSERT_OPTS); - NewEntityIds ids; - std::ranges::transform(values | std::views::take_while(&std::optional<Entity>::has_value), ids.begin(), - [this, &insert](auto && entity) { - const auto & [entityId, type, value] = *entity; - const auto & [typeName, onInsert] = ENTITY_TYPE_VALUES[std::to_underlying(type)]; - bindMany(insert, 0, entityId, typeName, value); - const auto insertedEntities = insert->execute(); - if (insertedEntities > 0 && onInsert && std::this_thread::get_id() == mainThread) { - std::invoke(onInsert, this, *entity); - } - stats.entitiesInserted += insertedEntities; - return std::get<0>(*entity); - }); - return ids; + auto & [entityHash, entityId, type, value] = entity; + assert(!entityId); + const auto & [typeName, onInsert] = ENTITY_TYPE_VALUES[std::to_underlying(type)]; + entityId = insert(dbconn, SQL::ENTITY_INSERT, SQL::ENTITY_INSERT_OPTS, value, typeName); + if (onInsert && std::this_thread::get_id() == mainThread) { + std::invoke(onInsert, this, entity); + } + stats.entitiesInserted += 1; } void Ingestor::onNewUserAgent(const Entity & entity) const { - const auto & [entityId, type, value] = entity; - auto curlOp = curlGetUserAgentDetail(entityId, value, settings.userAgentAPI.c_str()); + const auto & [entityHash, entityId, type, value] = entity; + auto curlOp = curlGetUserAgentDetail(*entityId, value, settings.userAgentAPI.c_str()); auto added = curlOperations.emplace(curlOp->hnd.get(), std::move(curlOp)); curl_multi_add_handle(curl.get(), added.first->first); } diff --git a/src/ingestor.hpp b/src/ingestor.hpp index b2e0fed..195f325 100644 --- a/src/ingestor.hpp +++ b/src/ingestor.hpp @@ -8,7 +8,7 @@ #include <connection_fwd.h> #include <cstdio> #include <expected> -#include <flat_set> +#include <flat_map> #include <future> #include <scn/scan.h> #include <span> @@ -79,7 +79,7 @@ namespace WebStat { DB::ConnectionPoolPtr dbpool; mutable Stats stats {}; - std::flat_set<Crc32Value> existingEntities; + std::flat_map<EntityHash, EntityId> existingEntities; LineBatch queuedLines; bool terminated = false; @@ -100,11 +100,10 @@ namespace WebStat { Job purgeOldLogs; private: - static constexpr size_t MAX_NEW_ENTITIES = 6; - using NewEntityIds = std::array<std::optional<Crc32Value>, MAX_NEW_ENTITIES>; - NewEntityIds storeEntities(DB::Connection *, std::span<const std::optional<Entity>>) const; - using NewEntities = std::array<std::optional<Entity>, MAX_NEW_ENTITIES>; - template<typename... T> NewEntities newEntities(const std::tuple<T...> &) const; + template<typename... T> static std::vector<Entity *> entities(std::tuple<T...> &); + void fillKnownEntities(std::span<Entity *>) const; + void storeNewEntities(DB::Connection *, std::span<Entity *>) const; + void storeNewEntity(DB::Connection *, Entity &) const; void onNewUserAgent(const Entity &) const; void handleCurlOperations(); void logStats() const; @@ -120,7 +119,7 @@ namespace WebStat { [[gnu::format(printf, 3, 4)]] virtual void log(int level, const char * msgfmt, ...) const = 0; using CurlOperations = std::map<CURL *, std::unique_ptr<CurlOperation>>; - uint32_t hostnameId; + EntityId hostnameId; CurlMultiPtr curl; mutable CurlOperations curlOperations; std::thread::id mainThread; diff --git a/src/logTypes.hpp b/src/logTypes.hpp index 6556d5c..fb66867 100644 --- a/src/logTypes.hpp +++ b/src/logTypes.hpp @@ -35,9 +35,9 @@ namespace WebStat { ContentType, }; - using Crc32Value = uint32_t; + using EntityId = int32_t; using EntityHash = std::array<uint8_t, MD5_DIGEST_LENGTH>; - using Entity = std::tuple<Crc32Value, EntityType, std::string_view>; + using Entity = std::tuple<EntityHash, std::optional<EntityId>, EntityType, std::string_view>; } namespace scn { diff --git a/src/schema.sql b/src/schema.sql index 789789a..fd9eb4f 100644 --- a/src/schema.sql +++ b/src/schema.sql @@ -33,7 +33,7 @@ CREATE TYPE entity AS ENUM( ); CREATE TABLE entities( - id oid NOT NULL, + id int GENERATED ALWAYS AS IDENTITY, value text NOT NULL, type entity NOT NULL, detail jsonb, @@ -42,30 +42,69 @@ CREATE TABLE entities( CREATE UNIQUE INDEX uni_entities_value ON entities(MD5(value)); +CREATE OR REPLACE FUNCTION entity(newValue text, newType entity) + RETURNS int + AS $$ +DECLARE + now timestamp without time zone; + recid int; +BEGIN + IF newValue IS NULL THEN + RETURN NULL; + END IF; + INSERT INTO entities(value, type) + SELECT + newValue, + newType + WHERE + NOT EXISTS ( + SELECT + FROM + entities + WHERE + md5(value) = md5(newValue)) + ON CONFLICT + DO NOTHING + RETURNING + id INTO recid; + IF recid IS NULL THEN + SELECT + id INTO recid + FROM + entities + WHERE + md5(value) = md5(newValue); + END IF; + RETURN recid; +END; +$$ +LANGUAGE plpgSQL +RETURNS NULL ON NULL INPUT; + CREATE TABLE access_log( id bigint GENERATED ALWAYS AS IDENTITY, - hostname oid NOT NULL, - virtual_host oid NOT NULL, + hostname int NOT NULL, + virtual_host int NOT NULL, remoteip inet NOT NULL, request_time timestamp(6) NOT NULL, method http_verb NOT NULL, protocol protocol NOT NULL, - path oid NOT NULL, - query_string oid, + path int NOT NULL, + query_string int, status smallint NOT NULL, size int NOT NULL, duration interval second(6) NOT NULL, - referrer oid, - user_agent oid, - content_type oid, + referrer int, + user_agent int, + content_type int, CONSTRAINT pk_access_log PRIMARY KEY (id), - CONSTRAINT fk_access_log_hostname FOREIGN KEY (hostname) REFERENCES entities(id), - CONSTRAINT fk_access_log_virtualhost FOREIGN KEY (virtual_host) REFERENCES entities(id), - CONSTRAINT fk_access_log_path FOREIGN KEY (path) REFERENCES entities(id), - CONSTRAINT fk_access_log_query_string FOREIGN KEY (query_string) REFERENCES entities(id), - CONSTRAINT fk_access_log_referrer FOREIGN KEY (referrer) REFERENCES entities(id), - CONSTRAINT fk_access_log_user_agent FOREIGN KEY (user_agent) REFERENCES entities(id), - CONSTRAINT fk_access_log_content_type FOREIGN KEY (content_type) REFERENCES entities(id) + CONSTRAINT fk_access_log_hostname FOREIGN KEY (hostname) REFERENCES entities(id) ON UPDATE CASCADE, + CONSTRAINT fk_access_log_virtualhost FOREIGN KEY (virtual_host) REFERENCES entities(id) ON UPDATE CASCADE, + CONSTRAINT fk_access_log_path FOREIGN KEY (path) REFERENCES entities(id) ON UPDATE CASCADE, + CONSTRAINT fk_access_log_query_string FOREIGN KEY (query_string) REFERENCES entities(id) ON UPDATE CASCADE, + CONSTRAINT fk_access_log_referrer FOREIGN KEY (referrer) REFERENCES entities(id) ON UPDATE CASCADE, + CONSTRAINT fk_access_log_user_agent FOREIGN KEY (user_agent) REFERENCES entities(id) ON UPDATE CASCADE, + CONSTRAINT fk_access_log_content_type FOREIGN KEY (content_type) REFERENCES entities(id) ON UPDATE CASCADE ); CREATE OR REPLACE VIEW access_log_view AS diff --git a/src/sql.cpp b/src/sql.cpp index da95f18..801a905 100644 --- a/src/sql.cpp +++ b/src/sql.cpp @@ -1,5 +1,6 @@ #include "sql.hpp" #include <command.h> +#include <dbpp-postgresql/pq-command.h> namespace WebStat::SQL { // ccache doesn't play nicely with #embed @@ -22,7 +23,8 @@ namespace WebStat::SQL { #embed "sql/hostUpsert.sql" }; #define HASH_OPTS(VAR) \ - const DB::CommandOptionsPtr VAR##_OPTS = std::make_shared<DB::CommandOptions>(std::hash<std::string> {}(VAR)) + const DB::CommandOptionsPtr VAR##_OPTS \ + = std::make_shared<PQ::CommandOptions>(std::hash<std::string> {}(VAR), 35, false) HASH_OPTS(ACCESS_LOG_INSERT); HASH_OPTS(ACCESS_LOG_PURGE_OLD); HASH_OPTS(ENTITY_INSERT); diff --git a/src/sql/entityInsert.sql b/src/sql/entityInsert.sql index 8e25810..f518617 100644 --- a/src/sql/entityInsert.sql +++ b/src/sql/entityInsert.sql @@ -1,4 +1,2 @@ -INSERT INTO entities(id, type, value) - VALUES (?, ?, ?) -ON CONFLICT - DO NOTHING +SELECT + entity(?, ?) diff --git a/src/sql/hostUpsert.sql b/src/sql/hostUpsert.sql index 18e8df8..d5ee11d 100644 --- a/src/sql/hostUpsert.sql +++ b/src/sql/hostUpsert.sql @@ -1,7 +1,9 @@ -INSERT INTO entities(id, type, value, detail) - VALUES ($1, 'host', $2, jsonb_build_object('sysname', $3::text, 'release', $4::text, - 'version', $5::text, 'machine', $6::text, 'domainname', $7::text)) -ON CONFLICT ON CONSTRAINT pk_entities +INSERT INTO entities(type, value, detail) + VALUES ('host', $1, jsonb_build_object('sysname', $2::text, 'release', $3::text, + 'version', $4::text, 'machine', $5::text, 'domainname', $6::text)) +ON CONFLICT (md5(value)) DO UPDATE SET - detail = jsonb_build_object('sysname', $3::text, 'release', $4::text, 'version', - $5::text, 'machine', $6::text, 'domainname', $7::text) + detail = jsonb_build_object('sysname', $2::text, 'release', $3::text, 'version', + $4::text, 'machine', $5::text, 'domainname', $6::text) + RETURNING + id diff --git a/src/uaLookup.cpp b/src/uaLookup.cpp index 8cd1cb2..0fee3f0 100644 --- a/src/uaLookup.cpp +++ b/src/uaLookup.cpp @@ -5,7 +5,7 @@ #include <modifycommand.h> namespace WebStat { - UserAgentLookupOperation::UserAgentLookupOperation(Crc32Value userAgentEntityId) : entityId {userAgentEntityId} { } + UserAgentLookupOperation::UserAgentLookupOperation(EntityId userAgentEntityId) : entityId {userAgentEntityId} { } void UserAgentLookupOperation::whenComplete(DB::Connection * dbconn) const @@ -16,7 +16,7 @@ namespace WebStat { } std::unique_ptr<CurlOperation> - curlGetUserAgentDetail(Crc32Value entityId, const std::string_view uas, const char * baseUrl) + curlGetUserAgentDetail(EntityId entityId, const std::string_view uas, const char * baseUrl) { auto request = std::make_unique<UserAgentLookupOperation>(entityId); diff --git a/src/uaLookup.hpp b/src/uaLookup.hpp index 9714253..4de883f 100644 --- a/src/uaLookup.hpp +++ b/src/uaLookup.hpp @@ -9,13 +9,13 @@ namespace WebStat { class UserAgentLookupOperation : public CurlOperation { public: - UserAgentLookupOperation(Crc32Value entityId); + UserAgentLookupOperation(EntityId entityId); void whenComplete(DB::Connection *) const override; - Crc32Value entityId; + EntityId entityId; }; std::unique_ptr<CurlOperation> curlGetUserAgentDetail( - Crc32Value entityId, std::string_view uas, const char * baseUrl); + EntityId entityId, std::string_view uas, const char * baseUrl); } diff --git a/src/util.hpp b/src/util.hpp index 28bcebd..f7254e8 100644 --- a/src/util.hpp +++ b/src/util.hpp @@ -16,7 +16,7 @@ namespace WebStat { template<typename... T> auto - visit(auto && visitor, const std::tuple<T...> & values) + visit(auto && visitor, std::tuple<T...> & values) { std::apply( [&](auto &&... value) { diff --git a/test/test-ingest.cpp b/test/test-ingest.cpp index d399288..88e80e8 100644 --- a/test/test-ingest.cpp +++ b/test/test-ingest.cpp @@ -406,10 +406,10 @@ BOOST_AUTO_TEST_CASE(DiscardUnparsable) BOOST_REQUIRE_NO_THROW(tryIngestQueuedLogLines()); auto dbconn = dbpool->get(); auto select = dbconn->select("SELECT id::bigint, value FROM entities WHERE type = 'unparsable_line'"); - constexpr std::array<std::tuple<Crc32Value, std::string_view>, 1> EXPECTED {{ - {1664299262, "does not parse"}, + constexpr std::array<std::tuple<EntityId, std::string_view>, 1> EXPECTED {{ + {18, "does not parse"}, }}; - auto rows = select->as<Crc32Value, std::string_view>(); + auto rows = select->as<EntityId, std::string_view>(); BOOST_CHECK_EQUAL_COLLECTIONS(rows.begin(), rows.end(), EXPECTED.begin(), EXPECTED.end()); BOOST_CHECK_EQUAL(stats.linesParseFailed, 1); BOOST_CHECK_EQUAL(stats.entitiesInserted, 1); |
