summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/ingestor.cpp131
-rw-r--r--src/ingestor.hpp15
-rw-r--r--src/webstat_logger_main.cpp2
3 files changed, 88 insertions, 60 deletions
diff --git a/src/ingestor.cpp b/src/ingestor.cpp
index e2018ad..1de9ab9 100644
--- a/src/ingestor.cpp
+++ b/src/ingestor.cpp
@@ -5,7 +5,6 @@
#include <connection.h>
#include <csignal>
#include <dbTypes.h>
-#include <fstream>
#include <modifycommand.h>
#include <ranges>
#include <scn/scan.h>
@@ -92,6 +91,7 @@ namespace WebStat {
assert(!currentIngestor);
currentIngestor = this;
signal(SIGTERM, &sigtermHandler);
+ queuedLines.reserve(settings.maxBatchSize);
}
Ingestor::~Ingestor()
@@ -171,12 +171,18 @@ namespace WebStat {
if (logIn.revents) {
if (auto line = scn::scan<std::string>(input, "{:[^\n]}\n")) {
linesRead++;
- ingestLogLine(line->value());
+ queuedLines.emplace_back(std::move(line->value()));
+ if (queuedLines.size() >= settings.maxBatchSize) {
+ tryIngestQueuedLogLines();
+ }
}
else {
break;
}
}
+ else {
+ tryIngestQueuedLogLines();
+ }
if (expiredThenSet(lastCheckedJobs, settings.checkJobsAfter)) {
runJobsAsNeeded();
}
@@ -184,68 +190,84 @@ namespace WebStat {
handleCurlOperations();
}
}
+ tryIngestQueuedLogLines();
+ parkQueuedLogLines();
while (!curlOperations.empty() && curl_multi_poll(curl.get(), nullptr, 0, INT_MAX, nullptr) == CURLM_OK) {
handleCurlOperations();
}
}
void
- Ingestor::ingestLogLine(const std::string_view line)
+ Ingestor::tryIngestQueuedLogLines()
{
try {
- ingestLogLine(dbpool->get().get(), line);
+ ingestLogLines(dbpool->get().get(), queuedLines);
+ queuedLines.clear();
}
catch (const std::exception &) {
- parkLogLine(line);
+ existingEntities.clear();
}
}
void
- Ingestor::ingestLogLine(DB::Connection * dbconn, const std::string_view line)
+ Ingestor::ingestLogLines(DB::Connection * dbconn, const LineBatch & lines)
{
- auto rememberNewEntityIds = [this](const auto & ids) {
- existingEntities.insert_range(ids | std::views::take_while(&std::optional<Crc32Value>::has_value)
- | std::views::transform([](auto && value) {
- return *value;
- }));
- };
- if (auto result = scanLogLine(line)) {
- linesParsed++;
- const auto values = crc32ScanValues(result->values());
- NewEntityIds ids;
- try {
- {
- std::optional<DB::TransactionScope> dbtx;
+ auto nonNullEntityIds = std::views::take_while(&std::optional<Crc32Value>::has_value)
+ | std::views::transform([](auto && value) {
+ return *value;
+ });
+
+ DB::TransactionScope batchTx {*dbconn};
+ for (const auto & line : lines) {
+ if (auto result = scanLogLine(line)) {
+ linesParsed++;
+ const auto values = crc32ScanValues(result->values());
+ try {
+ DB::TransactionScope dbtx {*dbconn};
if (const auto newEnts = newEntities(values); newEnts.front()) {
- dbtx.emplace(*dbconn);
- ids = storeEntities(dbconn, newEnts);
+ existingEntities.insert_range(storeEntities(dbconn, newEnts) | nonNullEntityIds);
}
storeLogLine(dbconn, values);
}
- rememberNewEntityIds(ids);
- }
- catch (const DB::Error & originalError) {
- try {
- const auto uninsertableLine = ToEntity<EntityType::UninsertableLine> {}(line);
- rememberNewEntityIds(storeEntities(dbconn, {uninsertableLine}));
- }
- catch (const std::exception &) {
- throw originalError;
+ catch (const DB::Error & originalError) {
+ try {
+ DB::TransactionScope dbtx {*dbconn};
+ const auto uninsertableLine = ToEntity<EntityType::UninsertableLine> {}(line);
+ storeEntities(dbconn, {uninsertableLine});
+ }
+ catch (const std::exception &) {
+ throw originalError;
+ }
}
}
- }
- else {
- linesDiscarded++;
- const auto unparsableLine = ToEntity<EntityType::UnparsableLine> {}(line);
- rememberNewEntityIds(storeEntities(dbconn, {unparsableLine}));
+ else {
+ linesDiscarded++;
+ const auto unparsableLine = ToEntity<EntityType::UnparsableLine> {}(line);
+ storeEntities(dbconn, {unparsableLine});
+ }
}
}
void
- Ingestor::parkLogLine(std::string_view line)
+ Ingestor::parkQueuedLogLines()
{
- std::ofstream {settings.fallbackDir / std::format("parked-{}.log", crc32(line))} << line;
- linesParked++;
+ if (queuedLines.empty()) {
+ return;
+ }
+ std::string path {settings.fallbackDir / std::format("parked-{}.log", crc32(queuedLines.front()))};
+ if (auto parked = FilePtr(fopen(path.c_str(), "w"))) {
+ fprintf(parked.get(), "%zu\n", queuedLines.size());
+ for (const auto & line : queuedLines) {
+ fprintf(parked.get(), "%.*s\n", static_cast<int>(line.length()), line.data());
+ }
+ if (fflush(parked.get()) == 0) {
+ linesParked += queuedLines.size();
+ queuedLines.clear();
+ }
+ else {
+ std::filesystem::remove(path);
+ }
+ }
}
void
@@ -280,7 +302,7 @@ namespace WebStat {
for (auto pathIter = std::filesystem::directory_iterator {settings.fallbackDir};
pathIter != std::filesystem::directory_iterator {}; ++pathIter) {
if (scn::scan<Crc32Value>(pathIter->path().filename().string(), "parked-{}.log")) {
- jobIngestParkedLine(pathIter);
+ jobIngestParkedLines(pathIter->path());
count += 1;
}
}
@@ -288,29 +310,30 @@ namespace WebStat {
}
void
- Ingestor::jobIngestParkedLine(const std::filesystem::directory_iterator & pathIter)
+ Ingestor::jobIngestParkedLines(const std::filesystem::path & path)
{
- jobIngestParkedLine(pathIter->path(), pathIter->file_size());
+ if (auto parked = FilePtr(fopen(path.c_str(), "r"))) {
+ if (auto count = scn::scan<size_t>(parked.get(), "{}\n")) {
+ jobIngestParkedLines(parked.get(), count->value());
+ std::filesystem::remove(path);
+ return;
+ }
+ }
+ throw std::system_error {errno, std::generic_category(), strerror(errno)};
}
void
- Ingestor::jobIngestParkedLine(const std::filesystem::path & path, uintmax_t size)
+ Ingestor::jobIngestParkedLines(FILE * lines, size_t count)
{
- if (std::ifstream parked {path}) {
- std::string line;
- line.resize_and_overwrite(size, [&parked](char * content, size_t size) {
- parked.read(content, static_cast<std::streamsize>(size));
- return static_cast<size_t>(parked.tellg());
- });
- if (line.length() < size) {
+ for (size_t line = 0; line < count; ++line) {
+ if (auto line = scn::scan<std::string>(lines, "{:[^\n]}\n")) {
+ linesRead++;
+ queuedLines.emplace_back(std::move(line->value()));
+ }
+ else {
throw std::system_error {errno, std::generic_category(), "Short read of parked file"};
}
- ingestLogLine(dbpool->get().get(), line);
- }
- else {
- throw std::system_error {errno, std::generic_category(), strerror(errno)};
}
- std::filesystem::remove(path);
}
unsigned int
diff --git a/src/ingestor.hpp b/src/ingestor.hpp
index 2ae2936..94e0d5c 100644
--- a/src/ingestor.hpp
+++ b/src/ingestor.hpp
@@ -24,6 +24,7 @@ namespace WebStat {
std::filesystem::path fallbackDir = "/var/log/webstat";
unsigned int dbMax = 4;
unsigned int dbKeep = 2;
+ size_t maxBatchSize = 1;
minutes checkJobsAfter = 1min;
minutes freqIngestParkedLines = 30min;
minutes freqPurgeOldLogs = 6h;
@@ -36,6 +37,7 @@ namespace WebStat {
class Ingestor {
public:
+ using LineBatch = std::vector<std::string>;
Ingestor(const utsname &, IngestorSettings);
Ingestor(const utsname &, DB::ConnectionPoolPtr, IngestorSettings);
@@ -50,9 +52,9 @@ namespace WebStat {
[[nodiscard]] static ScanResult scanLogLine(std::string_view);
void ingestLog(std::FILE *);
- void ingestLogLine(std::string_view);
- void ingestLogLine(DB::Connection *, std::string_view);
- void parkLogLine(std::string_view);
+ void tryIngestQueuedLogLines();
+ void ingestLogLines(DB::Connection *, const LineBatch & lines);
+ void parkQueuedLogLines();
void runJobsAsNeeded();
unsigned int jobIngestParkedLines();
@@ -70,7 +72,8 @@ namespace WebStat {
size_t linesParsed = 0;
size_t linesDiscarded = 0;
size_t linesParked = 0;
- mutable std::flat_set<Crc32Value> existingEntities;
+ std::flat_set<Crc32Value> existingEntities;
+ LineBatch queuedLines;
bool terminated = false;
@@ -98,8 +101,8 @@ namespace WebStat {
void onNewUserAgent(const Entity &) const;
void handleCurlOperations();
- void jobIngestParkedLine(const std::filesystem::directory_iterator &);
- void jobIngestParkedLine(const std::filesystem::path &, uintmax_t size);
+ void jobIngestParkedLines(const std::filesystem::path &);
+ void jobIngestParkedLines(FILE *, size_t count);
static void sigtermHandler(int);
void terminate(int);
diff --git a/src/webstat_logger_main.cpp b/src/webstat_logger_main.cpp
index 6d3aeda..39d794a 100644
--- a/src/webstat_logger_main.cpp
+++ b/src/webstat_logger_main.cpp
@@ -53,6 +53,8 @@ main(int argc, char ** argv)
"Maximum number of concurrent write/read write DB connections")
("db.wr.keep", po::value(&settings.dbKeep)->default_value(settings.dbKeep),
"Number of write/read write DB connections to keep open")
+ ("db.maxBatchSize", po::value(&settings.maxBatchSize)->default_value(settings.maxBatchSize),
+ "Maximum number of access log entries to hold in memory before writing them to the DB")
("fallback.dir", po::value(&settings.fallbackDir)->default_value(settings.fallbackDir),
"Path to write access logs to when the database is unavailable")
("jobs.check", po::value(&settings.checkJobsAfter)->default_value(settings.checkJobsAfter),