summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r-- src/ingestor.cpp 11
-rw-r--r-- src/ingestor.hpp 4
-rw-r--r-- src/webstat_logger_main.cpp 4
3 files changed, 14 insertions, 5 deletions
diff --git a/src/ingestor.cpp b/src/ingestor.cpp
index c41454f..04216f8 100644
--- a/src/ingestor.cpp
+++ b/src/ingestor.cpp
@@ -253,14 +253,19 @@ namespace WebStat {
void
Ingestor::tryIngestQueuedLogLines()
{
+ auto storedEnd = queuedLines.begin();
try {
- ingestLogLines(dbpool->get().get(), queuedLines);
- queuedLines.clear();
+ for (auto batch :
+ queuedLines | std::views::chunk(settings.maxBatchSize) | std::views::take(settings.maxBatches)) {
+ ingestLogLines(dbpool->get().get(), batch);
+ storedEnd = batch.end();
+ }
}
catch (const std::exception & excp) {
log(LOG_ERR, "Unhandled exception: %s, clearing known entity list", excp.what());
existingEntities.clear();
}
+ queuedLines.erase(queuedLines.begin(), storedEnd);
}
template<typename... T>
@@ -285,7 +290,7 @@ namespace WebStat {
}
void
- Ingestor::ingestLogLines(DB::Connection * dbconn, const LineBatch & lines)
+ Ingestor::ingestLogLines(DB::Connection * dbconn, const LinesView lines)
{
auto entityIds = std::views::transform([](auto && value) {
return std::make_pair(value->hash, *value->id);
diff --git a/src/ingestor.hpp b/src/ingestor.hpp
index 195f325..916f200 100644
--- a/src/ingestor.hpp
+++ b/src/ingestor.hpp
@@ -26,6 +26,7 @@ namespace WebStat {
unsigned int dbMax = 4;
unsigned int dbKeep = 2;
size_t maxBatchSize = 1;
+ size_t maxBatches = 5;
minutes checkJobsAfter = 1min;
minutes freqIngestParkedLines = 30min;
minutes freqPurgeOldLogs = 6h;
@@ -39,6 +40,7 @@ namespace WebStat {
class Ingestor {
public:
using LineBatch = std::vector<std::string>;
+ using LinesView = std::span<const std::string>;
Ingestor(const utsname &, IngestorSettings);
Ingestor(const utsname &, DB::ConnectionPoolPtr, IngestorSettings);
@@ -54,7 +56,7 @@ namespace WebStat {
void ingestLog(std::FILE *);
void tryIngestQueuedLogLines();
- void ingestLogLines(DB::Connection *, const LineBatch & lines);
+ void ingestLogLines(DB::Connection *, LinesView lines);
std::expected<std::filesystem::path, int> parkQueuedLogLines();
void runJobsAsNeeded();
diff --git a/src/webstat_logger_main.cpp b/src/webstat_logger_main.cpp
index 3eaa9f0..04d59fc 100644
--- a/src/webstat_logger_main.cpp
+++ b/src/webstat_logger_main.cpp
@@ -68,7 +68,9 @@ main(int argc, char ** argv)
("db.wr.keep", po::value(&settings.dbKeep)->default_value(settings.dbKeep),
"Number of write/read write DB connections to keep open")
("db.maxBatchSize", po::value(&settings.maxBatchSize)->default_value(settings.maxBatchSize),
- "Maximum number of access log entries to hold in memory before writing them to the DB")
+ "Number of access log entries to hold in memory before starting to write them to the DB")
+ ("db.maxBatches", po::value(&settings.maxBatches)->default_value(settings.maxBatches),
+ "Maximum number of batches to insert before returning to reading input")
("fallback.dir", po::value(&settings.fallbackDir)->default_value(settings.fallbackDir),
"Path to write access logs to when the database is unavailable")
("jobs.check", po::value(&settings.checkJobsAfter)->default_value(settings.checkJobsAfter),