summaryrefslogtreecommitdiff
path: root/src/ingestor.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/ingestor.cpp')
-rw-r--r--src/ingestor.cpp158
1 files changed, 84 insertions, 74 deletions
diff --git a/src/ingestor.cpp b/src/ingestor.cpp
index 5ae2487..5454087 100644
--- a/src/ingestor.cpp
+++ b/src/ingestor.cpp
@@ -8,6 +8,7 @@
#include <modifycommand.h>
#include <ranges>
#include <scn/scan.h>
+#include <selectcommand.h>
#include <syslog.h>
#include <utility>
#include <zlib.h>
@@ -18,7 +19,7 @@ namespace DB {
// NOLINTNEXTLINE(readability-inconsistent-declaration-parameter-name)
DB::Command::bindParam(unsigned int idx, const WebStat::Entity & entity)
{
- bindParamI(idx, std::get<0>(entity));
+ bindParamI(idx, std::get<1>(entity));
}
}
@@ -50,19 +51,11 @@ namespace WebStat {
return hash;
}
- Crc32Value
- crc32(const std::string_view value)
- {
- // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast) - correct for crc32ing raw bytes
- return static_cast<Crc32Value>(::crc32(::crc32(0, Z_NULL, 0), reinterpret_cast<const Bytef *>(value.data()),
- static_cast<uInt>(value.length())));
- }
-
template<EntityType Type> struct ToEntity {
Entity
operator()(const std::string_view value) const
{
- return {crc32(value), Type, value};
+ return {makeHash(value), std::nullopt, Type, value};
}
template<typename T>
@@ -76,7 +69,7 @@ namespace WebStat {
};
auto
- crc32ScanValues(const Ingestor::ScanValues & values)
+ hashScanValues(const Ingestor::ScanValues & values)
{
static constexpr std::tuple<ToEntity<EntityType::VirtualHost>, std::identity, std::identity, std::identity,
ToEntity<EntityType::Path>, ToEntity<EntityType::QueryString>, std::identity, std::identity,
@@ -91,6 +84,19 @@ namespace WebStat {
}(std::make_index_sequence<VALUE_COUNT>());
}
+ template<std::integral T = EntityId, typename... Binds>
+ T
+ insert(auto && dbconn, const std::string & sql, const DB::CommandOptionsPtr & opts, Binds &&... binds)
+ {
+ auto ins = dbconn->select(sql, opts);
+ bindMany(ins, 0, std::forward<Binds>(binds)...);
+ if (ins->fetch()) {
+ T out;
+ (*ins)[0] >> out;
+ return out;
+ }
+ throw DB::NoRowsAffected {};
+ }
}
Ingestor * Ingestor::currentIngestor = nullptr;
@@ -106,14 +112,10 @@ namespace WebStat {
Ingestor::Ingestor(const utsname & host, DB::ConnectionPoolPtr dbpl, IngestorSettings givenSettings) :
settings {std::move(givenSettings)}, dbpool {std::move(dbpl)},
ingestParkedLines {&Ingestor::jobIngestParkedLines}, purgeOldLogs {&Ingestor::jobPurgeOldLogs},
- hostnameId {crc32(host.nodename)}, curl {curl_multi_init()}, mainThread {std::this_thread::get_id()}
+ hostnameId {insert(dbpool->get(), SQL::HOST_UPSERT, SQL::HOST_UPSERT_OPTS, host.nodename, host.sysname,
+ host.release, host.version, host.machine, host.domainname)},
+ curl {curl_multi_init()}, mainThread {std::this_thread::get_id()}
{
- auto dbconn = dbpool->get();
- auto ins = dbconn->modify(SQL::HOST_UPSERT, SQL::HOST_UPSERT_OPTS);
- bindMany(ins, 0, hostnameId, host.nodename, host.sysname, host.release, host.version, host.machine,
- host.domainname);
- ins->execute();
-
assert(!currentIngestor);
currentIngestor = this;
signal(SIGTERM, &sigtermHandler);
@@ -236,7 +238,7 @@ namespace WebStat {
}
}
tryIngestQueuedLogLines();
- parkQueuedLogLines();
+ std::ignore = parkQueuedLogLines();
while (!curlOperations.empty() && curl_multi_poll(curl.get(), nullptr, 0, INT_MAX, nullptr) == CURLM_OK) {
handleCurlOperations();
}
@@ -256,34 +258,55 @@ namespace WebStat {
}
}
+ template<typename... T>
+ std::vector<Entity *>
+ Ingestor::entities(std::tuple<T...> & values)
+ {
+ std::vector<Entity *> entities;
+ visit(
+ [&entities]<typename V>(V & value) {
+ static_assert(!std::is_const_v<V>);
+ if constexpr (std::is_same_v<V, Entity>) {
+ entities.emplace_back(&value);
+ }
+ else if constexpr (std::is_same_v<V, std::optional<Entity>>) {
+ if (value) {
+ entities.emplace_back(&*value);
+ }
+ }
+ },
+ values);
+ return entities;
+ }
+
void
Ingestor::ingestLogLines(DB::Connection * dbconn, const LineBatch & lines)
{
- auto nonNullEntityIds = std::views::take_while(&std::optional<Crc32Value>::has_value)
- | std::views::transform([](auto && value) {
- return *value;
- });
+ auto entityIds = std::views::transform([](auto && value) {
+ return std::make_pair(std::get<0>(*value), *std::get<1>(*value));
+ });
DB::TransactionScope batchTx {*dbconn};
for (const auto & line : lines) {
if (auto result = scanLogLine(line)) {
stats.linesParsed++;
- const auto values = crc32ScanValues(result->values());
+ auto values = hashScanValues(result->values());
+ auto valuesEntities = entities(values);
+ fillKnownEntities(valuesEntities);
try {
DB::TransactionScope dbtx {*dbconn};
- if (const auto newEnts = newEntities(values); newEnts.front()) {
- existingEntities.insert_range(storeEntities(dbconn, newEnts) | nonNullEntityIds);
- }
+ storeNewEntities(dbconn, valuesEntities);
+ existingEntities.insert_range(valuesEntities | entityIds);
storeLogLine(dbconn, values);
}
catch (const DB::Error & originalError) {
try {
DB::TransactionScope dbtx {*dbconn};
- const auto uninsertableLine = ToEntity<EntityType::UninsertableLine> {}(line);
- storeEntities(dbconn, {uninsertableLine});
+ auto uninsertableLine = ToEntity<EntityType::UninsertableLine> {}(line);
+ storeNewEntity(dbconn, uninsertableLine);
log(LOG_NOTICE,
"Failed to store parsed line and/or associated entties, but did store raw line, %u:%s",
- std::get<Crc32Value>(uninsertableLine), line.c_str());
+ *std::get<1>(uninsertableLine), line.c_str());
}
catch (const std::exception & excp) {
log(LOG_NOTICE, "Failed to store line in any form, DB connection lost? %s", excp.what());
@@ -293,10 +316,10 @@ namespace WebStat {
}
else {
stats.linesParseFailed++;
- const auto unparsableLine = ToEntity<EntityType::UnparsableLine> {}(line);
- log(LOG_NOTICE, "Failed to parse line, this is a bug: %u:%s", std::get<Crc32Value>(unparsableLine),
+ auto unparsableLine = ToEntity<EntityType::UnparsableLine> {}(line);
+ storeNewEntity(dbconn, unparsableLine);
+ log(LOG_NOTICE, "Failed to parse line, this is a bug: %u:%s", *std::get<1>(unparsableLine),
line.c_str());
- storeEntities(dbconn, {unparsableLine});
}
}
}
@@ -426,34 +449,28 @@ namespace WebStat {
return purgedTotal;
}
- template<typename... T>
- Ingestor::NewEntities
- Ingestor::newEntities(const std::tuple<T...> & values) const
+ void
+ Ingestor::fillKnownEntities(const std::span<Entity *> entities) const
{
- Ingestor::NewEntities rtn;
- auto next = rtn.begin();
- visit(
- [this, &next]<typename X>(const X & entity) {
- auto addNewIfReqd = [&next, this](auto && entityToAdd) mutable {
- if (!existingEntities.contains(std::get<0>(entityToAdd))) {
- *next++ = entityToAdd;
- }
- return 0;
- };
+ for (const auto entity : entities) {
+ if (auto existing = existingEntities.find(std::get<0>(*entity)); existing != existingEntities.end()) {
+ std::get<1>(*entity) = existing->second;
+ }
+ }
+ }
- if constexpr (std::is_same_v<X, Entity>) {
- addNewIfReqd(entity);
- }
- else if constexpr (std::is_same_v<X, std::optional<Entity>>) {
- entity.transform(addNewIfReqd);
- }
- },
- values);
- return rtn;
+ void
+ Ingestor::storeNewEntities(DB::Connection * dbconn, const std::span<Entity *> entities) const
+ {
+ for (const auto entity : entities) {
+ if (!std::get<1>(*entity)) {
+ storeNewEntity(dbconn, *entity);
+ }
+ }
}
- Ingestor::NewEntityIds
- Ingestor::storeEntities(DB::Connection * dbconn, const std::span<const std::optional<Entity>> values) const
+ void
+ Ingestor::storeNewEntity(DB::Connection * dbconn, Entity & entity) const
{
static constexpr std::array<std::pair<std::string_view, void (Ingestor::*)(const Entity &) const>, 9>
ENTITY_TYPE_VALUES {{
@@ -468,28 +485,21 @@ namespace WebStat {
{"content_type", nullptr},
}};
- auto insert = dbconn->modify(SQL::ENTITY_INSERT, SQL::ENTITY_INSERT_OPTS);
- NewEntityIds ids;
- std::ranges::transform(values | std::views::take_while(&std::optional<Entity>::has_value), ids.begin(),
- [this, &insert](auto && entity) {
- const auto & [entityId, type, value] = *entity;
- const auto & [typeName, onInsert] = ENTITY_TYPE_VALUES[std::to_underlying(type)];
- bindMany(insert, 0, entityId, typeName, value);
- const auto insertedEntities = insert->execute();
- if (insertedEntities > 0 && onInsert && std::this_thread::get_id() == mainThread) {
- std::invoke(onInsert, this, *entity);
- }
- stats.entitiesInserted += insertedEntities;
- return std::get<0>(*entity);
- });
- return ids;
+ auto & [entityHash, entityId, type, value] = entity;
+ assert(!entityId);
+ const auto & [typeName, onInsert] = ENTITY_TYPE_VALUES[std::to_underlying(type)];
+ entityId = insert(dbconn, SQL::ENTITY_INSERT, SQL::ENTITY_INSERT_OPTS, value, typeName);
+ if (onInsert && std::this_thread::get_id() == mainThread) {
+ std::invoke(onInsert, this, entity);
+ }
+ stats.entitiesInserted += 1;
}
void
Ingestor::onNewUserAgent(const Entity & entity) const
{
- const auto & [entityId, type, value] = entity;
- auto curlOp = curlGetUserAgentDetail(entityId, value, settings.userAgentAPI.c_str());
+ const auto & [entityHash, entityId, type, value] = entity;
+ auto curlOp = curlGetUserAgentDetail(*entityId, value, settings.userAgentAPI.c_str());
auto added = curlOperations.emplace(curlOp->hnd.get(), std::move(curlOp));
curl_multi_add_handle(curl.get(), added.first->first);
}