diff options
Diffstat (limited to 'src/ingestor.cpp')
| -rw-r--r-- | src/ingestor.cpp | 50 |
1 files changed, 43 insertions, 7 deletions
diff --git a/src/ingestor.cpp b/src/ingestor.cpp index 1b670fb..c5cb8d8 100644 --- a/src/ingestor.cpp +++ b/src/ingestor.cpp @@ -91,6 +91,8 @@ namespace WebStat { assert(!currentIngestor); currentIngestor = this; signal(SIGTERM, &sigtermHandler); + signal(SIGUSR1, &sigusr1Handler); + signal(SIGUSR2, &sigusr2Handler); queuedLines.reserve(settings.maxBatchSize); } @@ -98,6 +100,8 @@ namespace WebStat { { assert(currentIngestor); signal(SIGTERM, SIG_DFL); + signal(SIGUSR1, SIG_DFL); + signal(SIGUSR2, SIG_DFL); currentIngestor = nullptr; } @@ -116,6 +120,20 @@ namespace WebStat { curl_multi_wakeup(curl.get()); } + void + Ingestor::sigusr1Handler(int) + { + assert(currentIngestor); + currentIngestor->logStats(); + } + + void + Ingestor::sigusr2Handler(int) + { + assert(currentIngestor); + currentIngestor->clearStats(); + } + Ingestor::ScanResult Ingestor::scanLogLine(std::string_view input) { @@ -171,7 +189,7 @@ namespace WebStat { while (!terminated && curl_multi_poll(curl.get(), &logIn, 1, curlTimeOut, nullptr) == CURLM_OK) { if (logIn.revents) { if (auto line = scn::scan<std::string>(input, "{:[^\n]}\n")) { - linesRead++; + stats.linesRead++; queuedLines.emplace_back(std::move(line->value())); if (queuedLines.size() >= settings.maxBatchSize) { tryIngestQueuedLogLines(); @@ -196,6 +214,7 @@ namespace WebStat { while (!curlOperations.empty() && curl_multi_poll(curl.get(), nullptr, 0, INT_MAX, nullptr) == CURLM_OK) { handleCurlOperations(); } + logStats(); } void @@ -222,7 +241,7 @@ namespace WebStat { DB::TransactionScope batchTx {*dbconn}; for (const auto & line : lines) { if (auto result = scanLogLine(line)) { - linesParsed++; + stats.linesParsed++; const auto values = crc32ScanValues(result->values()); try { DB::TransactionScope dbtx {*dbconn}; @@ -247,7 +266,7 @@ namespace WebStat { } } else { - linesDiscarded++; + stats.linesParseFailed++; const auto unparsableLine = ToEntity<EntityType::UnparsableLine> {}(line); log(LOG_NOTICE, "Failed to parse line, this is a bug: %u:%s", std::get<Crc32Value>(unparsableLine), line.c_str()); @@ -270,7 +289,6 @@ namespace WebStat { fprintf(parked.get(), "%.*s\n", static_cast<int>(line.length()), line.data()); } if (fflush(parked.get()) == 0) { - linesParked += queuedLines.size(); queuedLines.clear(); auto finalPath = auto {path}.replace_extension(".log"); if (rename(path.c_str(), finalPath.c_str()) == 0) { @@ -348,7 +366,7 @@ namespace WebStat { parkedLines.reserve(count); for (size_t lineNo = 0; lineNo < count; ++lineNo) { if (auto line = scn::scan<std::string>(lines, "{:[^\n]}\n")) { - linesRead++; + stats.linesRead++; parkedLines.emplace_back(std::move(line->value())); } else { @@ -428,9 +446,11 @@ namespace WebStat { const auto & [entityId, type, value] = *entity; const auto & [typeName, onInsert] = ENTITY_TYPE_VALUES[std::to_underlying(type)]; bindMany(insert, 0, entityId, typeName, value); - if (insert->execute() > 0 && onInsert && std::this_thread::get_id() == mainThread) { + const auto insertedEntities = insert->execute(); + if (insertedEntities > 0 && onInsert && std::this_thread::get_id() == mainThread) { std::invoke(onInsert, this, *entity); } + stats.entitiesInserted += insertedEntities; return std::get<0>(*entity); }); return ids; @@ -459,6 +479,22 @@ namespace WebStat { }, values); - insert->execute(); + stats.logsInserted += insert->execute(); + } + + void + Ingestor::logStats() const + { + log(LOG_INFO, + "Statistics: linesQueued %zu, linesRead %zu, linesParsed %zu, linesParseFailed %zu, logsInserted %zu, " + "entitiesInserted %zu, entitiesKnown %zu", + queuedLines.size(), stats.linesRead, stats.linesParsed, stats.linesParseFailed, stats.logsInserted, + stats.entitiesInserted, existingEntities.size()); + } + + void + Ingestor::clearStats() + { + stats = {}; } } |
