summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/ingestor.cpp50
-rw-r--r--src/ingestor.hpp18
2 files changed, 57 insertions, 11 deletions
diff --git a/src/ingestor.cpp b/src/ingestor.cpp
index 1b670fb..c5cb8d8 100644
--- a/src/ingestor.cpp
+++ b/src/ingestor.cpp
@@ -91,6 +91,8 @@ namespace WebStat {
assert(!currentIngestor);
currentIngestor = this;
signal(SIGTERM, &sigtermHandler);
+ signal(SIGUSR1, &sigusr1Handler);
+ signal(SIGUSR2, &sigusr2Handler);
queuedLines.reserve(settings.maxBatchSize);
}
@@ -98,6 +100,8 @@ namespace WebStat {
{
assert(currentIngestor);
signal(SIGTERM, SIG_DFL);
+ signal(SIGUSR1, SIG_DFL);
+ signal(SIGUSR2, SIG_DFL);
currentIngestor = nullptr;
}
@@ -116,6 +120,20 @@ namespace WebStat {
curl_multi_wakeup(curl.get());
}
+ void
+ Ingestor::sigusr1Handler(int)
+ {
+ assert(currentIngestor);
+ currentIngestor->logStats();
+ }
+
+ void
+ Ingestor::sigusr2Handler(int)
+ {
+ assert(currentIngestor);
+ currentIngestor->clearStats();
+ }
+
Ingestor::ScanResult
Ingestor::scanLogLine(std::string_view input)
{
@@ -171,7 +189,7 @@ namespace WebStat {
while (!terminated && curl_multi_poll(curl.get(), &logIn, 1, curlTimeOut, nullptr) == CURLM_OK) {
if (logIn.revents) {
if (auto line = scn::scan<std::string>(input, "{:[^\n]}\n")) {
- linesRead++;
+ stats.linesRead++;
queuedLines.emplace_back(std::move(line->value()));
if (queuedLines.size() >= settings.maxBatchSize) {
tryIngestQueuedLogLines();
@@ -196,6 +214,7 @@ namespace WebStat {
while (!curlOperations.empty() && curl_multi_poll(curl.get(), nullptr, 0, INT_MAX, nullptr) == CURLM_OK) {
handleCurlOperations();
}
+ logStats();
}
void
@@ -222,7 +241,7 @@ namespace WebStat {
DB::TransactionScope batchTx {*dbconn};
for (const auto & line : lines) {
if (auto result = scanLogLine(line)) {
- linesParsed++;
+ stats.linesParsed++;
const auto values = crc32ScanValues(result->values());
try {
DB::TransactionScope dbtx {*dbconn};
@@ -247,7 +266,7 @@ namespace WebStat {
}
}
else {
- linesDiscarded++;
+ stats.linesParseFailed++;
const auto unparsableLine = ToEntity<EntityType::UnparsableLine> {}(line);
log(LOG_NOTICE, "Failed to parse line, this is a bug: %u:%s", std::get<Crc32Value>(unparsableLine),
line.c_str());
@@ -270,7 +289,6 @@ namespace WebStat {
fprintf(parked.get(), "%.*s\n", static_cast<int>(line.length()), line.data());
}
if (fflush(parked.get()) == 0) {
- linesParked += queuedLines.size();
queuedLines.clear();
auto finalPath = auto {path}.replace_extension(".log");
if (rename(path.c_str(), finalPath.c_str()) == 0) {
@@ -348,7 +366,7 @@ namespace WebStat {
parkedLines.reserve(count);
for (size_t lineNo = 0; lineNo < count; ++lineNo) {
if (auto line = scn::scan<std::string>(lines, "{:[^\n]}\n")) {
- linesRead++;
+ stats.linesRead++;
parkedLines.emplace_back(std::move(line->value()));
}
else {
@@ -428,9 +446,11 @@ namespace WebStat {
const auto & [entityId, type, value] = *entity;
const auto & [typeName, onInsert] = ENTITY_TYPE_VALUES[std::to_underlying(type)];
bindMany(insert, 0, entityId, typeName, value);
- if (insert->execute() > 0 && onInsert && std::this_thread::get_id() == mainThread) {
+ const auto insertedEntities = insert->execute();
+ if (insertedEntities > 0 && onInsert && std::this_thread::get_id() == mainThread) {
std::invoke(onInsert, this, *entity);
}
+ stats.entitiesInserted += insertedEntities;
return std::get<0>(*entity);
});
return ids;
@@ -459,6 +479,22 @@ namespace WebStat {
},
values);
- insert->execute();
+ stats.logsInserted += insert->execute();
+ }
+
+ void
+ Ingestor::logStats() const
+ {
+ log(LOG_INFO,
+ "Statistics: linesQueued %zu, linesRead %zu, linesParsed %zu, linesParseFailed %zu, logsInserted %zu, "
+ "entitiesInserted %zu, entitiesKnown %zu",
+ queuedLines.size(), stats.linesRead, stats.linesParsed, stats.linesParseFailed, stats.logsInserted,
+ stats.entitiesInserted, existingEntities.size());
+ }
+
+ void
+ Ingestor::clearStats()
+ {
+ stats = {};
}
}
diff --git a/src/ingestor.hpp b/src/ingestor.hpp
index 3f6aa37..fcddc92 100644
--- a/src/ingestor.hpp
+++ b/src/ingestor.hpp
@@ -64,14 +64,20 @@ namespace WebStat {
IngestorSettings settings;
+ struct Stats {
+ size_t linesRead;
+ size_t linesParsed;
+ size_t linesParseFailed;
+ size_t logsInserted;
+ size_t entitiesInserted;
+ constexpr bool operator==(const Ingestor::Stats &) const = default;
+ };
+
protected:
static Ingestor * currentIngestor;
DB::ConnectionPoolPtr dbpool;
+ mutable Stats stats {};
- size_t linesRead = 0;
- size_t linesParsed = 0;
- size_t linesDiscarded = 0;
- size_t linesParked = 0;
std::flat_set<Crc32Value> existingEntities;
LineBatch queuedLines;
@@ -100,12 +106,16 @@ namespace WebStat {
template<typename... T> NewEntities newEntities(const std::tuple<T...> &) const;
void onNewUserAgent(const Entity &) const;
void handleCurlOperations();
+ void logStats() const;
+ void clearStats();
void jobIngestParkedLines(const std::filesystem::path &);
size_t jobIngestParkedLines(FILE *, size_t count);
static void sigtermHandler(int);
void terminate(int);
+ static void sigusr1Handler(int);
+ static void sigusr2Handler(int);
[[gnu::format(printf, 3, 4)]] virtual void log(int level, const char * msgfmt, ...) const = 0;
using CurlOperations = std::map<CURL *, std::unique_ptr<CurlOperation>>;