summary refs log tree commit diff
diff options
context:
space:
mode:
author Dan Goodliffe <dan.goodliffe@octal.co.uk> 2026-05-07 15:41:19 +0100
committer Dan Goodliffe <dan.goodliffe@octal.co.uk> 2026-05-07 15:41:19 +0100
commit 157614349f5f4c2387e0a970a4ca3ddaeaa23446 (patch)
tree 4cd4dcd1a09638d692527d70c359e5c225745304
parent cbb2035fd33ba84fea56c7a7223c563b925e8649 (diff)
download webstat-157614349f5f4c2387e0a970a4ca3ddaeaa23446.tar.bz2
webstat-157614349f5f4c2387e0a970a4ca3ddaeaa23446.tar.xz
webstat-157614349f5f4c2387e0a970a4ca3ddaeaa23446.zip
Handle completed curl operations in a job
Removes the need to block the main thread from reading stdin while performing the actions that follow a completed curl operation, such as updating user agent details.
-rw-r--r-- src/ingestor.cpp 65
-rw-r--r-- src/ingestor.hpp 4
2 files changed, 51 insertions, 18 deletions
diff --git a/src/ingestor.cpp b/src/ingestor.cpp
index 5f9fa60..5d7243a 100644
--- a/src/ingestor.cpp
+++ b/src/ingestor.cpp
@@ -132,6 +132,7 @@ namespace WebStat {
Ingestor::Ingestor(const utsname & host, DB::ConnectionPoolPtr dbpl, IngestorSettings givenSettings) :
settings {std::move(givenSettings)}, dbpool {std::move(dbpl)},
+ handleCompleteCurlOps {&Ingestor::jobHandleCompleteCurlOps, &Ingestor::haveCurlOperations},
ingestParkedLines {&Ingestor::jobReadParkedLines}, purgeOldLogs {&Ingestor::jobPurgeOldLogs},
storeQueueLines {&Ingestor::jobStoreQueuedLines},
hostnameId {insert(dbpool->get(), SQL::HOST_UPSERT, SQL::HOST_UPSERT_OPTS, host.nodename, host.sysname,
@@ -204,29 +205,51 @@ namespace WebStat {
>(input, R"({} {} {} {:[A-Z]} {} {} {} {} {} {} {} {} {})");
}
+ auto
+ Ingestor::withCurlLock(auto &&... operation)
+ {
+ std::lock_guard curlOperationsLock {curlOperationsMutex};
+ return std::invoke(std::forward<decltype(operation)>(operation)...);
+ }
+
+ bool
+ Ingestor::haveCurlOperations()
+ {
+ return withCurlLock([this]() {
+ return !curlOperations.empty();
+ });
+ }
+
void
Ingestor::handleCurlOperations()
{
+ if (!curlOperations.empty()) {
+ curl_multi_perform(curl.get(), nullptr);
+ }
+ }
+
+ Ingestor::Job::Result
+ Ingestor::jobHandleCompleteCurlOps()
+ {
int remaining {};
- curl_multi_perform(curl.get(), nullptr);
- while (auto msg = curl_multi_info_read(curl.get(), &remaining)) {
+ while (auto msg = withCurlLock(curl_multi_info_read, curl.get(), &remaining)) {
if (msg->msg == CURLMSG_DONE) {
- if (auto operationItr = curlOperations.find(msg->easy_handle); operationItr != curlOperations.end()) {
- if (msg->data.result == CURLE_OK) {
- operationItr->second->whenComplete(dbpool->get().get());
- }
- else {
- operationItr->second->onError(dbpool->get().get());
- }
- curl_multi_remove_handle(curl.get(), msg->easy_handle);
- curlOperations.erase(operationItr);
+ const auto result = msg->data.result;
+ if (auto operation = withCurlLock([this, handle = msg->easy_handle]() {
+ curl_multi_remove_handle(curl.get(), handle);
+ return curlOperations.extract(handle);
+ })) {
+ std::invoke((result == CURLE_OK) ? &CurlOperation::whenComplete : &CurlOperation::onError,
+ operation.mapped(), dbpool->get().get());
}
else {
- curlOperations.erase(msg->easy_handle);
log(LOG_WARNING, "Failed to lookup CurlOperation");
}
}
}
+ return []() {
+ return 0;
+ };
}
void
@@ -255,17 +278,21 @@ namespace WebStat {
if (expiredThenSet(lastCheckedJobs, settings.checkJobsAfter)) {
runJobsAsNeeded();
}
- if (std::lock_guard curlOperationsLock {curlOperationsMutex}; !curlOperations.empty()) {
- handleCurlOperations();
- }
+ withCurlLock(&Ingestor::handleCurlOperations, this);
}
finishAllJobs();
std::invoke(storeQueueLines.impl, this)();
std::ignore = parkLogLines(queuedLines);
std::ignore = parkLogLines(processingLines);
- while (!curlOperations.empty() && curl_multi_poll(curl.get(), nullptr, 0, INT_MAX, nullptr) == CURLM_OK) {
- handleCurlOperations();
- }
+ withCurlLock([this]() {
+ if (!curlOperations.empty()) {
+ int running = -1;
+ while (running && curl_multi_poll(curl.get(), nullptr, 0, 500, &running) == CURLM_OK) {
+ curl_multi_perform(curl.get(), nullptr);
+ }
+ }
+ });
+ jobHandleCompleteCurlOps();
logStats();
}
@@ -431,6 +458,7 @@ namespace WebStat {
}
}
};
+ runJobAsNeeded(handleCompleteCurlOps, std::chrono::minutes {1});
runJobAsNeeded(ingestParkedLines, settings.freqIngestParkedLines);
runJobAsNeeded(purgeOldLogs, settings.freqPurgeOldLogs);
}
@@ -447,6 +475,7 @@ namespace WebStat {
finishJob(ingestParkedLines);
finishJob(purgeOldLogs);
finishJob(storeQueueLines);
+ finishJob(handleCompleteCurlOps);
}
Ingestor::Job::Result
diff --git a/src/ingestor.hpp b/src/ingestor.hpp
index a51c46c..1125692 100644
--- a/src/ingestor.hpp
+++ b/src/ingestor.hpp
@@ -75,6 +75,7 @@ namespace WebStat {
std::expected<std::filesystem::path, int> parkLogLines(LineBatch &);
void runJobsAsNeeded();
+ Job::Result jobHandleCompleteCurlOps();
Job::Result jobReadParkedLines();
Job::Result jobPurgeOldLogs();
Job::Result jobStoreQueuedLines();
@@ -105,6 +106,7 @@ namespace WebStat {
bool terminated = false;
Job::LastRunTime lastCheckedJobs {Job::LastRunTime::clock::now()};
+ Job handleCompleteCurlOps;
Job ingestParkedLines;
Job purgeOldLogs;
Job storeQueueLines;
@@ -115,6 +117,8 @@ namespace WebStat {
void storeNewEntities(DB::Connection *, std::span<Entity *>) const;
void storeNewEntity(DB::Connection *, Entity &) const;
void onNewUserAgent(const Entity &) const;
+ auto withCurlLock(auto &&...);
+ bool haveCurlOperations();
void handleCurlOperations();
void logStats() const;
void clearStats();