diff options
| author | Dan Goodliffe <dan@randomdan.homeip.net> | 2026-03-25 18:44:15 +0000 |
|---|---|---|
| committer | Dan Goodliffe <dan@randomdan.homeip.net> | 2026-03-25 18:44:15 +0000 |
| commit | 426a6a3d9309e3f7cf5fc89d6a687c7895785cc3 (patch) | |
| tree | 00daab1e20fbd1d5571fd8d8abd3af1c33f5fcf6 | |
| parent | 6a475f58c0fcb4398e48618d90b0ca05750c3ca8 (diff) | |
| download | webstat-0.3.tar.bz2 webstat-0.3.tar.xz webstat-0.3.zip | |
Revise stats and add signal handlers to log them and reset them (webstat-0.3)
Also logs them on main loop exit.
| -rw-r--r-- | src/ingestor.cpp | 50 | ||||
| -rw-r--r-- | src/ingestor.hpp | 18 | ||||
| -rw-r--r-- | test/test-ingest.cpp | 49 |
3 files changed, 94 insertions, 23 deletions
diff --git a/src/ingestor.cpp b/src/ingestor.cpp index 1b670fb..c5cb8d8 100644 --- a/src/ingestor.cpp +++ b/src/ingestor.cpp @@ -91,6 +91,8 @@ namespace WebStat { assert(!currentIngestor); currentIngestor = this; signal(SIGTERM, &sigtermHandler); + signal(SIGUSR1, &sigusr1Handler); + signal(SIGUSR2, &sigusr2Handler); queuedLines.reserve(settings.maxBatchSize); } @@ -98,6 +100,8 @@ namespace WebStat { { assert(currentIngestor); signal(SIGTERM, SIG_DFL); + signal(SIGUSR1, SIG_DFL); + signal(SIGUSR2, SIG_DFL); currentIngestor = nullptr; } @@ -116,6 +120,20 @@ namespace WebStat { curl_multi_wakeup(curl.get()); } + void + Ingestor::sigusr1Handler(int) + { + assert(currentIngestor); + currentIngestor->logStats(); + } + + void + Ingestor::sigusr2Handler(int) + { + assert(currentIngestor); + currentIngestor->clearStats(); + } + Ingestor::ScanResult Ingestor::scanLogLine(std::string_view input) { @@ -171,7 +189,7 @@ namespace WebStat { while (!terminated && curl_multi_poll(curl.get(), &logIn, 1, curlTimeOut, nullptr) == CURLM_OK) { if (logIn.revents) { if (auto line = scn::scan<std::string>(input, "{:[^\n]}\n")) { - linesRead++; + stats.linesRead++; queuedLines.emplace_back(std::move(line->value())); if (queuedLines.size() >= settings.maxBatchSize) { tryIngestQueuedLogLines(); @@ -196,6 +214,7 @@ namespace WebStat { while (!curlOperations.empty() && curl_multi_poll(curl.get(), nullptr, 0, INT_MAX, nullptr) == CURLM_OK) { handleCurlOperations(); } + logStats(); } void @@ -222,7 +241,7 @@ namespace WebStat { DB::TransactionScope batchTx {*dbconn}; for (const auto & line : lines) { if (auto result = scanLogLine(line)) { - linesParsed++; + stats.linesParsed++; const auto values = crc32ScanValues(result->values()); try { DB::TransactionScope dbtx {*dbconn}; @@ -247,7 +266,7 @@ namespace WebStat { } } else { - linesDiscarded++; + stats.linesParseFailed++; const auto unparsableLine = ToEntity<EntityType::UnparsableLine> {}(line); log(LOG_NOTICE, "Failed to parse line, this is a 
bug: %u:%s", std::get<Crc32Value>(unparsableLine), line.c_str()); @@ -270,7 +289,6 @@ namespace WebStat { fprintf(parked.get(), "%.*s\n", static_cast<int>(line.length()), line.data()); } if (fflush(parked.get()) == 0) { - linesParked += queuedLines.size(); queuedLines.clear(); auto finalPath = auto {path}.replace_extension(".log"); if (rename(path.c_str(), finalPath.c_str()) == 0) { @@ -348,7 +366,7 @@ namespace WebStat { parkedLines.reserve(count); for (size_t lineNo = 0; lineNo < count; ++lineNo) { if (auto line = scn::scan<std::string>(lines, "{:[^\n]}\n")) { - linesRead++; + stats.linesRead++; parkedLines.emplace_back(std::move(line->value())); } else { @@ -428,9 +446,11 @@ namespace WebStat { const auto & [entityId, type, value] = *entity; const auto & [typeName, onInsert] = ENTITY_TYPE_VALUES[std::to_underlying(type)]; bindMany(insert, 0, entityId, typeName, value); - if (insert->execute() > 0 && onInsert && std::this_thread::get_id() == mainThread) { + const auto insertedEntities = insert->execute(); + if (insertedEntities > 0 && onInsert && std::this_thread::get_id() == mainThread) { std::invoke(onInsert, this, *entity); } + stats.entitiesInserted += insertedEntities; return std::get<0>(*entity); }); return ids; @@ -459,6 +479,22 @@ namespace WebStat { }, values); - insert->execute(); + stats.logsInserted += insert->execute(); + } + + void + Ingestor::logStats() const + { + log(LOG_INFO, + "Statistics: linesQueued %zu, linesRead %zu, linesParsed %zu, linesParseFailed %zu, logsInserted %zu, " + "entitiesInserted %zu, entitiesKnown %zu", + queuedLines.size(), stats.linesRead, stats.linesParsed, stats.linesParseFailed, stats.logsInserted, + stats.entitiesInserted, existingEntities.size()); + } + + void + Ingestor::clearStats() + { + stats = {}; } } diff --git a/src/ingestor.hpp b/src/ingestor.hpp index 3f6aa37..fcddc92 100644 --- a/src/ingestor.hpp +++ b/src/ingestor.hpp @@ -64,14 +64,20 @@ namespace WebStat { IngestorSettings settings; + struct Stats { + 
size_t linesRead; + size_t linesParsed; + size_t linesParseFailed; + size_t logsInserted; + size_t entitiesInserted; + constexpr bool operator==(const Ingestor::Stats &) const = default; + }; + protected: static Ingestor * currentIngestor; DB::ConnectionPoolPtr dbpool; + mutable Stats stats {}; - size_t linesRead = 0; - size_t linesParsed = 0; - size_t linesDiscarded = 0; - size_t linesParked = 0; std::flat_set<Crc32Value> existingEntities; LineBatch queuedLines; @@ -100,12 +106,16 @@ namespace WebStat { template<typename... T> NewEntities newEntities(const std::tuple<T...> &) const; void onNewUserAgent(const Entity &) const; void handleCurlOperations(); + void logStats() const; + void clearStats(); void jobIngestParkedLines(const std::filesystem::path &); size_t jobIngestParkedLines(FILE *, size_t count); static void sigtermHandler(int); void terminate(int); + static void sigusr1Handler(int); + static void sigusr2Handler(int); [[gnu::format(printf, 3, 4)]] virtual void log(int level, const char * msgfmt, ...) 
const = 0; using CurlOperations = std::map<CURL *, std::unique_ptr<CurlOperation>>; diff --git a/test/test-ingest.cpp b/test/test-ingest.cpp index 9ce48be..8c82764 100644 --- a/test/test-ingest.cpp +++ b/test/test-ingest.cpp @@ -58,6 +58,8 @@ namespace DB { } } +BOOST_TEST_DONT_PRINT_LOG_VALUE(Ingestor::Stats); + BOOST_DATA_TEST_CASE(QuotedStringsGood, boost::unit_test::data::make<WebStat::ParseData<WebStat::QuotedString>>({ {R"("")", ""}, @@ -229,7 +231,10 @@ public: va_end(args); BOOST_REQUIRE(msg); BOOST_TEST_MESSAGE(msg.get()); + ++logsWritten; } + + mutable size_t logsWritten = 0; }; BOOST_FIXTURE_TEST_SUITE(I, TestIngestor); @@ -244,9 +249,11 @@ BOOST_DATA_TEST_CASE(StoreLogLine, line) { ingestLogLines(DB::MockDatabase::openConnectionTo("webstat").get(), {std::string {line}}); - BOOST_CHECK_EQUAL(linesRead, 0); - BOOST_CHECK_EQUAL(linesParsed, 1); - BOOST_CHECK_EQUAL(linesDiscarded, 0); + BOOST_CHECK_EQUAL(stats.linesRead, 0); + BOOST_CHECK_EQUAL(stats.linesParsed, 1); + BOOST_CHECK_EQUAL(stats.linesParseFailed, 0); + BOOST_CHECK_EQUAL(stats.logsInserted, 1); + BOOST_CHECK_EQUAL(stats.entitiesInserted, 5); BOOST_CHECK_EQUAL(existingEntities.size(), 5); } @@ -256,9 +263,11 @@ BOOST_AUTO_TEST_CASE(StoreLog, *boost::unit_test::depends_on("I/StoreLogLine")) WebStat::FilePtr input {fopen(log.path.c_str(), "r")}; BOOST_REQUIRE(input); ingestLog(input.get()); - BOOST_CHECK_EQUAL(linesRead, 10); - BOOST_CHECK_EQUAL(linesParsed, 10); - BOOST_CHECK_EQUAL(linesDiscarded, 0); + BOOST_CHECK_EQUAL(stats.linesRead, 10); + BOOST_CHECK_EQUAL(stats.linesParsed, 10); + BOOST_CHECK_EQUAL(stats.linesParseFailed, 0); + BOOST_CHECK_GE(stats.entitiesInserted, 1); + BOOST_CHECK_EQUAL(stats.entitiesInserted, existingEntities.size()); } BOOST_AUTO_TEST_CASE(TerminateHandler, *boost::unit_test::timeout(5)) @@ -269,9 +278,9 @@ BOOST_AUTO_TEST_CASE(TerminateHandler, *boost::unit_test::timeout(5)) raise(SIGTERM); BOOST_REQUIRE(terminated); ingestLog(input.get()); - 
BOOST_CHECK_EQUAL(linesRead, 0); - BOOST_CHECK_EQUAL(linesParsed, 0); - BOOST_CHECK_EQUAL(linesDiscarded, 0); + BOOST_CHECK_EQUAL(stats.linesRead, 0); + BOOST_CHECK_EQUAL(stats.linesParsed, 0); + BOOST_CHECK_EQUAL(stats.linesParseFailed, 0); } BOOST_AUTO_TEST_CASE(ParkLogLine) @@ -289,10 +298,9 @@ BOOST_AUTO_TEST_CASE(ParkLogLineOnError, *boost::unit_test::depends_on("I/ParkLo { BOOST_REQUIRE(existingEntities.empty()); constexpr std::string_view LOGLINE_BAD_VERB - = R"LOG(git.randomdan.homeip.net 98.82.40.168 1755561576768318 CAUSEPARK "/repo/gentoobrowse-api/commit/gentoobrowse-api/unittests/fixtures/756569aa764177340726dd3d40b41d89b11b20c7/app-crypt/pdfcrack/Manifest" "?h=gentoobrowse-api-0.9.1&id=a2ed3fd30333721accd4b697bfcb6cc4165c7714" HTTP/1.1 200 1884 107791 "-" "Mozilla/5.0 AppleWebKit/537.36 (KHTML, like Gecko; compatible; Amazonbot/0.1; +https://developer.amazon.com/support/amazonbot) Chrome/119.0.6045.214 Safari/537.36")LOG"; + = R"LOG(git.randomdan.homeip.net 98.82.40.168 1755561576768318 CAUSEPARSEFAIL "/repo/gentoobrowse-api/commit/gentoobrowse-api/unittests/fixtures/756569aa764177340726dd3d40b41d89b11b20c7/app-crypt/pdfcrack/Manifest" "?h=gentoobrowse-api-0.9.1&id=a2ed3fd30333721accd4b697bfcb6cc4165c7714" HTTP/1.1 200 1884 107791 "-" "Mozilla/5.0 AppleWebKit/537.36 (KHTML, like Gecko; compatible; Amazonbot/0.1; +https://developer.amazon.com/support/amazonbot) Chrome/119.0.6045.214 Safari/537.36")LOG"; BOOST_REQUIRE_NO_THROW(ingestLogLines(dbpool->get().get(), {std::string {LOGLINE_BAD_VERB}})); - BOOST_CHECK_EQUAL(linesParked, 0); - BOOST_CHECK_EQUAL(linesDiscarded, 1); + BOOST_CHECK_EQUAL(stats.linesParseFailed, 1); } BOOST_AUTO_TEST_CASE(IngestParked, *boost::unit_test::depends_on("I/ParkLogLine")) @@ -390,6 +398,9 @@ BOOST_AUTO_TEST_CASE(DiscardUnparsable) }}; auto rows = select->as<Crc32Value, std::string_view>(); BOOST_CHECK_EQUAL_COLLECTIONS(rows.begin(), rows.end(), EXPECTED.begin(), EXPECTED.end()); + BOOST_CHECK_EQUAL(stats.linesParseFailed, 
1); + BOOST_CHECK_EQUAL(stats.entitiesInserted, 1); + BOOST_CHECK(existingEntities.empty()); // Don't clutter existing entities with junk logs } BOOST_AUTO_TEST_CASE(PurgeOldJob) @@ -397,6 +408,20 @@ BOOST_AUTO_TEST_CASE(PurgeOldJob) BOOST_CHECK_EQUAL(2, jobPurgeOldLogs()); } +BOOST_AUTO_TEST_CASE(LogStatsSignal) +{ + BOOST_REQUIRE_EQUAL(logsWritten, 0); + raise(SIGUSR1); + BOOST_CHECK_EQUAL(logsWritten, 1); +} + +BOOST_AUTO_TEST_CASE(LogResetSignal) +{ + stats = {1, 2, 3, 4, 5}; + raise(SIGUSR2); + BOOST_CHECK_EQUAL(stats, Stats {}); +} + BOOST_AUTO_TEST_SUITE_END(); BOOST_AUTO_TEST_CASE(FetchRealUserAgentDetail, *boost::unit_test::disabled()) |
