summaryrefslogtreecommitdiff
path: root/src/ingestor.hpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/ingestor.hpp')
-rw-r--r--src/ingestor.hpp36
1 files changed, 20 insertions, 16 deletions
diff --git a/src/ingestor.hpp b/src/ingestor.hpp
index d989ecd..64b3357 100644
--- a/src/ingestor.hpp
+++ b/src/ingestor.hpp
@@ -41,6 +41,19 @@ namespace WebStat {
public:
using LineBatch = std::vector<std::string>;
using LinesView = std::span<const std::string>;
+
+ struct Job {
+ using LastRunTime = std::chrono::system_clock::time_point;
+ using Result = std::function<unsigned int()>;
+ using Impl = Result (Ingestor::*)();
+
+ explicit Job(Impl jobImpl) : impl(jobImpl) { }
+
+ const Impl impl;
+ LastRunTime lastRun {LastRunTime::clock::now()};
+ std::optional<std::future<Result>> currentRun;
+ };
+
Ingestor(const utsname &, IngestorSettings);
Ingestor(const utsname &, DB::ConnectionPoolPtr, IngestorSettings);
@@ -55,25 +68,14 @@ namespace WebStat {
[[nodiscard]] static ScanResult scanLogLine(std::string_view);
void ingestLog(std::FILE *);
- void tryIngestQueuedLogLines();
+ std::pair<std::future<Job::Result> &, bool> beginIngestQueuedLogLines();
void ingestLogLines(DB::Connection *, LinesView lines);
- std::expected<std::filesystem::path, int> parkQueuedLogLines();
+ std::expected<std::filesystem::path, int> parkLogLines(LineBatch &);
void runJobsAsNeeded();
- struct Job {
- using LastRunTime = std::chrono::system_clock::time_point;
- using Result = std::function<unsigned int()>;
- using Impl = Result (Ingestor::*)();
-
- explicit Job(Impl jobImpl) : impl(jobImpl) { }
-
- const Impl impl;
- LastRunTime lastRun {LastRunTime::clock::now()};
- std::optional<std::future<Result>> currentRun;
- };
-
Job::Result jobReadParkedLines();
Job::Result jobPurgeOldLogs();
+ Job::Result jobStoreQueuedLines();
template<typename... T> void storeLogLine(DB::Connection *, const std::tuple<T...> &) const;
@@ -89,18 +91,21 @@ namespace WebStat {
};
protected:
+ void finishAllJobs();
+
static Ingestor * currentIngestor;
DB::ConnectionPoolPtr dbpool;
mutable Stats stats {};
std::flat_map<EntityHash, EntityId> existingEntities;
- LineBatch queuedLines;
+ LineBatch queuedLines, processingLines;
bool terminated = false;
Job::LastRunTime lastCheckedJobs {Job::LastRunTime::clock::now()};
Job ingestParkedLines;
Job purgeOldLogs;
+ Job storeQueueLines;
private:
template<typename... T> static std::vector<Entity *> entities(std::tuple<T...> &);
@@ -112,7 +117,6 @@ namespace WebStat {
void logStats() const;
void clearStats();
void finalizeJob(Job &, minutes freq, Job::LastRunTime::clock::time_point now);
- void finishAllJobs();
LineBatch jobReadParkedLines(const std::filesystem::path &);
LineBatch jobReadParkedLines(FILE *, size_t count);