summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDan Goodliffe <dan@randomdan.homeip.net>2026-03-25 18:44:15 +0000
committerDan Goodliffe <dan@randomdan.homeip.net>2026-03-25 18:44:15 +0000
commit426a6a3d9309e3f7cf5fc89d6a687c7895785cc3 (patch)
tree00daab1e20fbd1d5571fd8d8abd3af1c33f5fcf6
parent6a475f58c0fcb4398e48618d90b0ca05750c3ca8 (diff)
downloadwebstat-426a6a3d9309e3f7cf5fc89d6a687c7895785cc3.tar.bz2
webstat-426a6a3d9309e3f7cf5fc89d6a687c7895785cc3.tar.xz
webstat-426a6a3d9309e3f7cf5fc89d6a687c7895785cc3.zip
Revise stats and add signal handlers to log them and reset themwebstat-0.3
Also logs them on main loop exit.
-rw-r--r--src/ingestor.cpp50
-rw-r--r--src/ingestor.hpp18
-rw-r--r--test/test-ingest.cpp49
3 files changed, 94 insertions, 23 deletions
diff --git a/src/ingestor.cpp b/src/ingestor.cpp
index 1b670fb..c5cb8d8 100644
--- a/src/ingestor.cpp
+++ b/src/ingestor.cpp
@@ -91,6 +91,8 @@ namespace WebStat {
assert(!currentIngestor);
currentIngestor = this;
signal(SIGTERM, &sigtermHandler);
+ signal(SIGUSR1, &sigusr1Handler);
+ signal(SIGUSR2, &sigusr2Handler);
queuedLines.reserve(settings.maxBatchSize);
}
@@ -98,6 +100,8 @@ namespace WebStat {
{
assert(currentIngestor);
signal(SIGTERM, SIG_DFL);
+ signal(SIGUSR1, SIG_DFL);
+ signal(SIGUSR2, SIG_DFL);
currentIngestor = nullptr;
}
@@ -116,6 +120,20 @@ namespace WebStat {
curl_multi_wakeup(curl.get());
}
+ void
+ Ingestor::sigusr1Handler(int)
+ {
+ assert(currentIngestor);
+ currentIngestor->logStats();
+ }
+
+ void
+ Ingestor::sigusr2Handler(int)
+ {
+ assert(currentIngestor);
+ currentIngestor->clearStats();
+ }
+
Ingestor::ScanResult
Ingestor::scanLogLine(std::string_view input)
{
@@ -171,7 +189,7 @@ namespace WebStat {
while (!terminated && curl_multi_poll(curl.get(), &logIn, 1, curlTimeOut, nullptr) == CURLM_OK) {
if (logIn.revents) {
if (auto line = scn::scan<std::string>(input, "{:[^\n]}\n")) {
- linesRead++;
+ stats.linesRead++;
queuedLines.emplace_back(std::move(line->value()));
if (queuedLines.size() >= settings.maxBatchSize) {
tryIngestQueuedLogLines();
@@ -196,6 +214,7 @@ namespace WebStat {
while (!curlOperations.empty() && curl_multi_poll(curl.get(), nullptr, 0, INT_MAX, nullptr) == CURLM_OK) {
handleCurlOperations();
}
+ logStats();
}
void
@@ -222,7 +241,7 @@ namespace WebStat {
DB::TransactionScope batchTx {*dbconn};
for (const auto & line : lines) {
if (auto result = scanLogLine(line)) {
- linesParsed++;
+ stats.linesParsed++;
const auto values = crc32ScanValues(result->values());
try {
DB::TransactionScope dbtx {*dbconn};
@@ -247,7 +266,7 @@ namespace WebStat {
}
}
else {
- linesDiscarded++;
+ stats.linesParseFailed++;
const auto unparsableLine = ToEntity<EntityType::UnparsableLine> {}(line);
log(LOG_NOTICE, "Failed to parse line, this is a bug: %u:%s", std::get<Crc32Value>(unparsableLine),
line.c_str());
@@ -270,7 +289,6 @@ namespace WebStat {
fprintf(parked.get(), "%.*s\n", static_cast<int>(line.length()), line.data());
}
if (fflush(parked.get()) == 0) {
- linesParked += queuedLines.size();
queuedLines.clear();
auto finalPath = auto {path}.replace_extension(".log");
if (rename(path.c_str(), finalPath.c_str()) == 0) {
@@ -348,7 +366,7 @@ namespace WebStat {
parkedLines.reserve(count);
for (size_t lineNo = 0; lineNo < count; ++lineNo) {
if (auto line = scn::scan<std::string>(lines, "{:[^\n]}\n")) {
- linesRead++;
+ stats.linesRead++;
parkedLines.emplace_back(std::move(line->value()));
}
else {
@@ -428,9 +446,11 @@ namespace WebStat {
const auto & [entityId, type, value] = *entity;
const auto & [typeName, onInsert] = ENTITY_TYPE_VALUES[std::to_underlying(type)];
bindMany(insert, 0, entityId, typeName, value);
- if (insert->execute() > 0 && onInsert && std::this_thread::get_id() == mainThread) {
+ const auto insertedEntities = insert->execute();
+ if (insertedEntities > 0 && onInsert && std::this_thread::get_id() == mainThread) {
std::invoke(onInsert, this, *entity);
}
+ stats.entitiesInserted += insertedEntities;
return std::get<0>(*entity);
});
return ids;
@@ -459,6 +479,22 @@ namespace WebStat {
},
values);
- insert->execute();
+ stats.logsInserted += insert->execute();
+ }
+
+ void
+ Ingestor::logStats() const
+ {
+ log(LOG_INFO,
+ "Statistics: linesQueued %zu, linesRead %zu, linesParsed %zu, linesParseFailed %zu, logsInserted %zu, "
+ "entitiesInserted %zu, entitiesKnown %zu",
+ queuedLines.size(), stats.linesRead, stats.linesParsed, stats.linesParseFailed, stats.logsInserted,
+ stats.entitiesInserted, existingEntities.size());
+ }
+
+ void
+ Ingestor::clearStats()
+ {
+ stats = {};
}
}
diff --git a/src/ingestor.hpp b/src/ingestor.hpp
index 3f6aa37..fcddc92 100644
--- a/src/ingestor.hpp
+++ b/src/ingestor.hpp
@@ -64,14 +64,20 @@ namespace WebStat {
IngestorSettings settings;
+ struct Stats {
+ size_t linesRead;
+ size_t linesParsed;
+ size_t linesParseFailed;
+ size_t logsInserted;
+ size_t entitiesInserted;
+ constexpr bool operator==(const Ingestor::Stats &) const = default;
+ };
+
protected:
static Ingestor * currentIngestor;
DB::ConnectionPoolPtr dbpool;
+ mutable Stats stats {};
- size_t linesRead = 0;
- size_t linesParsed = 0;
- size_t linesDiscarded = 0;
- size_t linesParked = 0;
std::flat_set<Crc32Value> existingEntities;
LineBatch queuedLines;
@@ -100,12 +106,16 @@ namespace WebStat {
template<typename... T> NewEntities newEntities(const std::tuple<T...> &) const;
void onNewUserAgent(const Entity &) const;
void handleCurlOperations();
+ void logStats() const;
+ void clearStats();
void jobIngestParkedLines(const std::filesystem::path &);
size_t jobIngestParkedLines(FILE *, size_t count);
static void sigtermHandler(int);
void terminate(int);
+ static void sigusr1Handler(int);
+ static void sigusr2Handler(int);
[[gnu::format(printf, 3, 4)]] virtual void log(int level, const char * msgfmt, ...) const = 0;
using CurlOperations = std::map<CURL *, std::unique_ptr<CurlOperation>>;
diff --git a/test/test-ingest.cpp b/test/test-ingest.cpp
index 9ce48be..8c82764 100644
--- a/test/test-ingest.cpp
+++ b/test/test-ingest.cpp
@@ -58,6 +58,8 @@ namespace DB {
}
}
+BOOST_TEST_DONT_PRINT_LOG_VALUE(Ingestor::Stats);
+
BOOST_DATA_TEST_CASE(QuotedStringsGood,
boost::unit_test::data::make<WebStat::ParseData<WebStat::QuotedString>>({
{R"("")", ""},
@@ -229,7 +231,10 @@ public:
va_end(args);
BOOST_REQUIRE(msg);
BOOST_TEST_MESSAGE(msg.get());
+ ++logsWritten;
}
+
+ mutable size_t logsWritten = 0;
};
BOOST_FIXTURE_TEST_SUITE(I, TestIngestor);
@@ -244,9 +249,11 @@ BOOST_DATA_TEST_CASE(StoreLogLine,
line)
{
ingestLogLines(DB::MockDatabase::openConnectionTo("webstat").get(), {std::string {line}});
- BOOST_CHECK_EQUAL(linesRead, 0);
- BOOST_CHECK_EQUAL(linesParsed, 1);
- BOOST_CHECK_EQUAL(linesDiscarded, 0);
+ BOOST_CHECK_EQUAL(stats.linesRead, 0);
+ BOOST_CHECK_EQUAL(stats.linesParsed, 1);
+ BOOST_CHECK_EQUAL(stats.linesParseFailed, 0);
+ BOOST_CHECK_EQUAL(stats.logsInserted, 1);
+ BOOST_CHECK_EQUAL(stats.entitiesInserted, 5);
BOOST_CHECK_EQUAL(existingEntities.size(), 5);
}
@@ -256,9 +263,11 @@ BOOST_AUTO_TEST_CASE(StoreLog, *boost::unit_test::depends_on("I/StoreLogLine"))
WebStat::FilePtr input {fopen(log.path.c_str(), "r")};
BOOST_REQUIRE(input);
ingestLog(input.get());
- BOOST_CHECK_EQUAL(linesRead, 10);
- BOOST_CHECK_EQUAL(linesParsed, 10);
- BOOST_CHECK_EQUAL(linesDiscarded, 0);
+ BOOST_CHECK_EQUAL(stats.linesRead, 10);
+ BOOST_CHECK_EQUAL(stats.linesParsed, 10);
+ BOOST_CHECK_EQUAL(stats.linesParseFailed, 0);
+ BOOST_CHECK_GE(stats.entitiesInserted, 1);
+ BOOST_CHECK_EQUAL(stats.entitiesInserted, existingEntities.size());
}
BOOST_AUTO_TEST_CASE(TerminateHandler, *boost::unit_test::timeout(5))
@@ -269,9 +278,9 @@ BOOST_AUTO_TEST_CASE(TerminateHandler, *boost::unit_test::timeout(5))
raise(SIGTERM);
BOOST_REQUIRE(terminated);
ingestLog(input.get());
- BOOST_CHECK_EQUAL(linesRead, 0);
- BOOST_CHECK_EQUAL(linesParsed, 0);
- BOOST_CHECK_EQUAL(linesDiscarded, 0);
+ BOOST_CHECK_EQUAL(stats.linesRead, 0);
+ BOOST_CHECK_EQUAL(stats.linesParsed, 0);
+ BOOST_CHECK_EQUAL(stats.linesParseFailed, 0);
}
BOOST_AUTO_TEST_CASE(ParkLogLine)
@@ -289,10 +298,9 @@ BOOST_AUTO_TEST_CASE(ParkLogLineOnError, *boost::unit_test::depends_on("I/ParkLo
{
BOOST_REQUIRE(existingEntities.empty());
constexpr std::string_view LOGLINE_BAD_VERB
- = R"LOG(git.randomdan.homeip.net 98.82.40.168 1755561576768318 CAUSEPARK "/repo/gentoobrowse-api/commit/gentoobrowse-api/unittests/fixtures/756569aa764177340726dd3d40b41d89b11b20c7/app-crypt/pdfcrack/Manifest" "?h=gentoobrowse-api-0.9.1&id=a2ed3fd30333721accd4b697bfcb6cc4165c7714" HTTP/1.1 200 1884 107791 "-" "Mozilla/5.0 AppleWebKit/537.36 (KHTML, like Gecko; compatible; Amazonbot/0.1; +https://developer.amazon.com/support/amazonbot) Chrome/119.0.6045.214 Safari/537.36")LOG";
+ = R"LOG(git.randomdan.homeip.net 98.82.40.168 1755561576768318 CAUSEPARSEFAIL "/repo/gentoobrowse-api/commit/gentoobrowse-api/unittests/fixtures/756569aa764177340726dd3d40b41d89b11b20c7/app-crypt/pdfcrack/Manifest" "?h=gentoobrowse-api-0.9.1&id=a2ed3fd30333721accd4b697bfcb6cc4165c7714" HTTP/1.1 200 1884 107791 "-" "Mozilla/5.0 AppleWebKit/537.36 (KHTML, like Gecko; compatible; Amazonbot/0.1; +https://developer.amazon.com/support/amazonbot) Chrome/119.0.6045.214 Safari/537.36")LOG";
BOOST_REQUIRE_NO_THROW(ingestLogLines(dbpool->get().get(), {std::string {LOGLINE_BAD_VERB}}));
- BOOST_CHECK_EQUAL(linesParked, 0);
- BOOST_CHECK_EQUAL(linesDiscarded, 1);
+ BOOST_CHECK_EQUAL(stats.linesParseFailed, 1);
}
BOOST_AUTO_TEST_CASE(IngestParked, *boost::unit_test::depends_on("I/ParkLogLine"))
@@ -390,6 +398,9 @@ BOOST_AUTO_TEST_CASE(DiscardUnparsable)
}};
auto rows = select->as<Crc32Value, std::string_view>();
BOOST_CHECK_EQUAL_COLLECTIONS(rows.begin(), rows.end(), EXPECTED.begin(), EXPECTED.end());
+ BOOST_CHECK_EQUAL(stats.linesParseFailed, 1);
+ BOOST_CHECK_EQUAL(stats.entitiesInserted, 1);
+ BOOST_CHECK(existingEntities.empty()); // Don't clutter existing entities with junk logs
}
BOOST_AUTO_TEST_CASE(PurgeOldJob)
@@ -397,6 +408,20 @@ BOOST_AUTO_TEST_CASE(PurgeOldJob)
BOOST_CHECK_EQUAL(2, jobPurgeOldLogs());
}
+BOOST_AUTO_TEST_CASE(LogStatsSignal)
+{
+ BOOST_REQUIRE_EQUAL(logsWritten, 0);
+ raise(SIGUSR1);
+ BOOST_CHECK_EQUAL(logsWritten, 1);
+}
+
+BOOST_AUTO_TEST_CASE(LogResetSignal)
+{
+ stats = {1, 2, 3, 4, 5};
+ raise(SIGUSR2);
+ BOOST_CHECK_EQUAL(stats, Stats {});
+}
+
BOOST_AUTO_TEST_SUITE_END();
BOOST_AUTO_TEST_CASE(FetchRealUserAgentDetail, *boost::unit_test::disabled())