summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Goodliffe <dan.goodliffe@octal.co.uk>2026-05-01 16:58:01 +0100
committerDan Goodliffe <dan.goodliffe@octal.co.uk>2026-05-01 17:01:56 +0100
commit0e768a00fab74818533458b5b40c84556b2137e0 (patch)
tree3a405ae5ecbd71727b77e4a7892ad0b2815dddc4 /src
parented3598ac6f77f62b0c4fe32d4dab05784e5fa51d (diff)
downloadwebstat-0e768a00fab74818533458b5b40c84556b2137e0.tar.bz2
webstat-0e768a00fab74818533458b5b40c84556b2137e0.tar.xz
webstat-0e768a00fab74818533458b5b40c84556b2137e0.zip
Append unparked lines to queue in finalise function
Fixes issue where queuedLines would be accessed from background thread.
Diffstat (limited to 'src')
-rw-r--r--src/ingestor.cpp31
-rw-r--r--src/ingestor.hpp4
2 files changed, 19 insertions, 16 deletions
diff --git a/src/ingestor.cpp b/src/ingestor.cpp
index 9159a12..7fd64cd 100644
--- a/src/ingestor.cpp
+++ b/src/ingestor.cpp
@@ -412,37 +412,41 @@ namespace WebStat {
Ingestor::Job::Result
Ingestor::jobIngestParkedLines()
{
- unsigned int count = 0;
for (auto pathIter = std::filesystem::directory_iterator {settings.fallbackDir};
pathIter != std::filesystem::directory_iterator {}; ++pathIter) {
if (scn::scan<std::string>(pathIter->path().filename().string(), "parked-{:[a-zA-Z0-9]}.log")) {
- jobIngestParkedLines(pathIter->path());
- count += 1;
+ return [lines = jobIngestParkedLines(pathIter->path()), this, path = pathIter->path()]() mutable {
+ auto count = lines.size();
+ queuedLines.append_range(std::move(lines));
+ unlink(path.c_str());
+ return count;
+ };
}
}
- return [count]() {
- return count;
+ return []() {
+ return 0;
};
}
- void
+ Ingestor::LineBatch
Ingestor::jobIngestParkedLines(const std::filesystem::path & path)
{
if (auto parked = FilePtr(fopen(path.c_str(), "r"))) {
if (auto count = scn::scan<size_t>(parked.get(), "{}\n")) {
- if (jobIngestParkedLines(parked.get(), count->value()) < count->value()) {
+ try {
+ return jobIngestParkedLines(parked.get(), count->value());
+ }
+ catch (...) {
auto failPath = auto {path}.replace_extension(".short");
rename(path.c_str(), failPath.c_str());
- throw std::system_error {errno, std::generic_category(), "Short read of parked file"};
+ throw;
}
- unlink(path.c_str());
- return;
}
}
throw std::system_error {errno, std::generic_category(), strerror(errno)};
}
- size_t
+ Ingestor::LineBatch
Ingestor::jobIngestParkedLines(FILE * lines, size_t count)
{
LineBatch parkedLines;
@@ -453,11 +457,10 @@ namespace WebStat {
parkedLines.emplace_back(std::move(line->value()));
}
else {
- return lineNo;
+ throw std::system_error {errno, std::generic_category(), "Short read of parked file"};
}
}
- queuedLines.append_range(std::move(parkedLines));
- return count;
+ return parkedLines;
}
Ingestor::Job::Result
diff --git a/src/ingestor.hpp b/src/ingestor.hpp
index 4882b7d..4373f40 100644
--- a/src/ingestor.hpp
+++ b/src/ingestor.hpp
@@ -114,8 +114,8 @@ namespace WebStat {
void finalizeJob(Job &, minutes freq, Job::LastRunTime::clock::time_point now);
void finishAllJobs();
- void jobIngestParkedLines(const std::filesystem::path &);
- size_t jobIngestParkedLines(FILE *, size_t count);
+ LineBatch jobIngestParkedLines(const std::filesystem::path &);
+ LineBatch jobIngestParkedLines(FILE *, size_t count);
static void sigtermHandler(int);
void terminate(int);