summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Jamroot.jam1
-rw-r--r--src/Jamfile.jam1
-rw-r--r--src/ingestor.cpp158
-rw-r--r--src/ingestor.hpp15
-rw-r--r--src/logTypes.hpp4
-rw-r--r--src/schema.sql69
-rw-r--r--src/sql.cpp4
-rw-r--r--src/sql/entityInsert.sql6
-rw-r--r--src/sql/hostUpsert.sql14
-rw-r--r--src/uaLookup.cpp4
-rw-r--r--src/uaLookup.hpp6
-rw-r--r--src/util.hpp2
-rw-r--r--test/test-ingest.cpp6
13 files changed, 169 insertions, 121 deletions
diff --git a/Jamroot.jam b/Jamroot.jam
index 37e4a0e..37f97a6 100644
--- a/Jamroot.jam
+++ b/Jamroot.jam
@@ -12,7 +12,6 @@ lib boost_po : : <link>shared <name>boost_program_options ;
lib adhocutil : : <link>shared : : <include>/usr/include/adhocutil <library>glibmm ;
lib dbppcore : : <link>shared : : <include>/usr/include/dbpp <library>adhocutil ;
lib dbpp-postgresql : : <link>shared : : <include>/usr/include/dbpp-postgresql <library>dbppcore ;
-lib z : : <link>shared ;
lib md : : <link>shared ;
project webstat : requirements
diff --git a/src/Jamfile.jam b/src/Jamfile.jam
index eca0c24..debe757 100644
--- a/src/Jamfile.jam
+++ b/src/Jamfile.jam
@@ -3,7 +3,6 @@ lib webstat : [ glob *.cpp : *_main.cpp ] :
<library>..//adhocutil
<library>..//dbppcore
<library>../thirdparty//scn
- <library>..//z
<library>..//md
<library>..//curl
: :
diff --git a/src/ingestor.cpp b/src/ingestor.cpp
index 5ae2487..5454087 100644
--- a/src/ingestor.cpp
+++ b/src/ingestor.cpp
@@ -8,6 +8,7 @@
#include <modifycommand.h>
#include <ranges>
#include <scn/scan.h>
+#include <selectcommand.h>
#include <syslog.h>
#include <utility>
#include <zlib.h>
@@ -18,7 +19,7 @@ namespace DB {
// NOLINTNEXTLINE(readability-inconsistent-declaration-parameter-name)
DB::Command::bindParam(unsigned int idx, const WebStat::Entity & entity)
{
- bindParamI(idx, std::get<0>(entity));
+ bindParamI(idx, std::get<1>(entity));
}
}
@@ -50,19 +51,11 @@ namespace WebStat {
return hash;
}
- Crc32Value
- crc32(const std::string_view value)
- {
- // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast) - correct for crc32ing raw bytes
- return static_cast<Crc32Value>(::crc32(::crc32(0, Z_NULL, 0), reinterpret_cast<const Bytef *>(value.data()),
- static_cast<uInt>(value.length())));
- }
-
template<EntityType Type> struct ToEntity {
Entity
operator()(const std::string_view value) const
{
- return {crc32(value), Type, value};
+ return {makeHash(value), std::nullopt, Type, value};
}
template<typename T>
@@ -76,7 +69,7 @@ namespace WebStat {
};
auto
- crc32ScanValues(const Ingestor::ScanValues & values)
+ hashScanValues(const Ingestor::ScanValues & values)
{
static constexpr std::tuple<ToEntity<EntityType::VirtualHost>, std::identity, std::identity, std::identity,
ToEntity<EntityType::Path>, ToEntity<EntityType::QueryString>, std::identity, std::identity,
@@ -91,6 +84,19 @@ namespace WebStat {
}(std::make_index_sequence<VALUE_COUNT>());
}
+ template<std::integral T = EntityId, typename... Binds>
+ T
+ insert(auto && dbconn, const std::string & sql, const DB::CommandOptionsPtr & opts, Binds &&... binds)
+ {
+ auto ins = dbconn->select(sql, opts);
+ bindMany(ins, 0, std::forward<Binds>(binds)...);
+ if (ins->fetch()) {
+ T out;
+ (*ins)[0] >> out;
+ return out;
+ }
+ throw DB::NoRowsAffected {};
+ }
}
Ingestor * Ingestor::currentIngestor = nullptr;
@@ -106,14 +112,10 @@ namespace WebStat {
Ingestor::Ingestor(const utsname & host, DB::ConnectionPoolPtr dbpl, IngestorSettings givenSettings) :
settings {std::move(givenSettings)}, dbpool {std::move(dbpl)},
ingestParkedLines {&Ingestor::jobIngestParkedLines}, purgeOldLogs {&Ingestor::jobPurgeOldLogs},
- hostnameId {crc32(host.nodename)}, curl {curl_multi_init()}, mainThread {std::this_thread::get_id()}
+ hostnameId {insert(dbpool->get(), SQL::HOST_UPSERT, SQL::HOST_UPSERT_OPTS, host.nodename, host.sysname,
+ host.release, host.version, host.machine, host.domainname)},
+ curl {curl_multi_init()}, mainThread {std::this_thread::get_id()}
{
- auto dbconn = dbpool->get();
- auto ins = dbconn->modify(SQL::HOST_UPSERT, SQL::HOST_UPSERT_OPTS);
- bindMany(ins, 0, hostnameId, host.nodename, host.sysname, host.release, host.version, host.machine,
- host.domainname);
- ins->execute();
-
assert(!currentIngestor);
currentIngestor = this;
signal(SIGTERM, &sigtermHandler);
@@ -236,7 +238,7 @@ namespace WebStat {
}
}
tryIngestQueuedLogLines();
- parkQueuedLogLines();
+ std::ignore = parkQueuedLogLines();
while (!curlOperations.empty() && curl_multi_poll(curl.get(), nullptr, 0, INT_MAX, nullptr) == CURLM_OK) {
handleCurlOperations();
}
@@ -256,34 +258,55 @@ namespace WebStat {
}
}
+ template<typename... T>
+ std::vector<Entity *>
+ Ingestor::entities(std::tuple<T...> & values)
+ {
+ std::vector<Entity *> entities;
+ visit(
+ [&entities]<typename V>(V & value) {
+ static_assert(!std::is_const_v<V>);
+ if constexpr (std::is_same_v<V, Entity>) {
+ entities.emplace_back(&value);
+ }
+ else if constexpr (std::is_same_v<V, std::optional<Entity>>) {
+ if (value) {
+ entities.emplace_back(&*value);
+ }
+ }
+ },
+ values);
+ return entities;
+ }
+
void
Ingestor::ingestLogLines(DB::Connection * dbconn, const LineBatch & lines)
{
- auto nonNullEntityIds = std::views::take_while(&std::optional<Crc32Value>::has_value)
- | std::views::transform([](auto && value) {
- return *value;
- });
+ auto entityIds = std::views::transform([](auto && value) {
+ return std::make_pair(std::get<0>(*value), *std::get<1>(*value));
+ });
DB::TransactionScope batchTx {*dbconn};
for (const auto & line : lines) {
if (auto result = scanLogLine(line)) {
stats.linesParsed++;
- const auto values = crc32ScanValues(result->values());
+ auto values = hashScanValues(result->values());
+ auto valuesEntities = entities(values);
+ fillKnownEntities(valuesEntities);
try {
DB::TransactionScope dbtx {*dbconn};
- if (const auto newEnts = newEntities(values); newEnts.front()) {
- existingEntities.insert_range(storeEntities(dbconn, newEnts) | nonNullEntityIds);
- }
+ storeNewEntities(dbconn, valuesEntities);
+ existingEntities.insert_range(valuesEntities | entityIds);
storeLogLine(dbconn, values);
}
catch (const DB::Error & originalError) {
try {
DB::TransactionScope dbtx {*dbconn};
- const auto uninsertableLine = ToEntity<EntityType::UninsertableLine> {}(line);
- storeEntities(dbconn, {uninsertableLine});
+ auto uninsertableLine = ToEntity<EntityType::UninsertableLine> {}(line);
+ storeNewEntity(dbconn, uninsertableLine);
log(LOG_NOTICE,
"Failed to store parsed line and/or associated entties, but did store raw line, %u:%s",
- std::get<Crc32Value>(uninsertableLine), line.c_str());
+ *std::get<1>(uninsertableLine), line.c_str());
}
catch (const std::exception & excp) {
log(LOG_NOTICE, "Failed to store line in any form, DB connection lost? %s", excp.what());
@@ -293,10 +316,10 @@ namespace WebStat {
}
else {
stats.linesParseFailed++;
- const auto unparsableLine = ToEntity<EntityType::UnparsableLine> {}(line);
- log(LOG_NOTICE, "Failed to parse line, this is a bug: %u:%s", std::get<Crc32Value>(unparsableLine),
+ auto unparsableLine = ToEntity<EntityType::UnparsableLine> {}(line);
+ storeNewEntity(dbconn, unparsableLine);
+ log(LOG_NOTICE, "Failed to parse line, this is a bug: %u:%s", *std::get<1>(unparsableLine),
line.c_str());
- storeEntities(dbconn, {unparsableLine});
}
}
}
@@ -426,34 +449,28 @@ namespace WebStat {
return purgedTotal;
}
- template<typename... T>
- Ingestor::NewEntities
- Ingestor::newEntities(const std::tuple<T...> & values) const
+ void
+ Ingestor::fillKnownEntities(const std::span<Entity *> entities) const
{
- Ingestor::NewEntities rtn;
- auto next = rtn.begin();
- visit(
- [this, &next]<typename X>(const X & entity) {
- auto addNewIfReqd = [&next, this](auto && entityToAdd) mutable {
- if (!existingEntities.contains(std::get<0>(entityToAdd))) {
- *next++ = entityToAdd;
- }
- return 0;
- };
+ for (const auto entity : entities) {
+ if (auto existing = existingEntities.find(std::get<0>(*entity)); existing != existingEntities.end()) {
+ std::get<1>(*entity) = existing->second;
+ }
+ }
+ }
- if constexpr (std::is_same_v<X, Entity>) {
- addNewIfReqd(entity);
- }
- else if constexpr (std::is_same_v<X, std::optional<Entity>>) {
- entity.transform(addNewIfReqd);
- }
- },
- values);
- return rtn;
+ void
+ Ingestor::storeNewEntities(DB::Connection * dbconn, const std::span<Entity *> entities) const
+ {
+ for (const auto entity : entities) {
+ if (!std::get<1>(*entity)) {
+ storeNewEntity(dbconn, *entity);
+ }
+ }
}
- Ingestor::NewEntityIds
- Ingestor::storeEntities(DB::Connection * dbconn, const std::span<const std::optional<Entity>> values) const
+ void
+ Ingestor::storeNewEntity(DB::Connection * dbconn, Entity & entity) const
{
static constexpr std::array<std::pair<std::string_view, void (Ingestor::*)(const Entity &) const>, 9>
ENTITY_TYPE_VALUES {{
@@ -468,28 +485,21 @@ namespace WebStat {
{"content_type", nullptr},
}};
- auto insert = dbconn->modify(SQL::ENTITY_INSERT, SQL::ENTITY_INSERT_OPTS);
- NewEntityIds ids;
- std::ranges::transform(values | std::views::take_while(&std::optional<Entity>::has_value), ids.begin(),
- [this, &insert](auto && entity) {
- const auto & [entityId, type, value] = *entity;
- const auto & [typeName, onInsert] = ENTITY_TYPE_VALUES[std::to_underlying(type)];
- bindMany(insert, 0, entityId, typeName, value);
- const auto insertedEntities = insert->execute();
- if (insertedEntities > 0 && onInsert && std::this_thread::get_id() == mainThread) {
- std::invoke(onInsert, this, *entity);
- }
- stats.entitiesInserted += insertedEntities;
- return std::get<0>(*entity);
- });
- return ids;
+ auto & [entityHash, entityId, type, value] = entity;
+ assert(!entityId);
+ const auto & [typeName, onInsert] = ENTITY_TYPE_VALUES[std::to_underlying(type)];
+ entityId = insert(dbconn, SQL::ENTITY_INSERT, SQL::ENTITY_INSERT_OPTS, value, typeName);
+ if (onInsert && std::this_thread::get_id() == mainThread) {
+ std::invoke(onInsert, this, entity);
+ }
+ stats.entitiesInserted += 1;
}
void
Ingestor::onNewUserAgent(const Entity & entity) const
{
- const auto & [entityId, type, value] = entity;
- auto curlOp = curlGetUserAgentDetail(entityId, value, settings.userAgentAPI.c_str());
+ const auto & [entityHash, entityId, type, value] = entity;
+ auto curlOp = curlGetUserAgentDetail(*entityId, value, settings.userAgentAPI.c_str());
auto added = curlOperations.emplace(curlOp->hnd.get(), std::move(curlOp));
curl_multi_add_handle(curl.get(), added.first->first);
}
diff --git a/src/ingestor.hpp b/src/ingestor.hpp
index b2e0fed..195f325 100644
--- a/src/ingestor.hpp
+++ b/src/ingestor.hpp
@@ -8,7 +8,7 @@
#include <connection_fwd.h>
#include <cstdio>
#include <expected>
-#include <flat_set>
+#include <flat_map>
#include <future>
#include <scn/scan.h>
#include <span>
@@ -79,7 +79,7 @@ namespace WebStat {
DB::ConnectionPoolPtr dbpool;
mutable Stats stats {};
- std::flat_set<Crc32Value> existingEntities;
+ std::flat_map<EntityHash, EntityId> existingEntities;
LineBatch queuedLines;
bool terminated = false;
@@ -100,11 +100,10 @@ namespace WebStat {
Job purgeOldLogs;
private:
- static constexpr size_t MAX_NEW_ENTITIES = 6;
- using NewEntityIds = std::array<std::optional<Crc32Value>, MAX_NEW_ENTITIES>;
- NewEntityIds storeEntities(DB::Connection *, std::span<const std::optional<Entity>>) const;
- using NewEntities = std::array<std::optional<Entity>, MAX_NEW_ENTITIES>;
- template<typename... T> NewEntities newEntities(const std::tuple<T...> &) const;
+ template<typename... T> static std::vector<Entity *> entities(std::tuple<T...> &);
+ void fillKnownEntities(std::span<Entity *>) const;
+ void storeNewEntities(DB::Connection *, std::span<Entity *>) const;
+ void storeNewEntity(DB::Connection *, Entity &) const;
void onNewUserAgent(const Entity &) const;
void handleCurlOperations();
void logStats() const;
@@ -120,7 +119,7 @@ namespace WebStat {
[[gnu::format(printf, 3, 4)]] virtual void log(int level, const char * msgfmt, ...) const = 0;
using CurlOperations = std::map<CURL *, std::unique_ptr<CurlOperation>>;
- uint32_t hostnameId;
+ EntityId hostnameId;
CurlMultiPtr curl;
mutable CurlOperations curlOperations;
std::thread::id mainThread;
diff --git a/src/logTypes.hpp b/src/logTypes.hpp
index 6556d5c..fb66867 100644
--- a/src/logTypes.hpp
+++ b/src/logTypes.hpp
@@ -35,9 +35,9 @@ namespace WebStat {
ContentType,
};
- using Crc32Value = uint32_t;
+ using EntityId = int32_t;
using EntityHash = std::array<uint8_t, MD5_DIGEST_LENGTH>;
- using Entity = std::tuple<Crc32Value, EntityType, std::string_view>;
+ using Entity = std::tuple<EntityHash, std::optional<EntityId>, EntityType, std::string_view>;
}
namespace scn {
diff --git a/src/schema.sql b/src/schema.sql
index 789789a..fd9eb4f 100644
--- a/src/schema.sql
+++ b/src/schema.sql
@@ -33,7 +33,7 @@ CREATE TYPE entity AS ENUM(
);
CREATE TABLE entities(
- id oid NOT NULL,
+ id int GENERATED ALWAYS AS IDENTITY,
value text NOT NULL,
type entity NOT NULL,
detail jsonb,
@@ -42,30 +42,69 @@ CREATE TABLE entities(
CREATE UNIQUE INDEX uni_entities_value ON entities(MD5(value));
+CREATE OR REPLACE FUNCTION entity(newValue text, newType entity)
+ RETURNS int
+ AS $$
+DECLARE
+ now timestamp without time zone;
+ recid int;
+BEGIN
+ IF newValue IS NULL THEN
+ RETURN NULL;
+ END IF;
+ INSERT INTO entities(value, type)
+ SELECT
+ newValue,
+ newType
+ WHERE
+ NOT EXISTS (
+ SELECT
+ FROM
+ entities
+ WHERE
+ md5(value) = md5(newValue))
+ ON CONFLICT
+ DO NOTHING
+ RETURNING
+ id INTO recid;
+ IF recid IS NULL THEN
+ SELECT
+ id INTO recid
+ FROM
+ entities
+ WHERE
+ md5(value) = md5(newValue);
+ END IF;
+ RETURN recid;
+END;
+$$
+LANGUAGE plpgSQL
+RETURNS NULL ON NULL INPUT;
+
CREATE TABLE access_log(
id bigint GENERATED ALWAYS AS IDENTITY,
- hostname oid NOT NULL,
- virtual_host oid NOT NULL,
+ hostname int NOT NULL,
+ virtual_host int NOT NULL,
remoteip inet NOT NULL,
request_time timestamp(6) NOT NULL,
method http_verb NOT NULL,
protocol protocol NOT NULL,
- path oid NOT NULL,
- query_string oid,
+ path int NOT NULL,
+ query_string int,
status smallint NOT NULL,
size int NOT NULL,
duration interval second(6) NOT NULL,
- referrer oid,
- user_agent oid,
- content_type oid,
+ referrer int,
+ user_agent int,
+ content_type int,
CONSTRAINT pk_access_log PRIMARY KEY (id),
- CONSTRAINT fk_access_log_hostname FOREIGN KEY (hostname) REFERENCES entities(id),
- CONSTRAINT fk_access_log_virtualhost FOREIGN KEY (virtual_host) REFERENCES entities(id),
- CONSTRAINT fk_access_log_path FOREIGN KEY (path) REFERENCES entities(id),
- CONSTRAINT fk_access_log_query_string FOREIGN KEY (query_string) REFERENCES entities(id),
- CONSTRAINT fk_access_log_referrer FOREIGN KEY (referrer) REFERENCES entities(id),
- CONSTRAINT fk_access_log_user_agent FOREIGN KEY (user_agent) REFERENCES entities(id),
- CONSTRAINT fk_access_log_content_type FOREIGN KEY (content_type) REFERENCES entities(id)
+ CONSTRAINT fk_access_log_hostname FOREIGN KEY (hostname) REFERENCES entities(id) ON UPDATE CASCADE,
+ CONSTRAINT fk_access_log_virtualhost FOREIGN KEY (virtual_host) REFERENCES entities(id) ON UPDATE CASCADE,
+ CONSTRAINT fk_access_log_path FOREIGN KEY (path) REFERENCES entities(id) ON UPDATE CASCADE,
+ CONSTRAINT fk_access_log_query_string FOREIGN KEY (query_string) REFERENCES entities(id) ON UPDATE CASCADE,
+ CONSTRAINT fk_access_log_referrer FOREIGN KEY (referrer) REFERENCES entities(id) ON UPDATE CASCADE,
+ CONSTRAINT fk_access_log_user_agent FOREIGN KEY (user_agent) REFERENCES entities(id) ON UPDATE CASCADE,
+ CONSTRAINT fk_access_log_content_type FOREIGN KEY (content_type) REFERENCES entities(id) ON UPDATE CASCADE
);
CREATE OR REPLACE VIEW access_log_view AS
diff --git a/src/sql.cpp b/src/sql.cpp
index da95f18..801a905 100644
--- a/src/sql.cpp
+++ b/src/sql.cpp
@@ -1,5 +1,6 @@
#include "sql.hpp"
#include <command.h>
+#include <dbpp-postgresql/pq-command.h>
namespace WebStat::SQL {
// ccache doesn't play nicely with #embed
@@ -22,7 +23,8 @@ namespace WebStat::SQL {
#embed "sql/hostUpsert.sql"
};
#define HASH_OPTS(VAR) \
- const DB::CommandOptionsPtr VAR##_OPTS = std::make_shared<DB::CommandOptions>(std::hash<std::string> {}(VAR))
+ const DB::CommandOptionsPtr VAR##_OPTS \
+ = std::make_shared<PQ::CommandOptions>(std::hash<std::string> {}(VAR), 35, false)
HASH_OPTS(ACCESS_LOG_INSERT);
HASH_OPTS(ACCESS_LOG_PURGE_OLD);
HASH_OPTS(ENTITY_INSERT);
diff --git a/src/sql/entityInsert.sql b/src/sql/entityInsert.sql
index 8e25810..f518617 100644
--- a/src/sql/entityInsert.sql
+++ b/src/sql/entityInsert.sql
@@ -1,4 +1,2 @@
-INSERT INTO entities(id, type, value)
- VALUES (?, ?, ?)
-ON CONFLICT
- DO NOTHING
+SELECT
+ entity(?, ?)
diff --git a/src/sql/hostUpsert.sql b/src/sql/hostUpsert.sql
index 18e8df8..d5ee11d 100644
--- a/src/sql/hostUpsert.sql
+++ b/src/sql/hostUpsert.sql
@@ -1,7 +1,9 @@
-INSERT INTO entities(id, type, value, detail)
- VALUES ($1, 'host', $2, jsonb_build_object('sysname', $3::text, 'release', $4::text,
- 'version', $5::text, 'machine', $6::text, 'domainname', $7::text))
-ON CONFLICT ON CONSTRAINT pk_entities
+INSERT INTO entities(type, value, detail)
+ VALUES ('host', $1, jsonb_build_object('sysname', $2::text, 'release', $3::text,
+ 'version', $4::text, 'machine', $5::text, 'domainname', $6::text))
+ON CONFLICT (md5(value))
DO UPDATE SET
- detail = jsonb_build_object('sysname', $3::text, 'release', $4::text, 'version',
- $5::text, 'machine', $6::text, 'domainname', $7::text)
+ detail = jsonb_build_object('sysname', $2::text, 'release', $3::text, 'version',
+ $4::text, 'machine', $5::text, 'domainname', $6::text)
+ RETURNING
+ id
diff --git a/src/uaLookup.cpp b/src/uaLookup.cpp
index 8cd1cb2..0fee3f0 100644
--- a/src/uaLookup.cpp
+++ b/src/uaLookup.cpp
@@ -5,7 +5,7 @@
#include <modifycommand.h>
namespace WebStat {
- UserAgentLookupOperation::UserAgentLookupOperation(Crc32Value userAgentEntityId) : entityId {userAgentEntityId} { }
+ UserAgentLookupOperation::UserAgentLookupOperation(EntityId userAgentEntityId) : entityId {userAgentEntityId} { }
void
UserAgentLookupOperation::whenComplete(DB::Connection * dbconn) const
@@ -16,7 +16,7 @@ namespace WebStat {
}
std::unique_ptr<CurlOperation>
- curlGetUserAgentDetail(Crc32Value entityId, const std::string_view uas, const char * baseUrl)
+ curlGetUserAgentDetail(EntityId entityId, const std::string_view uas, const char * baseUrl)
{
auto request = std::make_unique<UserAgentLookupOperation>(entityId);
diff --git a/src/uaLookup.hpp b/src/uaLookup.hpp
index 9714253..4de883f 100644
--- a/src/uaLookup.hpp
+++ b/src/uaLookup.hpp
@@ -9,13 +9,13 @@
namespace WebStat {
class UserAgentLookupOperation : public CurlOperation {
public:
- UserAgentLookupOperation(Crc32Value entityId);
+ UserAgentLookupOperation(EntityId entityId);
void whenComplete(DB::Connection *) const override;
- Crc32Value entityId;
+ EntityId entityId;
};
std::unique_ptr<CurlOperation> curlGetUserAgentDetail(
- Crc32Value entityId, std::string_view uas, const char * baseUrl);
+ EntityId entityId, std::string_view uas, const char * baseUrl);
}
diff --git a/src/util.hpp b/src/util.hpp
index 28bcebd..f7254e8 100644
--- a/src/util.hpp
+++ b/src/util.hpp
@@ -16,7 +16,7 @@ namespace WebStat {
template<typename... T>
auto
- visit(auto && visitor, const std::tuple<T...> & values)
+ visit(auto && visitor, std::tuple<T...> & values)
{
std::apply(
[&](auto &&... value) {
diff --git a/test/test-ingest.cpp b/test/test-ingest.cpp
index d399288..88e80e8 100644
--- a/test/test-ingest.cpp
+++ b/test/test-ingest.cpp
@@ -406,10 +406,10 @@ BOOST_AUTO_TEST_CASE(DiscardUnparsable)
BOOST_REQUIRE_NO_THROW(tryIngestQueuedLogLines());
auto dbconn = dbpool->get();
auto select = dbconn->select("SELECT id::bigint, value FROM entities WHERE type = 'unparsable_line'");
- constexpr std::array<std::tuple<Crc32Value, std::string_view>, 1> EXPECTED {{
- {1664299262, "does not parse"},
+ constexpr std::array<std::tuple<EntityId, std::string_view>, 1> EXPECTED {{
+ {18, "does not parse"},
}};
- auto rows = select->as<Crc32Value, std::string_view>();
+ auto rows = select->as<EntityId, std::string_view>();
BOOST_CHECK_EQUAL_COLLECTIONS(rows.begin(), rows.end(), EXPECTED.begin(), EXPECTED.end());
BOOST_CHECK_EQUAL(stats.linesParseFailed, 1);
BOOST_CHECK_EQUAL(stats.entitiesInserted, 1);