diff options
| author | Dan Goodliffe <dan@randomdan.homeip.net> | 2026-05-02 18:58:21 +0100 |
|---|---|---|
| committer | Dan Goodliffe <dan@randomdan.homeip.net> | 2026-05-02 18:58:21 +0100 |
| commit | 4675ab65ea5e807e0d457845a0ca84edcf1262c9 (patch) | |
| tree | c8e0adae9cfefe6643f03b797283b3012e07b2d3 /src/ingestor.hpp | |
| parent | 5c0206f48dc7f90009629d0a74bdc1dd6b4f67ea (diff) | |
| download | webstat-4675ab65ea5e807e0d457845a0ca84edcf1262c9.tar.bz2 webstat-4675ab65ea5e807e0d457845a0ca84edcf1262c9.tar.xz webstat-4675ab65ea5e807e0d457845a0ca84edcf1262c9.zip | |
Ingest log lines in a background thread
This prevents halting reading input during data insertion.
Diffstat (limited to 'src/ingestor.hpp')
| -rw-r--r-- | src/ingestor.hpp | 36 |
1 files changed, 20 insertions, 16 deletions
diff --git a/src/ingestor.hpp b/src/ingestor.hpp index d989ecd..64b3357 100644 --- a/src/ingestor.hpp +++ b/src/ingestor.hpp @@ -41,6 +41,19 @@ namespace WebStat { public: using LineBatch = std::vector<std::string>; using LinesView = std::span<const std::string>; + + struct Job { + using LastRunTime = std::chrono::system_clock::time_point; + using Result = std::function<unsigned int()>; + using Impl = Result (Ingestor::*)(); + + explicit Job(Impl jobImpl) : impl(jobImpl) { } + + const Impl impl; + LastRunTime lastRun {LastRunTime::clock::now()}; + std::optional<std::future<Result>> currentRun; + }; + Ingestor(const utsname &, IngestorSettings); Ingestor(const utsname &, DB::ConnectionPoolPtr, IngestorSettings); @@ -55,25 +68,14 @@ namespace WebStat { [[nodiscard]] static ScanResult scanLogLine(std::string_view); void ingestLog(std::FILE *); - void tryIngestQueuedLogLines(); + std::pair<std::future<Job::Result> &, bool> beginIngestQueuedLogLines(); void ingestLogLines(DB::Connection *, LinesView lines); - std::expected<std::filesystem::path, int> parkQueuedLogLines(); + std::expected<std::filesystem::path, int> parkLogLines(LineBatch &); void runJobsAsNeeded(); - struct Job { - using LastRunTime = std::chrono::system_clock::time_point; - using Result = std::function<unsigned int()>; - using Impl = Result (Ingestor::*)(); - - explicit Job(Impl jobImpl) : impl(jobImpl) { } - - const Impl impl; - LastRunTime lastRun {LastRunTime::clock::now()}; - std::optional<std::future<Result>> currentRun; - }; - Job::Result jobReadParkedLines(); Job::Result jobPurgeOldLogs(); + Job::Result jobStoreQueuedLines(); template<typename... T> void storeLogLine(DB::Connection *, const std::tuple<T...> &) const; @@ -89,18 +91,21 @@ namespace WebStat { }; protected: + void finishAllJobs(); + static Ingestor * currentIngestor; DB::ConnectionPoolPtr dbpool; mutable Stats stats {}; std::flat_map<EntityHash, EntityId> existingEntities; - LineBatch queuedLines; + LineBatch queuedLines, processingLines; bool terminated = false; Job::LastRunTime lastCheckedJobs {Job::LastRunTime::clock::now()}; Job ingestParkedLines; Job purgeOldLogs; + Job storeQueueLines; private: template<typename... T> static std::vector<Entity *> entities(std::tuple<T...> &); @@ -112,7 +117,6 @@ namespace WebStat { void logStats() const; void clearStats(); void finalizeJob(Job &, minutes freq, Job::LastRunTime::clock::time_point now); - void finishAllJobs(); LineBatch jobReadParkedLines(const std::filesystem::path &); LineBatch jobReadParkedLines(FILE *, size_t count); |
