diff options
Diffstat (limited to 'src/ingestor.cpp')
| -rw-r--r-- | src/ingestor.cpp | 65 |
1 files changed, 47 insertions, 18 deletions
diff --git a/src/ingestor.cpp b/src/ingestor.cpp index 5f9fa60..5d7243a 100644 --- a/src/ingestor.cpp +++ b/src/ingestor.cpp @@ -132,6 +132,7 @@ namespace WebStat { Ingestor::Ingestor(const utsname & host, DB::ConnectionPoolPtr dbpl, IngestorSettings givenSettings) : settings {std::move(givenSettings)}, dbpool {std::move(dbpl)}, + handleCompleteCurlOps {&Ingestor::jobHandleCompleteCurlOps, &Ingestor::haveCurlOperations}, ingestParkedLines {&Ingestor::jobReadParkedLines}, purgeOldLogs {&Ingestor::jobPurgeOldLogs}, storeQueueLines {&Ingestor::jobStoreQueuedLines}, hostnameId {insert(dbpool->get(), SQL::HOST_UPSERT, SQL::HOST_UPSERT_OPTS, host.nodename, host.sysname, @@ -204,29 +205,51 @@ namespace WebStat { >(input, R"({} {} {} {:[A-Z]} {} {} {} {} {} {} {} {} {})"); } + auto + Ingestor::withCurlLock(auto &&... operation) + { + std::lock_guard curlOperationsLock {curlOperationsMutex}; + return std::invoke(std::forward<decltype(operation)>(operation)...); + } + + bool + Ingestor::haveCurlOperations() + { + return withCurlLock([this]() { + return !curlOperations.empty(); + }); + } + void Ingestor::handleCurlOperations() { + if (!curlOperations.empty()) { + curl_multi_perform(curl.get(), nullptr); + } + } + + Ingestor::Job::Result + Ingestor::jobHandleCompleteCurlOps() + { int remaining {}; - curl_multi_perform(curl.get(), nullptr); - while (auto msg = curl_multi_info_read(curl.get(), &remaining)) { + while (auto msg = withCurlLock(curl_multi_info_read, curl.get(), &remaining)) { if (msg->msg == CURLMSG_DONE) { - if (auto operationItr = curlOperations.find(msg->easy_handle); operationItr != curlOperations.end()) { - if (msg->data.result == CURLE_OK) { - operationItr->second->whenComplete(dbpool->get().get()); - } - else { - operationItr->second->onError(dbpool->get().get()); - } - curl_multi_remove_handle(curl.get(), msg->easy_handle); - curlOperations.erase(operationItr); + const auto result = msg->data.result; + if (auto operation = withCurlLock([this, handle = msg->easy_handle]() { + curl_multi_remove_handle(curl.get(), handle); + return curlOperations.extract(handle); + })) { + std::invoke((result == CURLE_OK) ? &CurlOperation::whenComplete : &CurlOperation::onError, + operation.mapped(), dbpool->get().get()); } else { - curlOperations.erase(msg->easy_handle); log(LOG_WARNING, "Failed to lookup CurlOperation"); } } } + return []() { + return 0; + }; } void @@ -255,17 +278,21 @@ namespace WebStat { if (expiredThenSet(lastCheckedJobs, settings.checkJobsAfter)) { runJobsAsNeeded(); } - if (std::lock_guard curlOperationsLock {curlOperationsMutex}; !curlOperations.empty()) { - handleCurlOperations(); - } + withCurlLock(&Ingestor::handleCurlOperations, this); } finishAllJobs(); std::invoke(storeQueueLines.impl, this)(); std::ignore = parkLogLines(queuedLines); std::ignore = parkLogLines(processingLines); - while (!curlOperations.empty() && curl_multi_poll(curl.get(), nullptr, 0, INT_MAX, nullptr) == CURLM_OK) { - handleCurlOperations(); - } + withCurlLock([this]() { + if (!curlOperations.empty()) { + int running = -1; + while (running && curl_multi_poll(curl.get(), nullptr, 0, 500, &running) == CURLM_OK) { + curl_multi_perform(curl.get(), nullptr); + } + } + }); + jobHandleCompleteCurlOps(); logStats(); } @@ -431,6 +458,7 @@ namespace WebStat { } } }; + runJobAsNeeded(handleCompleteCurlOps, std::chrono::minutes {1}); runJobAsNeeded(ingestParkedLines, settings.freqIngestParkedLines); runJobAsNeeded(purgeOldLogs, settings.freqPurgeOldLogs); } @@ -447,6 +475,7 @@ namespace WebStat { finishJob(ingestParkedLines); finishJob(purgeOldLogs); finishJob(storeQueueLines); + finishJob(handleCompleteCurlOps); } Ingestor::Job::Result |
