diff options
| author | Dan Goodliffe <dan.goodliffe@octal.co.uk> | 2026-05-01 16:58:01 +0100 |
|---|---|---|
| committer | Dan Goodliffe <dan.goodliffe@octal.co.uk> | 2026-05-01 17:01:56 +0100 |
| commit | 0e768a00fab74818533458b5b40c84556b2137e0 (patch) | |
| tree | 3a405ae5ecbd71727b77e4a7892ad0b2815dddc4 /src | |
| parent | ed3598ac6f77f62b0c4fe32d4dab05784e5fa51d (diff) | |
| download | webstat-0e768a00fab74818533458b5b40c84556b2137e0.tar.bz2 webstat-0e768a00fab74818533458b5b40c84556b2137e0.tar.xz webstat-0e768a00fab74818533458b5b40c84556b2137e0.zip | |
Append unparked lines to queue in finalise function
Fixes issue where queuedLines would be accessed from background thread.
Diffstat (limited to 'src')
| -rw-r--r-- | src/ingestor.cpp | 31 | ||||
| -rw-r--r-- | src/ingestor.hpp | 4 |
2 files changed, 19 insertions, 16 deletions
diff --git a/src/ingestor.cpp b/src/ingestor.cpp index 9159a12..7fd64cd 100644 --- a/src/ingestor.cpp +++ b/src/ingestor.cpp @@ -412,37 +412,41 @@ namespace WebStat { Ingestor::Job::Result Ingestor::jobIngestParkedLines() { - unsigned int count = 0; for (auto pathIter = std::filesystem::directory_iterator {settings.fallbackDir}; pathIter != std::filesystem::directory_iterator {}; ++pathIter) { if (scn::scan<std::string>(pathIter->path().filename().string(), "parked-{:[a-zA-Z0-9]}.log")) { - jobIngestParkedLines(pathIter->path()); - count += 1; + return [lines = jobIngestParkedLines(pathIter->path()), this, path = pathIter->path()]() mutable { + auto count = lines.size(); + queuedLines.append_range(std::move(lines)); + unlink(path.c_str()); + return count; + }; } } - return [count]() { - return count; + return []() { + return 0; }; } - void + Ingestor::LineBatch Ingestor::jobIngestParkedLines(const std::filesystem::path & path) { if (auto parked = FilePtr(fopen(path.c_str(), "r"))) { if (auto count = scn::scan<size_t>(parked.get(), "{}\n")) { - if (jobIngestParkedLines(parked.get(), count->value()) < count->value()) { + try { + return jobIngestParkedLines(parked.get(), count->value()); + } + catch (...) { auto failPath = auto {path}.replace_extension(".short"); rename(path.c_str(), failPath.c_str()); - throw std::system_error {errno, std::generic_category(), "Short read of parked file"}; + throw; } - unlink(path.c_str()); - return; } } throw std::system_error {errno, std::generic_category(), strerror(errno)}; } - size_t + Ingestor::LineBatch Ingestor::jobIngestParkedLines(FILE * lines, size_t count) { LineBatch parkedLines; @@ -453,11 +457,10 @@ namespace WebStat { parkedLines.emplace_back(std::move(line->value())); } else { - return lineNo; + throw std::system_error {errno, std::generic_category(), "Short read of parked file"}; } } - queuedLines.append_range(std::move(parkedLines)); - return count; + return parkedLines; } Ingestor::Job::Result diff --git a/src/ingestor.hpp b/src/ingestor.hpp index 4882b7d..4373f40 100644 --- a/src/ingestor.hpp +++ b/src/ingestor.hpp @@ -114,8 +114,8 @@ namespace WebStat { void finalizeJob(Job &, minutes freq, Job::LastRunTime::clock::time_point now); void finishAllJobs(); - void jobIngestParkedLines(const std::filesystem::path &); - size_t jobIngestParkedLines(FILE *, size_t count); + LineBatch jobIngestParkedLines(const std::filesystem::path &); + LineBatch jobIngestParkedLines(FILE *, size_t count); static void sigtermHandler(int); void terminate(int); |
