summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDan Goodliffe <dan@randomdan.homeip.net>2026-04-18 01:00:42 +0100
committerDan Goodliffe <dan@randomdan.homeip.net>2026-04-18 01:00:42 +0100
commit7b148411b127ebf6fdefdb1b0decd2886cdfc17b (patch)
tree56385615fe86dbb5ab7ce4a90d696d97ddf21d7e
parent1e551e618a63c869fde6a4b327566b38696a5f45 (diff)
parentfa6074eaf52be4254c17b74f20193aa96c940df8 (diff)
downloadwebstat-main.tar.bz2
webstat-main.tar.xz
webstat-main.zip
Merge remote-tracking branch 'origin/no-crc32'HEADwebstat-0.4main
-rw-r--r--Jamroot.jam2
-rw-r--r--src/Jamfile.jam2
-rw-r--r--src/ingestor.cpp204
-rw-r--r--src/ingestor.hpp18
-rw-r--r--src/logTypes.hpp12
-rw-r--r--src/schema.sql71
-rw-r--r--src/sql.cpp4
-rw-r--r--src/sql/entityInsert.sql6
-rw-r--r--src/sql/hostUpsert.sql14
-rw-r--r--src/uaLookup.cpp4
-rw-r--r--src/uaLookup.hpp6
-rw-r--r--src/util.hpp2
-rw-r--r--test/test-ingest.cpp40
13 files changed, 241 insertions, 144 deletions
diff --git a/Jamroot.jam b/Jamroot.jam
index 2f38c94..37f97a6 100644
--- a/Jamroot.jam
+++ b/Jamroot.jam
@@ -12,7 +12,7 @@ lib boost_po : : <link>shared <name>boost_program_options ;
lib adhocutil : : <link>shared : : <include>/usr/include/adhocutil <library>glibmm ;
lib dbppcore : : <link>shared : : <include>/usr/include/dbpp <library>adhocutil ;
lib dbpp-postgresql : : <link>shared : : <include>/usr/include/dbpp-postgresql <library>dbppcore ;
-lib z : : <link>shared ;
+lib md : : <link>shared ;
project webstat : requirements
<cxxstd>26
diff --git a/src/Jamfile.jam b/src/Jamfile.jam
index 9459dd8..debe757 100644
--- a/src/Jamfile.jam
+++ b/src/Jamfile.jam
@@ -3,7 +3,7 @@ lib webstat : [ glob *.cpp : *_main.cpp ] :
<library>..//adhocutil
<library>..//dbppcore
<library>../thirdparty//scn
- <library>..//z
+ <library>..//md
<library>..//curl
: :
<include>.
diff --git a/src/ingestor.cpp b/src/ingestor.cpp
index c5cb8d8..33af8cf 100644
--- a/src/ingestor.cpp
+++ b/src/ingestor.cpp
@@ -8,6 +8,7 @@
#include <modifycommand.h>
#include <ranges>
#include <scn/scan.h>
+#include <selectcommand.h>
#include <syslog.h>
#include <utility>
#include <zlib.h>
@@ -18,25 +19,48 @@ namespace DB {
// NOLINTNEXTLINE(readability-inconsistent-declaration-parameter-name)
DB::Command::bindParam(unsigned int idx, const WebStat::Entity & entity)
{
- bindParamI(idx, std::get<0>(entity));
+ bindParamI(idx, entity.id);
}
}
namespace WebStat {
namespace {
- Crc32Value
- crc32(const std::string_view value)
+ using ByteArrayView = std::span<const uint8_t>;
+
+ auto
+ bytesToHexRange(const ByteArrayView bytes)
+ {
+ constexpr auto HEXN = 16ZU;
+ return bytes | std::views::transform([](auto byte) {
+ return std::array {byte / HEXN, byte % HEXN};
+ }) | std::views::join
+ | std::views::transform([](auto nibble) {
+ return "0123456789abcdef"[nibble];
+ });
+ }
+
+ EntityHash
+ makeHash(const std::string_view value)
{
- // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast) - correct for crc32ing raw bytes
- return static_cast<Crc32Value>(::crc32(::crc32(0, Z_NULL, 0), reinterpret_cast<const Bytef *>(value.data()),
- static_cast<uInt>(value.length())));
+ MD5_CTX ctx {};
+ MD5Init(&ctx);
+ // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast) - correct for md5ing raw bytes
+ MD5Update(&ctx, reinterpret_cast<const uint8_t *>(value.data()), value.length());
+ EntityHash hash {};
+ MD5Final(hash.data(), &ctx);
+ return hash;
}
template<EntityType Type> struct ToEntity {
Entity
operator()(const std::string_view value) const
{
- return {crc32(value), Type, value};
+ return {
+ .hash = makeHash(value),
+ .id = std::nullopt,
+ .type = Type,
+ .value = value,
+ };
}
template<typename T>
@@ -50,7 +74,7 @@ namespace WebStat {
};
auto
- crc32ScanValues(const Ingestor::ScanValues & values)
+ hashScanValues(const Ingestor::ScanValues & values)
{
static constexpr std::tuple<ToEntity<EntityType::VirtualHost>, std::identity, std::identity, std::identity,
ToEntity<EntityType::Path>, ToEntity<EntityType::QueryString>, std::identity, std::identity,
@@ -65,6 +89,19 @@ namespace WebStat {
}(std::make_index_sequence<VALUE_COUNT>());
}
+ template<std::integral T = EntityId, typename... Binds>
+ T
+ insert(auto && dbconn, const std::string & sql, const DB::CommandOptionsPtr & opts, Binds &&... binds)
+ {
+ auto ins = dbconn->select(sql, opts);
+ bindMany(ins, 0, std::forward<Binds>(binds)...);
+ if (ins->fetch()) {
+ T out;
+ (*ins)[0] >> out;
+ return out;
+ }
+ throw DB::NoRowsAffected {};
+ }
}
Ingestor * Ingestor::currentIngestor = nullptr;
@@ -80,14 +117,10 @@ namespace WebStat {
Ingestor::Ingestor(const utsname & host, DB::ConnectionPoolPtr dbpl, IngestorSettings givenSettings) :
settings {std::move(givenSettings)}, dbpool {std::move(dbpl)},
ingestParkedLines {&Ingestor::jobIngestParkedLines}, purgeOldLogs {&Ingestor::jobPurgeOldLogs},
- hostnameId {crc32(host.nodename)}, curl {curl_multi_init()}, mainThread {std::this_thread::get_id()}
+ hostnameId {insert(dbpool->get(), SQL::HOST_UPSERT, SQL::HOST_UPSERT_OPTS, host.nodename, host.sysname,
+ host.release, host.version, host.machine, host.domainname)},
+ curl {curl_multi_init()}, mainThread {std::this_thread::get_id()}
{
- auto dbconn = dbpool->get();
- auto ins = dbconn->modify(SQL::HOST_UPSERT, SQL::HOST_UPSERT_OPTS);
- bindMany(ins, 0, hostnameId, host.nodename, host.sysname, host.release, host.version, host.machine,
- host.domainname);
- ins->execute();
-
assert(!currentIngestor);
currentIngestor = this;
signal(SIGTERM, &sigtermHandler);
@@ -210,7 +243,7 @@ namespace WebStat {
}
}
tryIngestQueuedLogLines();
- parkQueuedLogLines();
+ std::ignore = parkQueuedLogLines();
while (!curlOperations.empty() && curl_multi_poll(curl.get(), nullptr, 0, INT_MAX, nullptr) == CURLM_OK) {
handleCurlOperations();
}
@@ -230,34 +263,58 @@ namespace WebStat {
}
}
+ template<typename... T>
+ std::vector<Entity *>
+ Ingestor::entities(std::tuple<T...> & values)
+ {
+ std::vector<Entity *> entities;
+ visit(
+ [&entities]<typename V>(V & value) {
+ static_assert(!std::is_const_v<V>);
+ if constexpr (std::is_same_v<V, Entity>) {
+ entities.emplace_back(&value);
+ }
+ else if constexpr (std::is_same_v<V, std::optional<Entity>>) {
+ if (value) {
+ entities.emplace_back(&*value);
+ }
+ }
+ },
+ values);
+ return entities;
+ }
+
void
Ingestor::ingestLogLines(DB::Connection * dbconn, const LineBatch & lines)
{
- auto nonNullEntityIds = std::views::take_while(&std::optional<Crc32Value>::has_value)
- | std::views::transform([](auto && value) {
- return *value;
- });
+ auto entityIds = std::views::transform([](auto && value) {
+ return std::make_pair(value->hash, *value->id);
+ });
DB::TransactionScope batchTx {*dbconn};
for (const auto & line : lines) {
if (auto result = scanLogLine(line)) {
stats.linesParsed++;
- const auto values = crc32ScanValues(result->values());
+ auto values = hashScanValues(result->values());
+ auto valuesEntities = entities(values);
+ fillKnownEntities(valuesEntities);
try {
- DB::TransactionScope dbtx {*dbconn};
- if (const auto newEnts = newEntities(values); newEnts.front()) {
- existingEntities.insert_range(storeEntities(dbconn, newEnts) | nonNullEntityIds);
+ std::optional<DB::TransactionScope> lineTx;
+ if (!std::ranges::all_of(valuesEntities, &std::optional<EntityId>::has_value, &Entity::id)) {
+ lineTx.emplace(*dbconn);
+ storeNewEntities(dbconn, valuesEntities);
+ existingEntities.insert_range(valuesEntities | entityIds);
}
storeLogLine(dbconn, values);
}
catch (const DB::Error & originalError) {
try {
DB::TransactionScope dbtx {*dbconn};
- const auto uninsertableLine = ToEntity<EntityType::UninsertableLine> {}(line);
- storeEntities(dbconn, {uninsertableLine});
+ auto uninsertableLine = ToEntity<EntityType::UninsertableLine> {}(line);
+ storeNewEntity(dbconn, uninsertableLine);
log(LOG_NOTICE,
"Failed to store parsed line and/or associated entties, but did store raw line, %u:%s",
- std::get<Crc32Value>(uninsertableLine), line.c_str());
+ *uninsertableLine.id, line.c_str());
}
catch (const std::exception & excp) {
log(LOG_NOTICE, "Failed to store line in any form, DB connection lost? %s", excp.what());
@@ -267,22 +324,21 @@ namespace WebStat {
}
else {
stats.linesParseFailed++;
- const auto unparsableLine = ToEntity<EntityType::UnparsableLine> {}(line);
- log(LOG_NOTICE, "Failed to parse line, this is a bug: %u:%s", std::get<Crc32Value>(unparsableLine),
- line.c_str());
- storeEntities(dbconn, {unparsableLine});
+ auto unparsableLine = ToEntity<EntityType::UnparsableLine> {}(line);
+ storeNewEntity(dbconn, unparsableLine);
+ log(LOG_NOTICE, "Failed to parse line, this is a bug: %u:%s", *unparsableLine.id, line.c_str());
}
}
}
- void
+ std::expected<std::filesystem::path, int>
Ingestor::parkQueuedLogLines()
{
if (queuedLines.empty()) {
- return;
+ return std::unexpected(0);
}
- const std::filesystem::path path {
- settings.fallbackDir / std::format("parked-{}.short", crc32(queuedLines.front()))};
+ const std::filesystem::path path {settings.fallbackDir
+ / std::format("parked-{:s}.short", bytesToHexRange(makeHash(queuedLines.front())))};
if (auto parked = FilePtr(fopen(path.c_str(), "w"))) {
fprintf(parked.get(), "%zu\n", queuedLines.size());
for (const auto & line : queuedLines) {
@@ -290,16 +346,19 @@ namespace WebStat {
}
if (fflush(parked.get()) == 0) {
queuedLines.clear();
- auto finalPath = auto {path}.replace_extension(".log");
+ auto finalPath = std::filesystem::path {path}.replace_extension(".log");
+ parked.reset();
if (rename(path.c_str(), finalPath.c_str()) == 0) {
- return;
+ return finalPath;
}
}
}
+ const int err = errno;
log(LOG_ERR, "Failed to park %zu queued lines:", queuedLines.size());
for (const auto & line : queuedLines) {
log(LOG_ERR, "\t%.*s", static_cast<int>(line.length()), line.data());
}
+ return std::unexpected(err);
}
void
@@ -334,7 +393,7 @@ namespace WebStat {
unsigned int count = 0;
for (auto pathIter = std::filesystem::directory_iterator {settings.fallbackDir};
pathIter != std::filesystem::directory_iterator {}; ++pathIter) {
- if (scn::scan<Crc32Value>(pathIter->path().filename().string(), "parked-{}.log")) {
+ if (scn::scan<std::string>(pathIter->path().filename().string(), "parked-{:[a-zA-Z0-9]}.log")) {
jobIngestParkedLines(pathIter->path());
count += 1;
}
@@ -397,34 +456,29 @@ namespace WebStat {
return purgedTotal;
}
- template<typename... T>
- Ingestor::NewEntities
- Ingestor::newEntities(const std::tuple<T...> & values) const
+ void
+ Ingestor::fillKnownEntities(const std::span<Entity *> entities) const
{
- Ingestor::NewEntities rtn;
- auto next = rtn.begin();
- visit(
- [this, &next]<typename X>(const X & entity) {
- auto addNewIfReqd = [&next, this](auto && entityToAdd) mutable {
- if (!existingEntities.contains(std::get<0>(entityToAdd))) {
- *next++ = entityToAdd;
- }
- return 0;
- };
+ for (const auto entity : entities) {
+ if (auto existing = existingEntities.find(entity->hash); existing != existingEntities.end()) {
+ entity->id = existing->second;
+ }
+ }
+ }
- if constexpr (std::is_same_v<X, Entity>) {
- addNewIfReqd(entity);
- }
- else if constexpr (std::is_same_v<X, std::optional<Entity>>) {
- entity.transform(addNewIfReqd);
- }
- },
- values);
- return rtn;
+ void
+ Ingestor::storeNewEntities(DB::Connection * dbconn, const std::span<Entity *> entities) const
+ {
+ for (const auto entity : entities) {
+ if (!entity->id) {
+ storeNewEntity(dbconn, *entity);
+ assert(entity->id);
+ }
+ }
}
- Ingestor::NewEntityIds
- Ingestor::storeEntities(DB::Connection * dbconn, const std::span<const std::optional<Entity>> values) const
+ void
+ Ingestor::storeNewEntity(DB::Connection * dbconn, Entity & entity) const
{
static constexpr std::array<std::pair<std::string_view, void (Ingestor::*)(const Entity &) const>, 9>
ENTITY_TYPE_VALUES {{
@@ -439,28 +493,20 @@ namespace WebStat {
{"content_type", nullptr},
}};
- auto insert = dbconn->modify(SQL::ENTITY_INSERT, SQL::ENTITY_INSERT_OPTS);
- NewEntityIds ids;
- std::ranges::transform(values | std::views::take_while(&std::optional<Entity>::has_value), ids.begin(),
- [this, &insert](auto && entity) {
- const auto & [entityId, type, value] = *entity;
- const auto & [typeName, onInsert] = ENTITY_TYPE_VALUES[std::to_underlying(type)];
- bindMany(insert, 0, entityId, typeName, value);
- const auto insertedEntities = insert->execute();
- if (insertedEntities > 0 && onInsert && std::this_thread::get_id() == mainThread) {
- std::invoke(onInsert, this, *entity);
- }
- stats.entitiesInserted += insertedEntities;
- return std::get<0>(*entity);
- });
- return ids;
+ assert(!entity.id);
+ const auto & [typeName, onInsert] = ENTITY_TYPE_VALUES[std::to_underlying(entity.type)];
+ entity.id = insert(dbconn, SQL::ENTITY_INSERT, SQL::ENTITY_INSERT_OPTS, entity.value, typeName);
+ if (onInsert && std::this_thread::get_id() == mainThread) {
+ std::invoke(onInsert, this, entity);
+ }
+ stats.entitiesInserted += 1;
}
void
Ingestor::onNewUserAgent(const Entity & entity) const
{
- const auto & [entityId, type, value] = entity;
- auto curlOp = curlGetUserAgentDetail(entityId, value, settings.userAgentAPI.c_str());
+ const auto & [entityHash, entityId, type, value] = entity;
+ auto curlOp = curlGetUserAgentDetail(*entityId, value, settings.userAgentAPI.c_str());
auto added = curlOperations.emplace(curlOp->hnd.get(), std::move(curlOp));
curl_multi_add_handle(curl.get(), added.first->first);
}
diff --git a/src/ingestor.hpp b/src/ingestor.hpp
index fcddc92..195f325 100644
--- a/src/ingestor.hpp
+++ b/src/ingestor.hpp
@@ -7,7 +7,8 @@
#include <connectionPool.h>
#include <connection_fwd.h>
#include <cstdio>
-#include <flat_set>
+#include <expected>
+#include <flat_map>
#include <future>
#include <scn/scan.h>
#include <span>
@@ -54,7 +55,7 @@ namespace WebStat {
void ingestLog(std::FILE *);
void tryIngestQueuedLogLines();
void ingestLogLines(DB::Connection *, const LineBatch & lines);
- void parkQueuedLogLines();
+ std::expected<std::filesystem::path, int> parkQueuedLogLines();
void runJobsAsNeeded();
unsigned int jobIngestParkedLines();
@@ -78,7 +79,7 @@ namespace WebStat {
DB::ConnectionPoolPtr dbpool;
mutable Stats stats {};
- std::flat_set<Crc32Value> existingEntities;
+ std::flat_map<EntityHash, EntityId> existingEntities;
LineBatch queuedLines;
bool terminated = false;
@@ -99,11 +100,10 @@ namespace WebStat {
Job purgeOldLogs;
private:
- static constexpr size_t MAX_NEW_ENTITIES = 6;
- using NewEntityIds = std::array<std::optional<Crc32Value>, MAX_NEW_ENTITIES>;
- NewEntityIds storeEntities(DB::Connection *, std::span<const std::optional<Entity>>) const;
- using NewEntities = std::array<std::optional<Entity>, MAX_NEW_ENTITIES>;
- template<typename... T> NewEntities newEntities(const std::tuple<T...> &) const;
+ template<typename... T> static std::vector<Entity *> entities(std::tuple<T...> &);
+ void fillKnownEntities(std::span<Entity *>) const;
+ void storeNewEntities(DB::Connection *, std::span<Entity *>) const;
+ void storeNewEntity(DB::Connection *, Entity &) const;
void onNewUserAgent(const Entity &) const;
void handleCurlOperations();
void logStats() const;
@@ -119,7 +119,7 @@ namespace WebStat {
[[gnu::format(printf, 3, 4)]] virtual void log(int level, const char * msgfmt, ...) const = 0;
using CurlOperations = std::map<CURL *, std::unique_ptr<CurlOperation>>;
- uint32_t hostnameId;
+ EntityId hostnameId;
CurlMultiPtr curl;
mutable CurlOperations curlOperations;
std::thread::id mainThread;
diff --git a/src/logTypes.hpp b/src/logTypes.hpp
index 71393b2..7f0473e 100644
--- a/src/logTypes.hpp
+++ b/src/logTypes.hpp
@@ -1,5 +1,6 @@
#pragma once
+#include <md5.h>
#include <optional>
#include <scn/scan.h>
#include <string>
@@ -34,8 +35,15 @@ namespace WebStat {
ContentType,
};
- using Crc32Value = uint32_t;
- using Entity = std::tuple<Crc32Value, EntityType, std::string_view>;
+ using EntityId = int32_t;
+ using EntityHash = std::array<uint8_t, MD5_DIGEST_LENGTH>;
+
+ struct Entity {
+ EntityHash hash;
+ std::optional<EntityId> id;
+ EntityType type;
+ std::string_view value;
+ };
}
namespace scn {
diff --git a/src/schema.sql b/src/schema.sql
index 789789a..b211505 100644
--- a/src/schema.sql
+++ b/src/schema.sql
@@ -33,7 +33,7 @@ CREATE TYPE entity AS ENUM(
);
CREATE TABLE entities(
- id oid NOT NULL,
+ id integer GENERATED ALWAYS AS IDENTITY,
value text NOT NULL,
type entity NOT NULL,
detail jsonb,
@@ -42,30 +42,69 @@ CREATE TABLE entities(
CREATE UNIQUE INDEX uni_entities_value ON entities(MD5(value));
+CREATE OR REPLACE FUNCTION entity(newValue text, newType entity)
+ RETURNS integer
+ AS $$
+DECLARE
+ now timestamp without time zone;
+ recid integer;
+BEGIN
+ IF newValue IS NULL THEN
+ RETURN NULL;
+ END IF;
+ INSERT INTO entities(value, type)
+ SELECT
+ newValue,
+ newType
+ WHERE
+ NOT EXISTS (
+ SELECT
+ FROM
+ entities
+ WHERE
+ md5(value) = md5(newValue))
+ ON CONFLICT
+ DO NOTHING
+ RETURNING
+ id INTO recid;
+ IF recid IS NULL THEN
+ SELECT
+ id INTO recid
+ FROM
+ entities
+ WHERE
+ md5(value) = md5(newValue);
+ END IF;
+ RETURN recid;
+END;
+$$
+LANGUAGE plpgSQL
+RETURNS NULL ON NULL INPUT;
+
CREATE TABLE access_log(
id bigint GENERATED ALWAYS AS IDENTITY,
- hostname oid NOT NULL,
- virtual_host oid NOT NULL,
+ hostname integer NOT NULL,
+ virtual_host integer NOT NULL,
remoteip inet NOT NULL,
request_time timestamp(6) NOT NULL,
method http_verb NOT NULL,
protocol protocol NOT NULL,
- path oid NOT NULL,
- query_string oid,
+ path integer NOT NULL,
+ query_string integer,
status smallint NOT NULL,
- size int NOT NULL,
+ size integer NOT NULL,
duration interval second(6) NOT NULL,
- referrer oid,
- user_agent oid,
- content_type oid,
+ referrer integer,
+ user_agent integer,
+ content_type integer,
CONSTRAINT pk_access_log PRIMARY KEY (id),
- CONSTRAINT fk_access_log_hostname FOREIGN KEY (hostname) REFERENCES entities(id),
- CONSTRAINT fk_access_log_virtualhost FOREIGN KEY (virtual_host) REFERENCES entities(id),
- CONSTRAINT fk_access_log_path FOREIGN KEY (path) REFERENCES entities(id),
- CONSTRAINT fk_access_log_query_string FOREIGN KEY (query_string) REFERENCES entities(id),
- CONSTRAINT fk_access_log_referrer FOREIGN KEY (referrer) REFERENCES entities(id),
- CONSTRAINT fk_access_log_user_agent FOREIGN KEY (user_agent) REFERENCES entities(id),
- CONSTRAINT fk_access_log_content_type FOREIGN KEY (content_type) REFERENCES entities(id)
+ CONSTRAINT fk_access_log_hostname FOREIGN KEY (hostname) REFERENCES entities(id) ON UPDATE CASCADE,
+ CONSTRAINT fk_access_log_virtualhost FOREIGN KEY (virtual_host) REFERENCES entities(id) ON UPDATE CASCADE,
+ CONSTRAINT fk_access_log_path FOREIGN KEY (path) REFERENCES entities(id) ON UPDATE CASCADE,
+ CONSTRAINT fk_access_log_query_string FOREIGN KEY (query_string) REFERENCES entities(id) ON UPDATE CASCADE,
+ CONSTRAINT fk_access_log_referrer FOREIGN KEY (referrer) REFERENCES entities(id) ON UPDATE CASCADE,
+ CONSTRAINT fk_access_log_user_agent FOREIGN KEY (user_agent) REFERENCES entities(id) ON UPDATE CASCADE,
+ CONSTRAINT fk_access_log_content_type FOREIGN KEY (content_type) REFERENCES entities(id) ON UPDATE CASCADE
);
CREATE OR REPLACE VIEW access_log_view AS
diff --git a/src/sql.cpp b/src/sql.cpp
index da95f18..801a905 100644
--- a/src/sql.cpp
+++ b/src/sql.cpp
@@ -1,5 +1,6 @@
#include "sql.hpp"
#include <command.h>
+#include <dbpp-postgresql/pq-command.h>
namespace WebStat::SQL {
// ccache doesn't play nicely with #embed
@@ -22,7 +23,8 @@ namespace WebStat::SQL {
#embed "sql/hostUpsert.sql"
};
#define HASH_OPTS(VAR) \
- const DB::CommandOptionsPtr VAR##_OPTS = std::make_shared<DB::CommandOptions>(std::hash<std::string> {}(VAR))
+ const DB::CommandOptionsPtr VAR##_OPTS \
+ = std::make_shared<PQ::CommandOptions>(std::hash<std::string> {}(VAR), 35, false)
HASH_OPTS(ACCESS_LOG_INSERT);
HASH_OPTS(ACCESS_LOG_PURGE_OLD);
HASH_OPTS(ENTITY_INSERT);
diff --git a/src/sql/entityInsert.sql b/src/sql/entityInsert.sql
index 8e25810..f518617 100644
--- a/src/sql/entityInsert.sql
+++ b/src/sql/entityInsert.sql
@@ -1,4 +1,2 @@
-INSERT INTO entities(id, type, value)
- VALUES (?, ?, ?)
-ON CONFLICT
- DO NOTHING
+SELECT
+ entity(?, ?)
diff --git a/src/sql/hostUpsert.sql b/src/sql/hostUpsert.sql
index 18e8df8..d5ee11d 100644
--- a/src/sql/hostUpsert.sql
+++ b/src/sql/hostUpsert.sql
@@ -1,7 +1,9 @@
-INSERT INTO entities(id, type, value, detail)
- VALUES ($1, 'host', $2, jsonb_build_object('sysname', $3::text, 'release', $4::text,
- 'version', $5::text, 'machine', $6::text, 'domainname', $7::text))
-ON CONFLICT ON CONSTRAINT pk_entities
+INSERT INTO entities(type, value, detail)
+ VALUES ('host', $1, jsonb_build_object('sysname', $2::text, 'release', $3::text,
+ 'version', $4::text, 'machine', $5::text, 'domainname', $6::text))
+ON CONFLICT (md5(value))
DO UPDATE SET
- detail = jsonb_build_object('sysname', $3::text, 'release', $4::text, 'version',
- $5::text, 'machine', $6::text, 'domainname', $7::text)
+ detail = jsonb_build_object('sysname', $2::text, 'release', $3::text, 'version',
+ $4::text, 'machine', $5::text, 'domainname', $6::text)
+ RETURNING
+ id
diff --git a/src/uaLookup.cpp b/src/uaLookup.cpp
index 8cd1cb2..0fee3f0 100644
--- a/src/uaLookup.cpp
+++ b/src/uaLookup.cpp
@@ -5,7 +5,7 @@
#include <modifycommand.h>
namespace WebStat {
- UserAgentLookupOperation::UserAgentLookupOperation(Crc32Value userAgentEntityId) : entityId {userAgentEntityId} { }
+ UserAgentLookupOperation::UserAgentLookupOperation(EntityId userAgentEntityId) : entityId {userAgentEntityId} { }
void
UserAgentLookupOperation::whenComplete(DB::Connection * dbconn) const
@@ -16,7 +16,7 @@ namespace WebStat {
}
std::unique_ptr<CurlOperation>
- curlGetUserAgentDetail(Crc32Value entityId, const std::string_view uas, const char * baseUrl)
+ curlGetUserAgentDetail(EntityId entityId, const std::string_view uas, const char * baseUrl)
{
auto request = std::make_unique<UserAgentLookupOperation>(entityId);
diff --git a/src/uaLookup.hpp b/src/uaLookup.hpp
index 9714253..4de883f 100644
--- a/src/uaLookup.hpp
+++ b/src/uaLookup.hpp
@@ -9,13 +9,13 @@
namespace WebStat {
class UserAgentLookupOperation : public CurlOperation {
public:
- UserAgentLookupOperation(Crc32Value entityId);
+ UserAgentLookupOperation(EntityId entityId);
void whenComplete(DB::Connection *) const override;
- Crc32Value entityId;
+ EntityId entityId;
};
std::unique_ptr<CurlOperation> curlGetUserAgentDetail(
- Crc32Value entityId, std::string_view uas, const char * baseUrl);
+ EntityId entityId, std::string_view uas, const char * baseUrl);
}
diff --git a/src/util.hpp b/src/util.hpp
index 28bcebd..f7254e8 100644
--- a/src/util.hpp
+++ b/src/util.hpp
@@ -16,7 +16,7 @@ namespace WebStat {
template<typename... T>
auto
- visit(auto && visitor, const std::tuple<T...> & values)
+ visit(auto && visitor, std::tuple<T...> & values)
{
std::apply(
[&](auto &&... value) {
diff --git a/test/test-ingest.cpp b/test/test-ingest.cpp
index 5fc8195..88e80e8 100644
--- a/test/test-ingest.cpp
+++ b/test/test-ingest.cpp
@@ -155,7 +155,6 @@ BOOST_DATA_TEST_CASE(CLFStringsBad,
constexpr std::string_view LOGLINE1
= R"LOG(git.randomdan.homeip.net 98.82.40.168 1755561576768318 GET "/repo/gentoobrowse-api/commit/gentoobrowse-api/unittests/fixtures/756569aa764177340726dd3d40b41d89b11b20c7/app-crypt/pdfcrack/Manifest" "?h=gentoobrowse-api-0.9.1&id=a2ed3fd30333721accd4b697bfcb6cc4165c7714" HTTP/1.1 200 1884 107791 "-" "Mozilla/5.0 AppleWebKit/537.36 (KHTML, like Gecko; compatible; Amazonbot/0.1; +https://developer.amazon.com/support/amazonbot) Chrome/119.0.6045.214 Safari/537.36" "test/plain")LOG";
-constexpr std::string_view LOGLINE1_PARKED = "parked-237093379.log";
constexpr std::string_view LOGLINE2
= R"LOG(www.randomdan.homeip.net 43.128.84.166 1755561575973204 GET "/app-dicts/myspell-et/Manifest" "" HTTP/1.1 200 312 10369 "https://google.com" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.0.0 Safari/537.36" "image/png")LOG";
@@ -298,11 +297,11 @@ BOOST_AUTO_TEST_CASE(ParkLogLine)
{
queuedLines.emplace_back(LOGLINE1);
queuedLines.emplace_back(LOGLINE2);
- parkQueuedLogLines();
- const auto path = settings.fallbackDir / LOGLINE1_PARKED;
- BOOST_TEST_INFO(path);
- BOOST_REQUIRE(std::filesystem::exists(path));
- BOOST_CHECK_EQUAL(std::filesystem::file_size(path), LOGLINE1.length() + LOGLINE2.length() + 4);
+ const auto path = parkQueuedLogLines();
+ BOOST_REQUIRE(path);
+ BOOST_TEST_INFO(*path);
+ BOOST_REQUIRE(std::filesystem::exists(*path));
+ BOOST_CHECK_EQUAL(std::filesystem::file_size(*path), LOGLINE1.length() + LOGLINE2.length() + 4);
}
BOOST_AUTO_TEST_CASE(ParkLogLineOnError, *boost::unit_test::depends_on("I/ParkLogLine"))
@@ -318,11 +317,12 @@ BOOST_AUTO_TEST_CASE(IngestParked, *boost::unit_test::depends_on("I/ParkLogLine"
{
queuedLines.emplace_back(LOGLINE1);
queuedLines.emplace_back(LOGLINE2);
- parkQueuedLogLines();
+ BOOST_REQUIRE(parkQueuedLogLines());
+ BOOST_CHECK(!std::filesystem::is_empty(settings.fallbackDir));
BOOST_REQUIRE(queuedLines.empty());
jobIngestParkedLines();
BOOST_CHECK_EQUAL(queuedLines.size(), 2);
- BOOST_CHECK(!std::filesystem::exists(settings.fallbackDir / LOGLINE1_PARKED));
+ BOOST_CHECK(std::filesystem::is_empty(settings.fallbackDir));
}
BOOST_AUTO_TEST_CASE(DefaultLaunchNoJobs)
@@ -338,21 +338,22 @@ BOOST_AUTO_TEST_CASE(IngestParkedJob,
const auto now = Job::LastRunTime::clock::now();
ingestParkedLines.lastRun = now - 1s;
queuedLines.emplace_back(LOGLINE1);
- parkQueuedLogLines();
+ const auto path = parkQueuedLogLines();
+ BOOST_REQUIRE(path);
BOOST_REQUIRE(queuedLines.empty());
- BOOST_REQUIRE(std::filesystem::exists(settings.fallbackDir / LOGLINE1_PARKED));
+ BOOST_REQUIRE(std::filesystem::exists(*path));
runJobsAsNeeded();
BOOST_REQUIRE(!ingestParkedLines.currentRun);
BOOST_CHECK(queuedLines.empty());
- BOOST_CHECK(std::filesystem::exists(settings.fallbackDir / LOGLINE1_PARKED));
+ BOOST_CHECK(std::filesystem::exists(*path));
BOOST_CHECK_EQUAL(ingestParkedLines.lastRun, now - 1s);
ingestParkedLines.lastRun = now - settings.freqIngestParkedLines + 2s;
runJobsAsNeeded();
BOOST_REQUIRE(!ingestParkedLines.currentRun);
BOOST_CHECK(queuedLines.empty());
- BOOST_CHECK(std::filesystem::exists(settings.fallbackDir / LOGLINE1_PARKED));
+ BOOST_CHECK(std::filesystem::exists(*path));
BOOST_CHECK_EQUAL(ingestParkedLines.lastRun, now - settings.freqIngestParkedLines + 2s);
ingestParkedLines.lastRun = now - settings.freqIngestParkedLines - 1s;
@@ -363,7 +364,7 @@ BOOST_AUTO_TEST_CASE(IngestParkedJob,
BOOST_REQUIRE(!ingestParkedLines.currentRun);
BOOST_CHECK_EQUAL(queuedLines.size(), 1);
BOOST_CHECK_GE(ingestParkedLines.lastRun, now);
- BOOST_CHECK(!std::filesystem::exists(settings.fallbackDir / LOGLINE1_PARKED));
+ BOOST_CHECK(!std::filesystem::exists(*path));
}
BOOST_AUTO_TEST_CASE(JobErrorRescheduler, *boost::unit_test::depends_on("I/IngestParkedJob"))
@@ -371,14 +372,15 @@ BOOST_AUTO_TEST_CASE(JobErrorRescheduler, *boost::unit_test::depends_on("I/Inges
const auto now = Job::LastRunTime::clock::now();
ingestParkedLines.lastRun = now - settings.freqIngestParkedLines - 1s;
queuedLines.emplace_back(LOGLINE1);
- parkQueuedLogLines();
- std::filesystem::permissions(settings.fallbackDir / LOGLINE1_PARKED, std::filesystem::perms::owner_write);
+ const auto path = parkQueuedLogLines();
+ BOOST_REQUIRE(path);
+ std::filesystem::permissions(*path, std::filesystem::perms::owner_write);
runJobsAsNeeded();
BOOST_REQUIRE(ingestParkedLines.currentRun);
ingestParkedLines.currentRun->wait();
runJobsAsNeeded();
BOOST_REQUIRE(!ingestParkedLines.currentRun);
- BOOST_CHECK(std::filesystem::exists(settings.fallbackDir / LOGLINE1_PARKED));
+ BOOST_CHECK(std::filesystem::exists(*path));
BOOST_CHECK_GE(ingestParkedLines.lastRun, now - (settings.freqIngestParkedLines / 2) - 1s);
BOOST_CHECK_LE(ingestParkedLines.lastRun, now - (settings.freqIngestParkedLines / 2) + 1s);
}
@@ -404,10 +406,10 @@ BOOST_AUTO_TEST_CASE(DiscardUnparsable)
BOOST_REQUIRE_NO_THROW(tryIngestQueuedLogLines());
auto dbconn = dbpool->get();
auto select = dbconn->select("SELECT id::bigint, value FROM entities WHERE type = 'unparsable_line'");
- constexpr std::array<std::tuple<Crc32Value, std::string_view>, 1> EXPECTED {{
- {1664299262, "does not parse"},
+ constexpr std::array<std::tuple<EntityId, std::string_view>, 1> EXPECTED {{
+ {18, "does not parse"},
}};
- auto rows = select->as<Crc32Value, std::string_view>();
+ auto rows = select->as<EntityId, std::string_view>();
BOOST_CHECK_EQUAL_COLLECTIONS(rows.begin(), rows.end(), EXPECTED.begin(), EXPECTED.end());
BOOST_CHECK_EQUAL(stats.linesParseFailed, 1);
BOOST_CHECK_EQUAL(stats.entitiesInserted, 1);