diff options
| author | Dan Goodliffe <dan@randomdan.homeip.net> | 2026-03-20 02:17:04 +0000 |
|---|---|---|
| committer | Dan Goodliffe <dan@randomdan.homeip.net> | 2026-03-20 02:23:08 +0000 |
| commit | 0f5a0a8e2d43774288d4d6ea747278ca6e085a2a (patch) | |
| tree | 08878dff32a636b388c660fd3330f1bddbd98af3 /src/ingestor.hpp | |
| parent | 8c6fecd356003309f8eebec30374344272ca6072 (diff) | |
| download | webstat-0f5a0a8e2d43774288d4d6ea747278ca6e085a2a.tar.bz2 webstat-0f5a0a8e2d43774288d4d6ea747278ca6e085a2a.tar.xz webstat-0f5a0a8e2d43774288d4d6ea747278ca6e085a2a.zip | |
Insert log entries in batches
Store log lines in memory until threshold is reach or idle occurs, then
insert all the lines in a single transaction. Save points handle the
case of insertion errors. On success the queue is cleared.
Parked lines also saved in bulk, only necessary if queued lines could
not be inserted on shutdown, else the queue simply grows until ability
to insert is restored. Importing parked lines just adds them to the
queue and the normal process then follows.
Diffstat (limited to 'src/ingestor.hpp')
| -rw-r--r-- | src/ingestor.hpp | 15 |
1 files changed, 9 insertions, 6 deletions
diff --git a/src/ingestor.hpp b/src/ingestor.hpp index 2ae2936..94e0d5c 100644 --- a/src/ingestor.hpp +++ b/src/ingestor.hpp @@ -24,6 +24,7 @@ namespace WebStat { std::filesystem::path fallbackDir = "/var/log/webstat"; unsigned int dbMax = 4; unsigned int dbKeep = 2; + size_t maxBatchSize = 1; minutes checkJobsAfter = 1min; minutes freqIngestParkedLines = 30min; minutes freqPurgeOldLogs = 6h; @@ -36,6 +37,7 @@ namespace WebStat { class Ingestor { public: + using LineBatch = std::vector<std::string>; Ingestor(const utsname &, IngestorSettings); Ingestor(const utsname &, DB::ConnectionPoolPtr, IngestorSettings); @@ -50,9 +52,9 @@ namespace WebStat { [[nodiscard]] static ScanResult scanLogLine(std::string_view); void ingestLog(std::FILE *); - void ingestLogLine(std::string_view); - void ingestLogLine(DB::Connection *, std::string_view); - void parkLogLine(std::string_view); + void tryIngestQueuedLogLines(); + void ingestLogLines(DB::Connection *, const LineBatch & lines); + void parkQueuedLogLines(); void runJobsAsNeeded(); unsigned int jobIngestParkedLines(); @@ -70,7 +72,8 @@ namespace WebStat { size_t linesParsed = 0; size_t linesDiscarded = 0; size_t linesParked = 0; - mutable std::flat_set<Crc32Value> existingEntities; + std::flat_set<Crc32Value> existingEntities; + LineBatch queuedLines; bool terminated = false; @@ -98,8 +101,8 @@ namespace WebStat { void onNewUserAgent(const Entity &) const; void handleCurlOperations(); - void jobIngestParkedLine(const std::filesystem::directory_iterator &); - void jobIngestParkedLine(const std::filesystem::path &, uintmax_t size); + void jobIngestParkedLines(const std::filesystem::path &); + void jobIngestParkedLines(FILE *, size_t count); static void sigtermHandler(int); void terminate(int); |
