summaryrefslogtreecommitdiff
path: root/src/ingestor.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/ingestor.cpp')
-rw-r--r--src/ingestor.cpp204
1 files changed, 125 insertions, 79 deletions
diff --git a/src/ingestor.cpp b/src/ingestor.cpp
index c5cb8d8..33af8cf 100644
--- a/src/ingestor.cpp
+++ b/src/ingestor.cpp
@@ -8,6 +8,7 @@
#include <modifycommand.h>
#include <ranges>
#include <scn/scan.h>
+#include <selectcommand.h>
#include <syslog.h>
#include <utility>
#include <zlib.h>
@@ -18,25 +19,48 @@ namespace DB {
// NOLINTNEXTLINE(readability-inconsistent-declaration-parameter-name)
DB::Command::bindParam(unsigned int idx, const WebStat::Entity & entity)
{
- bindParamI(idx, std::get<0>(entity));
+ bindParamI(idx, entity.id);
}
}
namespace WebStat {
namespace {
- Crc32Value
- crc32(const std::string_view value)
+ using ByteArrayView = std::span<const uint8_t>;
+
+ auto
+ bytesToHexRange(const ByteArrayView bytes)
+ {
+ constexpr auto HEXN = 16ZU;
+ return bytes | std::views::transform([](auto byte) {
+ return std::array {byte / HEXN, byte % HEXN};
+ }) | std::views::join
+ | std::views::transform([](auto nibble) {
+ return "0123456789abcdef"[nibble];
+ });
+ }
+
+ EntityHash
+ makeHash(const std::string_view value)
{
- // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast) - correct for crc32ing raw bytes
- return static_cast<Crc32Value>(::crc32(::crc32(0, Z_NULL, 0), reinterpret_cast<const Bytef *>(value.data()),
- static_cast<uInt>(value.length())));
+ MD5_CTX ctx {};
+ MD5Init(&ctx);
+ // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast) - correct for md5ing raw bytes
+ MD5Update(&ctx, reinterpret_cast<const uint8_t *>(value.data()), value.length());
+ EntityHash hash {};
+ MD5Final(hash.data(), &ctx);
+ return hash;
}
template<EntityType Type> struct ToEntity {
Entity
operator()(const std::string_view value) const
{
- return {crc32(value), Type, value};
+ return {
+ .hash = makeHash(value),
+ .id = std::nullopt,
+ .type = Type,
+ .value = value,
+ };
}
template<typename T>
@@ -50,7 +74,7 @@ namespace WebStat {
};
auto
- crc32ScanValues(const Ingestor::ScanValues & values)
+ hashScanValues(const Ingestor::ScanValues & values)
{
static constexpr std::tuple<ToEntity<EntityType::VirtualHost>, std::identity, std::identity, std::identity,
ToEntity<EntityType::Path>, ToEntity<EntityType::QueryString>, std::identity, std::identity,
@@ -65,6 +89,19 @@ namespace WebStat {
}(std::make_index_sequence<VALUE_COUNT>());
}
+ template<std::integral T = EntityId, typename... Binds>
+ T
+ insert(auto && dbconn, const std::string & sql, const DB::CommandOptionsPtr & opts, Binds &&... binds)
+ {
+ auto ins = dbconn->select(sql, opts);
+ bindMany(ins, 0, std::forward<Binds>(binds)...);
+ if (ins->fetch()) {
+ T out;
+ (*ins)[0] >> out;
+ return out;
+ }
+ throw DB::NoRowsAffected {};
+ }
}
Ingestor * Ingestor::currentIngestor = nullptr;
@@ -80,14 +117,10 @@ namespace WebStat {
Ingestor::Ingestor(const utsname & host, DB::ConnectionPoolPtr dbpl, IngestorSettings givenSettings) :
settings {std::move(givenSettings)}, dbpool {std::move(dbpl)},
ingestParkedLines {&Ingestor::jobIngestParkedLines}, purgeOldLogs {&Ingestor::jobPurgeOldLogs},
- hostnameId {crc32(host.nodename)}, curl {curl_multi_init()}, mainThread {std::this_thread::get_id()}
+ hostnameId {insert(dbpool->get(), SQL::HOST_UPSERT, SQL::HOST_UPSERT_OPTS, host.nodename, host.sysname,
+ host.release, host.version, host.machine, host.domainname)},
+ curl {curl_multi_init()}, mainThread {std::this_thread::get_id()}
{
- auto dbconn = dbpool->get();
- auto ins = dbconn->modify(SQL::HOST_UPSERT, SQL::HOST_UPSERT_OPTS);
- bindMany(ins, 0, hostnameId, host.nodename, host.sysname, host.release, host.version, host.machine,
- host.domainname);
- ins->execute();
-
assert(!currentIngestor);
currentIngestor = this;
signal(SIGTERM, &sigtermHandler);
@@ -210,7 +243,7 @@ namespace WebStat {
}
}
tryIngestQueuedLogLines();
- parkQueuedLogLines();
+ std::ignore = parkQueuedLogLines();
while (!curlOperations.empty() && curl_multi_poll(curl.get(), nullptr, 0, INT_MAX, nullptr) == CURLM_OK) {
handleCurlOperations();
}
@@ -230,34 +263,58 @@ namespace WebStat {
}
}
+ template<typename... T>
+ std::vector<Entity *>
+ Ingestor::entities(std::tuple<T...> & values)
+ {
+ std::vector<Entity *> entities;
+ visit(
+ [&entities]<typename V>(V & value) {
+ static_assert(!std::is_const_v<V>);
+ if constexpr (std::is_same_v<V, Entity>) {
+ entities.emplace_back(&value);
+ }
+ else if constexpr (std::is_same_v<V, std::optional<Entity>>) {
+ if (value) {
+ entities.emplace_back(&*value);
+ }
+ }
+ },
+ values);
+ return entities;
+ }
+
void
Ingestor::ingestLogLines(DB::Connection * dbconn, const LineBatch & lines)
{
- auto nonNullEntityIds = std::views::take_while(&std::optional<Crc32Value>::has_value)
- | std::views::transform([](auto && value) {
- return *value;
- });
+ auto entityIds = std::views::transform([](auto && value) {
+ return std::make_pair(value->hash, *value->id);
+ });
DB::TransactionScope batchTx {*dbconn};
for (const auto & line : lines) {
if (auto result = scanLogLine(line)) {
stats.linesParsed++;
- const auto values = crc32ScanValues(result->values());
+ auto values = hashScanValues(result->values());
+ auto valuesEntities = entities(values);
+ fillKnownEntities(valuesEntities);
try {
- DB::TransactionScope dbtx {*dbconn};
- if (const auto newEnts = newEntities(values); newEnts.front()) {
- existingEntities.insert_range(storeEntities(dbconn, newEnts) | nonNullEntityIds);
+ std::optional<DB::TransactionScope> lineTx;
+ if (!std::ranges::all_of(valuesEntities, &std::optional<EntityId>::has_value, &Entity::id)) {
+ lineTx.emplace(*dbconn);
+ storeNewEntities(dbconn, valuesEntities);
+ existingEntities.insert_range(valuesEntities | entityIds);
}
storeLogLine(dbconn, values);
}
catch (const DB::Error & originalError) {
try {
DB::TransactionScope dbtx {*dbconn};
- const auto uninsertableLine = ToEntity<EntityType::UninsertableLine> {}(line);
- storeEntities(dbconn, {uninsertableLine});
+ auto uninsertableLine = ToEntity<EntityType::UninsertableLine> {}(line);
+ storeNewEntity(dbconn, uninsertableLine);
log(LOG_NOTICE,
"Failed to store parsed line and/or associated entties, but did store raw line, %u:%s",
- std::get<Crc32Value>(uninsertableLine), line.c_str());
+ *uninsertableLine.id, line.c_str());
}
catch (const std::exception & excp) {
log(LOG_NOTICE, "Failed to store line in any form, DB connection lost? %s", excp.what());
@@ -267,22 +324,21 @@ namespace WebStat {
}
else {
stats.linesParseFailed++;
- const auto unparsableLine = ToEntity<EntityType::UnparsableLine> {}(line);
- log(LOG_NOTICE, "Failed to parse line, this is a bug: %u:%s", std::get<Crc32Value>(unparsableLine),
- line.c_str());
- storeEntities(dbconn, {unparsableLine});
+ auto unparsableLine = ToEntity<EntityType::UnparsableLine> {}(line);
+ storeNewEntity(dbconn, unparsableLine);
+ log(LOG_NOTICE, "Failed to parse line, this is a bug: %u:%s", *unparsableLine.id, line.c_str());
}
}
}
- void
+ std::expected<std::filesystem::path, int>
Ingestor::parkQueuedLogLines()
{
if (queuedLines.empty()) {
- return;
+ return std::unexpected(0);
}
- const std::filesystem::path path {
- settings.fallbackDir / std::format("parked-{}.short", crc32(queuedLines.front()))};
+ const std::filesystem::path path {settings.fallbackDir
+ / std::format("parked-{:s}.short", bytesToHexRange(makeHash(queuedLines.front())))};
if (auto parked = FilePtr(fopen(path.c_str(), "w"))) {
fprintf(parked.get(), "%zu\n", queuedLines.size());
for (const auto & line : queuedLines) {
@@ -290,16 +346,19 @@ namespace WebStat {
}
if (fflush(parked.get()) == 0) {
queuedLines.clear();
- auto finalPath = auto {path}.replace_extension(".log");
+ auto finalPath = std::filesystem::path {path}.replace_extension(".log");
+ parked.reset();
if (rename(path.c_str(), finalPath.c_str()) == 0) {
- return;
+ return finalPath;
}
}
}
+ const int err = errno;
log(LOG_ERR, "Failed to park %zu queued lines:", queuedLines.size());
for (const auto & line : queuedLines) {
log(LOG_ERR, "\t%.*s", static_cast<int>(line.length()), line.data());
}
+ return std::unexpected(err);
}
void
@@ -334,7 +393,7 @@ namespace WebStat {
unsigned int count = 0;
for (auto pathIter = std::filesystem::directory_iterator {settings.fallbackDir};
pathIter != std::filesystem::directory_iterator {}; ++pathIter) {
- if (scn::scan<Crc32Value>(pathIter->path().filename().string(), "parked-{}.log")) {
+ if (scn::scan<std::string>(pathIter->path().filename().string(), "parked-{:[a-zA-Z0-9]}.log")) {
jobIngestParkedLines(pathIter->path());
count += 1;
}
@@ -397,34 +456,29 @@ namespace WebStat {
return purgedTotal;
}
- template<typename... T>
- Ingestor::NewEntities
- Ingestor::newEntities(const std::tuple<T...> & values) const
+ void
+ Ingestor::fillKnownEntities(const std::span<Entity *> entities) const
{
- Ingestor::NewEntities rtn;
- auto next = rtn.begin();
- visit(
- [this, &next]<typename X>(const X & entity) {
- auto addNewIfReqd = [&next, this](auto && entityToAdd) mutable {
- if (!existingEntities.contains(std::get<0>(entityToAdd))) {
- *next++ = entityToAdd;
- }
- return 0;
- };
+ for (const auto entity : entities) {
+ if (auto existing = existingEntities.find(entity->hash); existing != existingEntities.end()) {
+ entity->id = existing->second;
+ }
+ }
+ }
- if constexpr (std::is_same_v<X, Entity>) {
- addNewIfReqd(entity);
- }
- else if constexpr (std::is_same_v<X, std::optional<Entity>>) {
- entity.transform(addNewIfReqd);
- }
- },
- values);
- return rtn;
+ void
+ Ingestor::storeNewEntities(DB::Connection * dbconn, const std::span<Entity *> entities) const
+ {
+ for (const auto entity : entities) {
+ if (!entity->id) {
+ storeNewEntity(dbconn, *entity);
+ assert(entity->id);
+ }
+ }
}
- Ingestor::NewEntityIds
- Ingestor::storeEntities(DB::Connection * dbconn, const std::span<const std::optional<Entity>> values) const
+ void
+ Ingestor::storeNewEntity(DB::Connection * dbconn, Entity & entity) const
{
static constexpr std::array<std::pair<std::string_view, void (Ingestor::*)(const Entity &) const>, 9>
ENTITY_TYPE_VALUES {{
@@ -439,28 +493,20 @@ namespace WebStat {
{"content_type", nullptr},
}};
- auto insert = dbconn->modify(SQL::ENTITY_INSERT, SQL::ENTITY_INSERT_OPTS);
- NewEntityIds ids;
- std::ranges::transform(values | std::views::take_while(&std::optional<Entity>::has_value), ids.begin(),
- [this, &insert](auto && entity) {
- const auto & [entityId, type, value] = *entity;
- const auto & [typeName, onInsert] = ENTITY_TYPE_VALUES[std::to_underlying(type)];
- bindMany(insert, 0, entityId, typeName, value);
- const auto insertedEntities = insert->execute();
- if (insertedEntities > 0 && onInsert && std::this_thread::get_id() == mainThread) {
- std::invoke(onInsert, this, *entity);
- }
- stats.entitiesInserted += insertedEntities;
- return std::get<0>(*entity);
- });
- return ids;
+ assert(!entity.id);
+ const auto & [typeName, onInsert] = ENTITY_TYPE_VALUES[std::to_underlying(entity.type)];
+ entity.id = insert(dbconn, SQL::ENTITY_INSERT, SQL::ENTITY_INSERT_OPTS, entity.value, typeName);
+ if (onInsert && std::this_thread::get_id() == mainThread) {
+ std::invoke(onInsert, this, entity);
+ }
+ stats.entitiesInserted += 1;
}
void
Ingestor::onNewUserAgent(const Entity & entity) const
{
- const auto & [entityId, type, value] = entity;
- auto curlOp = curlGetUserAgentDetail(entityId, value, settings.userAgentAPI.c_str());
+ const auto & [entityHash, entityId, type, value] = entity;
+ auto curlOp = curlGetUserAgentDetail(*entityId, value, settings.userAgentAPI.c_str());
auto added = curlOperations.emplace(curlOp->hnd.get(), std::move(curlOp));
curl_multi_add_handle(curl.get(), added.first->first);
}