#include "ingestor.hpp"
#include "sql.hpp"
#include "uaLookup.hpp"
#include "util.hpp"
// NOTE(review): the ten header names below — and most angle-bracket
// template-argument lists throughout this file — appear to have been stripped
// by whatever extracted this source (they read as empty `<...>` spans).
// Restore them from version control before attempting to compile.
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include

namespace DB {
	// Specialization so a WebStat::Entity can be bound directly as a statement
	// parameter: it delegates to bindParamI with the entity's (optional) DB id.
	template<>
	void
	// NOLINTNEXTLINE(readability-inconsistent-declaration-parameter-name)
	DB::Command::bindParam(unsigned int idx, const WebStat::Entity & entity)
	{
		bindParamI(idx, entity.id);
	}
}

namespace WebStat {
	namespace {
		// Non-owning view over raw bytes (element type lost to extraction damage).
		using ByteArrayView = std::span;

		// Lazily renders a byte range as lowercase hex: each byte is split into
		// its two nibbles, then each nibble is mapped to a hex digit character.
		auto
		bytesToHexRange(const ByteArrayView bytes)
		{
			constexpr auto HEXN = 16ZU;
			return bytes | std::views::transform([](auto byte) {
				return std::array {byte / HEXN, byte % HEXN};
			}) | std::views::join
					| std::views::transform([](auto nibble) {
						  return "0123456789abcdef"[nibble];
					  });
		}

		// MD5-hashes the given text into an EntityHash (used both as the entity
		// cache key and for parked-file names).
		EntityHash
		makeHash(const std::string_view value)
		{
			MD5_CTX ctx {};
			MD5Init(&ctx);
			// NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast) - correct for md5ing raw bytes
			MD5Update(&ctx, reinterpret_cast(value.data()), value.length());
			EntityHash hash {};
			MD5Final(hash.data(), &ctx);
			return hash;
		}

		// Functor that turns a scanned string value into an Entity of a fixed
		// type: hash computed up-front, id left empty until the DB assigns one.
		// NOTE(review): the template parameter list (presumably an EntityType
		// non-type parameter, given `.type = Type`) was stripped by extraction.
		template struct ToEntity {
			Entity
			operator()(const std::string_view value) const
			{
				return {
						.hash = makeHash(value),
						.id = std::nullopt,
						.type = Type,
						.value = value,
				};
			}

			// Optional pass-through: an absent input yields an absent Entity.
			template std::optional
			operator()(const std::optional & value) const
			{
				return value.transform([this](auto && contained) {
					return (*this)(contained);
				});
			}
		};

		// Maps each scanned log-line field through either a ToEntity functor
		// (fields stored as de-duplicated entities) or std::identity (fields
		// stored inline), producing a parallel tuple of converted values.
		// NOTE(review): the ToEntity type arguments in ENTITY_TYPE_MAP and the
		// tuple_size_v / index_sequence / std::get arguments were stripped.
		auto
		hashScanValues(const Ingestor::ScanValues & values)
		{
			static constexpr std::tuple, std::identity, std::identity, std::identity, ToEntity, ToEntity,
					std::identity, std::identity, std::identity, std::identity, ToEntity, ToEntity, ToEntity>
					ENTITY_TYPE_MAP;
			static constexpr size_t VALUE_COUNT = std::tuple_size_v;
			static_assert(VALUE_COUNT == std::tuple_size_v);
			// Immediately-invoked lambda expands one std::get per field index.
			return [&values](std::index_sequence) {
				return std::make_tuple(std::get(ENTITY_TYPE_MAP)(std::get(values))...);
			}(std::make_index_sequence());
		}

		// Runs an INSERT ... RETURNING-style statement and reads back the single
		// returned column as T. Throws DB::NoRowsAffected if no row comes back.
		// NOTE(review): template parameter list (T and the Binds pack) stripped.
		template T
		insert(auto && dbconn, const std::string & sql, const DB::CommandOptionsPtr & opts, Binds &&... binds)
		{
			auto ins = dbconn->select(sql, opts);
			bindMany(ins, 0, std::forward(binds)...);
			if (ins->fetch()) {
				T out;
				(*ins)[0] >> out;
				return out;
			}
			throw DB::NoRowsAffected {};
		}
	}

	// Singleton-ish back-pointer used by the C signal handlers below; set in the
	// constructor, cleared in the destructor, asserted never to double up.
	Ingestor * Ingestor::currentIngestor = nullptr;

	// Convenience constructor: builds the DB connection pool from the settings,
	// then delegates to the pool-taking constructor.
	Ingestor::Ingestor(const utsname & host, IngestorSettings givenSettings) :
		Ingestor {host,
				std::make_shared(
						givenSettings.dbMax, givenSettings.dbKeep, givenSettings.dbType, givenSettings.dbConnStr),
				std::move(givenSettings)}
	{
	}

	// Main constructor: upserts this host into the DB (keeping its row id),
	// creates the curl multi handle, installs the signal handlers and
	// pre-reserves the line batch buffer.
	Ingestor::Ingestor(const utsname & host, DB::ConnectionPoolPtr dbpl, IngestorSettings givenSettings) :
		settings {std::move(givenSettings)}, dbpool {std::move(dbpl)},
		ingestParkedLines {&Ingestor::jobIngestParkedLines}, purgeOldLogs {&Ingestor::jobPurgeOldLogs},
		hostnameId {insert(dbpool->get(), SQL::HOST_UPSERT, SQL::HOST_UPSERT_OPTS, host.nodename, host.sysname,
				host.release, host.version, host.machine, host.domainname)},
		curl {curl_multi_init()}, mainThread {std::this_thread::get_id()}
	{
		assert(!currentIngestor);
		currentIngestor = this;
		signal(SIGTERM, &sigtermHandler);
		signal(SIGUSR1, &sigusr1Handler);
		signal(SIGUSR2, &sigusr2Handler);
		queuedLines.reserve(settings.maxBatchSize);
	}

	// Restores default signal dispositions and releases the singleton slot.
	Ingestor::~Ingestor()
	{
		assert(currentIngestor);
		signal(SIGTERM, SIG_DFL);
		signal(SIGUSR1, SIG_DFL);
		signal(SIGUSR2, SIG_DFL);
		currentIngestor = nullptr;
	}

	// SIGTERM -> request a clean shutdown of the ingest loop.
	void
	Ingestor::sigtermHandler(int sigNo)
	{
		assert(currentIngestor);
		currentIngestor->terminate(sigNo);
	}

	// Flags the ingest loop to stop and wakes the blocking curl_multi_poll.
	// NOTE(review): log() and curl_multi_wakeup are called from signal-handler
	// context here; confirm both are safe in this project's signal setup.
	void
	Ingestor::terminate(int sigNo)
	{
		log(LOG_NOTICE, "Caught sig %d, terminating", sigNo);
		terminated = true;
		curl_multi_wakeup(curl.get());
	}

	// SIGUSR1 -> dump current statistics to the log.
	void
	Ingestor::sigusr1Handler(int)
	{
		assert(currentIngestor);
		currentIngestor->logStats();
	}

	// SIGUSR2 -> reset the statistics counters.
	void
	Ingestor::sigusr2Handler(int)
	{
		assert(currentIngestor);
		currentIngestor->clearStats();
	}

	// Parses one access-log line into its 13 typed fields; the format string has
	// one placeholder per type below, in order.
	Ingestor::ScanResult
	Ingestor::scanLogLine(std::string_view input)
	{
		return scn::scan< // Field : Apache format specifier : example
				std::string_view, // virtual_host : %V : some.host.name
				std::string_view, // remoteip : %a : 1.2.3.4 (or ipv6)
				uint64_t, // request_time : %{usec}t : 123456790
				std::string_view, // method : %m : GET
				QuotedString, // path : "%u" : "/foo/bar"
				QueryString, // query_string : "%q" : "?query=string" or ""
				std::string_view, // protocol : %r : HTTPS/2.0
				unsigned short, // status : %>s : 200
				unsigned int, // size : %B : 1234
				unsigned int, // duration : %D : 1234
				CLFString, // referrer : "%{Referer}i" : "https://google.com/whatever" or "-"
				CLFString, // user_agent : "%{User-agent}i" : "Chromium v123.4" or "-"
				CLFString // content_type : "%{Content-type}o" : "test/plain" or "-"
				>(input, R"({} {} {} {:[A-Z]} {} {} {} {} {} {} {} {} {})");
	}

	// Drives in-flight curl transfers and completes/fails the matching
	// CurlOperation for each finished easy handle.
	// NOTE(review): curl_multi_perform is given a null running_handles pointer —
	// libcurl documents that parameter as a pointer to int; confirm null is
	// accepted by the libcurl version in use.
	void
	Ingestor::handleCurlOperations()
	{
		int remaining {};
		curl_multi_perform(curl.get(), nullptr);
		while (auto msg = curl_multi_info_read(curl.get(), &remaining)) {
			if (msg->msg == CURLMSG_DONE) {
				if (auto operationItr = curlOperations.find(msg->easy_handle); operationItr != curlOperations.end()) {
					if (msg->data.result == CURLE_OK) {
						operationItr->second->whenComplete(dbpool->get().get());
					}
					else {
						operationItr->second->onError(dbpool->get().get());
					}
					curl_multi_remove_handle(curl.get(), msg->easy_handle);
					curlOperations.erase(operationItr);
				}
				else {
					// Lookup failed, so this erase-by-key is a no-op; kept as a
					// belt-and-braces cleanup before logging the anomaly.
					curlOperations.erase(msg->easy_handle);
					log(LOG_WARNING, "Failed to lookup CurlOperation");
				}
			}
		}
	}

	// Main loop: multiplexes reading log lines from `input` with curl transfer
	// progress via curl_multi_poll, batching lines and flushing the batch either
	// when full or when the poll times out. On EOF/termination it flushes, parks
	// anything left, then drains outstanding curl operations before exiting.
	void
	Ingestor::ingestLog(std::FILE * input)
	{
		curl_waitfd logIn {.fd = fileno(input), .events = CURL_WAIT_POLLIN, .revents = 0};
		// Poll timeout doubles as the "check background jobs" cadence.
		const auto curlTimeOut
				= static_cast(std::chrono::duration_cast(settings.checkJobsAfter).count());
		while (!terminated && curl_multi_poll(curl.get(), &logIn, 1, curlTimeOut, nullptr) == CURLM_OK) {
			if (logIn.revents) {
				// Input is readable: scan one whole line (up to newline).
				if (auto line = scn::scan(input, "{:[^\n]}\n")) {
					stats.linesRead++;
					queuedLines.emplace_back(std::move(line->value()));
					if (queuedLines.size() >= settings.maxBatchSize) {
						tryIngestQueuedLogLines();
					}
				}
				else {
					// Scan failure here is treated as end of input.
					break;
				}
			}
			else {
				// Poll timed out with no input: opportunistically flush the batch.
				tryIngestQueuedLogLines();
			}
			if (expiredThenSet(lastCheckedJobs, settings.checkJobsAfter)) {
				runJobsAsNeeded();
			}
			if (!curlOperations.empty()) {
				handleCurlOperations();
			}
		}
		// Shutdown path: flush what we can, park the rest to disk, then wait for
		// any remaining curl operations to finish.
		tryIngestQueuedLogLines();
		std::ignore = parkQueuedLogLines();
		while (!curlOperations.empty() && curl_multi_poll(curl.get(), nullptr, 0, INT_MAX, nullptr) == CURLM_OK) {
			handleCurlOperations();
		}
		logStats();
	}

	// Best-effort batch flush: on success the queue is cleared; on any exception
	// the queue is KEPT (for retry/parking) but the entity-id cache is dropped,
	// since a failed transaction may have invalidated cached ids.
	void
	Ingestor::tryIngestQueuedLogLines()
	{
		try {
			ingestLogLines(dbpool->get().get(), queuedLines);
			queuedLines.clear();
		}
		catch (const std::exception & excp) {
			log(LOG_ERR, "Unhandled exception: %s, clearing known entity list", excp.what());
			existingEntities.clear();
		}
	}

	// Collects pointers to every Entity in the converted-values tuple: plain
	// Entity members directly, optional ones only when populated. Non-entity
	// (identity-mapped) members are skipped.
	// NOTE(review): this template's parameter list and the std::is_same_v /
	// is_const_v argument lists were stripped by extraction.
	template std::vector
	Ingestor::entities(std::tuple & values)
	{
		std::vector entities;
		visit(
				[&entities](V & value) {
					static_assert(!std::is_const_v);
					if constexpr (std::is_same_v) {
						entities.emplace_back(&value);
					}
					else if constexpr (std::is_same_v>) {
						if (value) {
							entities.emplace_back(&*value);
						}
					}
				},
				values);
		return entities;
	}

	// Stores a batch of raw lines: each line is parsed, its entities are
	// resolved/inserted and the log row written inside a per-line savepoint.
	// If the parsed form cannot be stored, the raw line is stored instead as an
	// "uninsertable" entity; unparsable lines likewise become entities.
	void
	Ingestor::ingestLogLines(DB::Connection * dbconn, const LineBatch & lines)
	{
		// View adaptor: Entity* -> (hash, id) pairs for the known-entities cache.
		auto entityIds = std::views::transform([](auto && value) {
			return std::make_pair(value->hash, *value->id);
		});
		DB::TransactionScope batchTx {*dbconn};
		for (const auto & line : lines) {
			if (auto result = scanLogLine(line)) {
				stats.linesParsed++;
				auto values = hashScanValues(result->values());
				auto valuesEntities = entities(values);
				fillKnownEntities(valuesEntities);
				try {
					DB::TransactionScope dbtx {*dbconn};
					storeNewEntities(dbconn, valuesEntities);
					// Only cache ids after the inserts succeeded.
					existingEntities.insert_range(valuesEntities | entityIds);
					storeLogLine(dbconn, values);
				}
				catch (const DB::Error & originalError) {
					// Fallback: store the raw line so no data is silently lost.
					// NOTE(review): "entties" typo in the log message below;
					// fixing it is a code change, so only flagged here.
					try {
						DB::TransactionScope dbtx {*dbconn};
						auto uninsertableLine = ToEntity {}(line);
						storeNewEntity(dbconn, uninsertableLine);
						log(LOG_NOTICE,
								"Failed to store parsed line and/or associated entties, but did store raw line, %u:%s",
								*uninsertableLine.id, line.c_str());
					}
					catch (const std::exception & excp) {
						// Even the fallback failed: give up on the batch.
						// NOTE(review): `throw originalError;` throws a COPY and
						// would slice a DB::Error subclass; presumably deliberate
						// (plain `throw;` would rethrow the inner exception).
						log(LOG_NOTICE, "Failed to store line in any form, DB connection lost? %s", excp.what());
						throw originalError;
					}
				}
			}
			else {
				stats.linesParseFailed++;
				auto unparsableLine = ToEntity {}(line);
				storeNewEntity(dbconn, unparsableLine);
				log(LOG_NOTICE, "Failed to parse line, this is a bug: %u:%s", *unparsableLine.id, line.c_str());
			}
		}
	}

	// Writes the queued lines to a fallback file (count header, then one line
	// per record), written as ".short" and renamed to ".log" only once complete,
	// so the parked-file ingest job never picks up a partial file.
	// Returns the final path on success, or the errno on failure (0 when there
	// was simply nothing to park).
	std::expected
	Ingestor::parkQueuedLogLines()
	{
		if (queuedLines.empty()) {
			return std::unexpected(0);
		}
		// File name derives from the hash of the first queued line.
		const std::filesystem::path path {settings.fallbackDir
				/ std::format("parked-{:s}.short", bytesToHexRange(makeHash(queuedLines.front())))};
		if (auto parked = FilePtr(fopen(path.c_str(), "w"))) {
			fprintf(parked.get(), "%zu\n", queuedLines.size());
			for (const auto & line : queuedLines) {
				fprintf(parked.get(), "%.*s\n", static_cast(line.length()), line.data());
			}
			if (fflush(parked.get()) == 0) {
				// NOTE(review): the queue is cleared before the rename; if the
				// rename then fails, the lines survive only in the ".short" file,
				// which the ingest job ignores, and the error log below reports
				// an empty queue. Confirm this is acceptable.
				queuedLines.clear();
				auto finalPath = std::filesystem::path {path}.replace_extension(".log");
				parked.reset();
				if (rename(path.c_str(), finalPath.c_str()) == 0) {
					return finalPath;
				}
			}
		}
		// Failure path: capture errno before logging can disturb it.
		const int err = errno;
		log(LOG_ERR, "Failed to park %zu queued lines:", queuedLines.size());
		for (const auto & line : queuedLines) {
			log(LOG_ERR, "\t%.*s", static_cast(line.length()), line.data());
		}
		return std::unexpected(err);
	}

	// For each background job: harvest a finished async run (on failure, back
	// off by half the job's frequency before retrying), or start a new run when
	// the job's frequency has elapsed.
	void
	Ingestor::runJobsAsNeeded()
	{
		auto runJobAsNeeded = [this, now = Job::LastRunTime::clock::now()](Job & job, auto freq) {
			if (job.currentRun) {
				if (job.currentRun->valid()) {
					try {
						job.currentRun->get();
						job.lastRun = now;
					}
					catch (const std::exception & excp) {
						log(LOG_ERR, "Job run failed: %s", excp.what());
						// Error, retry in half the frequency
						job.lastRun = now - (freq / 2);
					}
					job.currentRun.reset();
				}
			}
			else if (expired(job.lastRun, freq, now)) {
				job.currentRun.emplace(std::async(job.impl, this));
			}
		};
		runJobAsNeeded(ingestParkedLines, settings.freqIngestParkedLines);
		runJobAsNeeded(purgeOldLogs, settings.freqPurgeOldLogs);
	}

	// Job: scans the fallback directory for completed "parked-<hex>.log" files
	// and ingests each one; returns how many files were processed.
	// NOTE(review): this runs via std::async (see runJobsAsNeeded) yet the
	// overload below appends to queuedLines, which the main thread also uses —
	// confirm the synchronization story.
	unsigned int
	Ingestor::jobIngestParkedLines()
	{
		unsigned int count = 0;
		for (auto pathIter = std::filesystem::directory_iterator {settings.fallbackDir};
				pathIter != std::filesystem::directory_iterator {}; ++pathIter) {
			if (scn::scan(pathIter->path().filename().string(), "parked-{:[a-zA-Z0-9]}.log")) {
				jobIngestParkedLines(pathIter->path());
				count += 1;
			}
		}
		return count;
	}

	// Reads one parked file: the leading count line, then that many records.
	// A short read renames the file back to ".short" (quarantining it) and
	// throws; a full read unlinks the file. Other failures throw with errno.
	void
	Ingestor::jobIngestParkedLines(const std::filesystem::path & path)
	{
		if (auto parked = FilePtr(fopen(path.c_str(), "r"))) {
			if (auto count = scn::scan(parked.get(), "{}\n")) {
				if (jobIngestParkedLines(parked.get(), count->value()) < count->value()) {
					auto failPath = auto {path}.replace_extension(".short");
					rename(path.c_str(), failPath.c_str());
					throw std::system_error {errno, std::generic_category(), "Short read of parked file"};
				}
				unlink(path.c_str());
				return;
			}
		}
		throw std::system_error {errno, std::generic_category(), strerror(errno)};
	}

	// Reads up to `count` lines from an open parked file into a local batch;
	// only appends to the main queue if ALL lines were read (all-or-nothing).
	// Returns the number of lines actually read.
	size_t
	Ingestor::jobIngestParkedLines(FILE * lines, size_t count)
	{
		LineBatch parkedLines;
		parkedLines.reserve(count);
		for (size_t lineNo = 0; lineNo < count; ++lineNo) {
			if (auto line = scn::scan(lines, "{:[^\n]}\n")) {
				stats.linesRead++;
				parkedLines.emplace_back(std::move(line->value()));
			}
			else {
				return lineNo;
			}
		}
		queuedLines.append_range(std::move(parkedLines));
		return count;
	}

	// Job: deletes old access-log rows in bounded chunks (purgeDeleteMax per
	// execute), pausing between chunks, until either a short chunk signals no
	// more work or the time budget (purgeDeleteMaxTime) runs out.
	unsigned int
	Ingestor::jobPurgeOldLogs()
	{
		auto dbconn = dbpool->get();
		const auto stopAt = Job::LastRunTime::clock::now() + settings.purgeDeleteMaxTime;
		const auto purge = dbconn->modify(SQL::ACCESS_LOG_PURGE_OLD, SQL::ACCESS_LOG_PURGE_OLD_OPTS);
		purge->bindParam(0, settings.purgeDeleteMax);
		purge->bindParam(1, std::format("{} days", settings.purgeDaysToKeep));
		unsigned int purgedTotal {};
		while (stopAt > Job::LastRunTime::clock::now()) {
			const auto purged = purge->execute();
			purgedTotal += purged;
			if (purged < settings.purgeDeleteMax) {
				break;
			}
			std::this_thread::sleep_for(settings.purgeDeletePause);
		}
		return purgedTotal;
	}

	// Fills in ids for entities already seen, from the in-memory hash -> id
	// cache, so storeNewEntities can skip re-inserting them.
	void
	Ingestor::fillKnownEntities(const std::span entities) const
	{
		for (const auto entity : entities) {
			if (auto existing = existingEntities.find(entity->hash); existing != existingEntities.end()) {
				entity->id = existing->second;
			}
		}
	}

	// Inserts every entity that did not get an id from the cache; each insert
	// must yield an id (asserted).
	void
	Ingestor::storeNewEntities(DB::Connection * dbconn, const std::span entities) const
	{
		for (const auto entity : entities) {
			if (!entity->id) {
				storeNewEntity(dbconn, *entity);
				assert(entity->id);
			}
		}
	}

	// Inserts a single new entity row and records its returned id. The table
	// below maps EntityType (by underlying value) to its DB type name plus an
	// optional post-insert hook — only user_agent has one, and hooks only fire
	// on the main thread (background jobs must not touch the curl multi handle).
	void
	Ingestor::storeNewEntity(DB::Connection * dbconn, Entity & entity) const
	{
		static constexpr std::array, 9> ENTITY_TYPE_VALUES {{
				{"host", nullptr},
				{"virtual_host", nullptr},
				{"path", nullptr},
				{"query_string", nullptr},
				{"referrer", nullptr},
				{"user_agent", &Ingestor::onNewUserAgent},
				{"unparsable_line", nullptr},
				{"uninsertable_line", nullptr},
				{"content_type", nullptr},
		}};
		assert(!entity.id);
		const auto & [typeName, onInsert] = ENTITY_TYPE_VALUES[std::to_underlying(entity.type)];
		entity.id = insert(dbconn, SQL::ENTITY_INSERT, SQL::ENTITY_INSERT_OPTS, entity.value, typeName);
		if (onInsert && std::this_thread::get_id() == mainThread) {
			std::invoke(onInsert, this, entity);
		}
		stats.entitiesInserted += 1;
	}

	// Post-insert hook for new user agents: kicks off an async HTTP lookup of
	// the UA details and registers the transfer with the curl multi handle,
	// keyed by its easy handle so handleCurlOperations can find it on completion.
	void
	Ingestor::onNewUserAgent(const Entity & entity) const
	{
		const auto & [entityHash, entityId, type, value] = entity;
		auto curlOp = curlGetUserAgentDetail(*entityId, value, settings.userAgentAPI.c_str());
		auto added = curlOperations.emplace(curlOp->hnd.get(), std::move(curlOp));
		curl_multi_add_handle(curl.get(), added.first->first);
	}

	// Inserts one access-log row: parameter 0 is this host's id, then every
	// converted field from the tuple is bound in order (params 1..N).
	// NOTE(review): this template's parameter list and the tuple's argument
	// list were stripped by extraction.
	template void
	Ingestor::storeLogLine(DB::Connection * dbconn, const std::tuple & values) const
	{
		auto insert = dbconn->modify(SQL::ACCESS_LOG_INSERT, SQL::ACCESS_LOG_INSERT_OPTS);
		insert->bindParam(0, hostnameId);
		std::apply(
				[&insert](auto &&... value) {
					unsigned int param = 1;
					(insert->bindParam(param++, value), ...);
				},
				values);
		stats.logsInserted += insert->execute();
	}

	// Emits a one-line summary of all counters plus current queue/cache sizes.
	void
	Ingestor::logStats() const
	{
		log(LOG_INFO,
				"Statistics: linesQueued %zu, linesRead %zu, linesParsed %zu, linesParseFailed %zu, logsInserted %zu, "
				"entitiesInserted %zu, entitiesKnown %zu",
				queuedLines.size(), stats.linesRead, stats.linesParsed, stats.linesParseFailed, stats.logsInserted,
				stats.entitiesInserted, existingEntities.size());
	}

	// Resets all statistics counters to zero.
	void
	Ingestor::clearStats()
	{
		stats = {};
	}
}