summary | refs | log | tree | commit | diff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r-- src/ingestor.cpp 60
-rw-r--r-- src/ingestor.hpp 36
2 files changed, 61 insertions, 35 deletions
diff --git a/src/ingestor.cpp b/src/ingestor.cpp
index e2c315c..4af2f2d 100644
--- a/src/ingestor.cpp
+++ b/src/ingestor.cpp
@@ -117,6 +117,7 @@ namespace WebStat {
Ingestor::Ingestor(const utsname & host, DB::ConnectionPoolPtr dbpl, IngestorSettings givenSettings) :
settings {std::move(givenSettings)}, dbpool {std::move(dbpl)},
ingestParkedLines {&Ingestor::jobReadParkedLines}, purgeOldLogs {&Ingestor::jobPurgeOldLogs},
+ storeQueueLines {&Ingestor::jobStoreQueuedLines},
hostnameId {insert(dbpool->get(), SQL::HOST_UPSERT, SQL::HOST_UPSERT_OPTS, host.nodename, host.sysname,
host.release, host.version, host.machine, host.domainname)},
curl {curl_multi_init()}, mainThread {std::this_thread::get_id()}
@@ -225,7 +226,7 @@ namespace WebStat {
stats.linesRead++;
queuedLines.emplace_back(std::move(line->value()));
if (queuedLines.size() >= settings.maxBatchSize) {
- tryIngestQueuedLogLines();
+ beginIngestQueuedLogLines();
}
}
else {
@@ -233,7 +234,7 @@ namespace WebStat {
}
}
else {
- tryIngestQueuedLogLines();
+ beginIngestQueuedLogLines();
}
if (expiredThenSet(lastCheckedJobs, settings.checkJobsAfter)) {
runJobsAsNeeded();
@@ -243,21 +244,37 @@ namespace WebStat {
}
}
finishAllJobs();
- tryIngestQueuedLogLines();
- std::ignore = parkQueuedLogLines();
+ std::invoke(storeQueueLines.impl, this)();
+ std::ignore = parkLogLines(queuedLines);
+ std::ignore = parkLogLines(processingLines);
while (!curlOperations.empty() && curl_multi_poll(curl.get(), nullptr, 0, INT_MAX, nullptr) == CURLM_OK) {
handleCurlOperations();
}
logStats();
}
- void
- Ingestor::tryIngestQueuedLogLines()
+ std::pair<std::future<Ingestor::Job::Result> &, bool>
+ Ingestor::beginIngestQueuedLogLines()
+ {
+ if (storeQueueLines.currentRun) {
+ if (!storeQueueLines.currentRun->valid()) {
+ return {*storeQueueLines.currentRun, false};
+ }
+ finalizeJob(storeQueueLines, {}, Job::LastRunTime::clock::now());
+ }
+ if (processingLines.empty()) {
+ std::swap(queuedLines, processingLines);
+ }
+ return {storeQueueLines.currentRun.emplace(std::async(storeQueueLines.impl, this)), true};
+ }
+
+ Ingestor::Job::Result
+ Ingestor::jobStoreQueuedLines()
{
- auto storedEnd = queuedLines.begin();
+ auto storedEnd = processingLines.begin();
try {
- for (auto batch :
- queuedLines | std::views::chunk(settings.maxBatchSize) | std::views::take(settings.maxBatches)) {
+ for (auto batch : processingLines | std::views::chunk(settings.maxBatchSize)
+ | std::views::take(settings.maxBatches)) {
ingestLogLines(dbpool->get().get(), batch);
storedEnd = batch.end();
}
@@ -266,7 +283,11 @@ namespace WebStat {
log(LOG_ERR, "Unhandled exception: %s, clearing known entity list", excp.what());
existingEntities.clear();
}
- queuedLines.erase(queuedLines.begin(), storedEnd);
+ auto count = std::distance(processingLines.begin(), storedEnd);
+ processingLines.erase(processingLines.begin(), storedEnd);
+ return [count]() {
+ return count;
+ };
}
template<typename... T>
@@ -335,20 +356,20 @@ namespace WebStat {
}
std::expected<std::filesystem::path, int>
- Ingestor::parkQueuedLogLines()
+ Ingestor::parkLogLines(LineBatch & lines)
{
- if (queuedLines.empty()) {
+ if (lines.empty()) {
return std::unexpected(0);
}
- const std::filesystem::path path {settings.fallbackDir
- / std::format("parked-{:s}.short", bytesToHexRange(makeHash(queuedLines.front())))};
+ const std::filesystem::path path {
+ settings.fallbackDir / std::format("parked-{:s}.short", bytesToHexRange(makeHash(lines.front())))};
if (auto parked = FilePtr(fopen(path.c_str(), "w"))) {
- fprintf(parked.get(), "%zu\n", queuedLines.size());
- for (const auto & line : queuedLines) {
+ fprintf(parked.get(), "%zu\n", lines.size());
+ for (const auto & line : lines) {
fprintf(parked.get(), "%.*s\n", static_cast<int>(line.length()), line.data());
}
if (fflush(parked.get()) == 0) {
- queuedLines.clear();
+ lines.clear();
auto finalPath = std::filesystem::path {path}.replace_extension(".log");
parked.reset();
if (rename(path.c_str(), finalPath.c_str()) == 0) {
@@ -357,8 +378,8 @@ namespace WebStat {
}
}
const int err = errno;
- log(LOG_ERR, "Failed to park %zu queued lines:", queuedLines.size());
- for (const auto & line : queuedLines) {
+ log(LOG_ERR, "Failed to park %zu queued lines:", lines.size());
+ for (const auto & line : lines) {
log(LOG_ERR, "\t%.*s", static_cast<int>(line.length()), line.data());
}
return std::unexpected(err);
@@ -407,6 +428,7 @@ namespace WebStat {
};
finishJob(ingestParkedLines);
finishJob(purgeOldLogs);
+ finishJob(storeQueueLines);
}
Ingestor::Job::Result
diff --git a/src/ingestor.hpp b/src/ingestor.hpp
index d989ecd..64b3357 100644
--- a/src/ingestor.hpp
+++ b/src/ingestor.hpp
@@ -41,6 +41,19 @@ namespace WebStat {
public:
using LineBatch = std::vector<std::string>;
using LinesView = std::span<const std::string>;
+
+ struct Job {
+ using LastRunTime = std::chrono::system_clock::time_point;
+ using Result = std::function<unsigned int()>;
+ using Impl = Result (Ingestor::*)();
+
+ explicit Job(Impl jobImpl) : impl(jobImpl) { }
+
+ const Impl impl;
+ LastRunTime lastRun {LastRunTime::clock::now()};
+ std::optional<std::future<Result>> currentRun;
+ };
+
Ingestor(const utsname &, IngestorSettings);
Ingestor(const utsname &, DB::ConnectionPoolPtr, IngestorSettings);
@@ -55,25 +68,14 @@ namespace WebStat {
[[nodiscard]] static ScanResult scanLogLine(std::string_view);
void ingestLog(std::FILE *);
- void tryIngestQueuedLogLines();
+ std::pair<std::future<Job::Result> &, bool> beginIngestQueuedLogLines();
void ingestLogLines(DB::Connection *, LinesView lines);
- std::expected<std::filesystem::path, int> parkQueuedLogLines();
+ std::expected<std::filesystem::path, int> parkLogLines(LineBatch &);
void runJobsAsNeeded();
- struct Job {
- using LastRunTime = std::chrono::system_clock::time_point;
- using Result = std::function<unsigned int()>;
- using Impl = Result (Ingestor::*)();
-
- explicit Job(Impl jobImpl) : impl(jobImpl) { }
-
- const Impl impl;
- LastRunTime lastRun {LastRunTime::clock::now()};
- std::optional<std::future<Result>> currentRun;
- };
-
Job::Result jobReadParkedLines();
Job::Result jobPurgeOldLogs();
+ Job::Result jobStoreQueuedLines();
template<typename... T> void storeLogLine(DB::Connection *, const std::tuple<T...> &) const;
@@ -89,18 +91,21 @@ namespace WebStat {
};
protected:
+ void finishAllJobs();
+
static Ingestor * currentIngestor;
DB::ConnectionPoolPtr dbpool;
mutable Stats stats {};
std::flat_map<EntityHash, EntityId> existingEntities;
- LineBatch queuedLines;
+ LineBatch queuedLines, processingLines;
bool terminated = false;
Job::LastRunTime lastCheckedJobs {Job::LastRunTime::clock::now()};
Job ingestParkedLines;
Job purgeOldLogs;
+ Job storeQueueLines;
private:
template<typename... T> static std::vector<Entity *> entities(std::tuple<T...> &);
@@ -112,7 +117,6 @@ namespace WebStat {
void logStats() const;
void clearStats();
void finalizeJob(Job &, minutes freq, Job::LastRunTime::clock::time_point now);
- void finishAllJobs();
LineBatch jobReadParkedLines(const std::filesystem::path &);
LineBatch jobReadParkedLines(FILE *, size_t count);