From 0e768a00fab74818533458b5b40c84556b2137e0 Mon Sep 17 00:00:00 2001 From: Dan Goodliffe Date: Fri, 1 May 2026 16:58:01 +0100 Subject: Append unparked lines to queue in finalise function Fixes issue where queuedLines would be accessed from background thread. --- src/ingestor.cpp | 31 +++++++++++++++++-------------- src/ingestor.hpp | 4 ++-- test/test-ingest.cpp | 2 +- 3 files changed, 20 insertions(+), 17 deletions(-) diff --git a/src/ingestor.cpp b/src/ingestor.cpp index 9159a12..7fd64cd 100644 --- a/src/ingestor.cpp +++ b/src/ingestor.cpp @@ -412,37 +412,41 @@ namespace WebStat { Ingestor::Job::Result Ingestor::jobIngestParkedLines() { - unsigned int count = 0; for (auto pathIter = std::filesystem::directory_iterator {settings.fallbackDir}; pathIter != std::filesystem::directory_iterator {}; ++pathIter) { if (scn::scan(pathIter->path().filename().string(), "parked-{:[a-zA-Z0-9]}.log")) { - jobIngestParkedLines(pathIter->path()); - count += 1; + return [lines = jobIngestParkedLines(pathIter->path()), this, path = pathIter->path()]() mutable { + auto count = lines.size(); + queuedLines.append_range(std::move(lines)); + unlink(path.c_str()); + return count; + }; } } - return [count]() { - return count; + return []() { + return 0; }; } - void + Ingestor::LineBatch Ingestor::jobIngestParkedLines(const std::filesystem::path & path) { if (auto parked = FilePtr(fopen(path.c_str(), "r"))) { if (auto count = scn::scan(parked.get(), "{}\n")) { - if (jobIngestParkedLines(parked.get(), count->value()) < count->value()) { + try { + return jobIngestParkedLines(parked.get(), count->value()); + } + catch (...) { auto failPath = auto {path}.replace_extension(".short"); rename(path.c_str(), failPath.c_str()); - throw std::system_error {errno, std::generic_category(), "Short read of parked file"}; + throw; } - unlink(path.c_str()); - return; } } throw std::system_error {errno, std::generic_category(), strerror(errno)}; } - size_t + Ingestor::LineBatch Ingestor::jobIngestParkedLines(FILE * lines, size_t count) { LineBatch parkedLines; @@ -453,11 +457,10 @@ namespace WebStat { parkedLines.emplace_back(std::move(line->value())); } else { - return lineNo; + throw std::system_error {errno, std::generic_category(), "Short read of parked file"}; } } - queuedLines.append_range(std::move(parkedLines)); - return count; + return parkedLines; } Ingestor::Job::Result diff --git a/src/ingestor.hpp b/src/ingestor.hpp index 4882b7d..4373f40 100644 --- a/src/ingestor.hpp +++ b/src/ingestor.hpp @@ -114,8 +114,8 @@ namespace WebStat { void finalizeJob(Job &, minutes freq, Job::LastRunTime::clock::time_point now); void finishAllJobs(); - void jobIngestParkedLines(const std::filesystem::path &); - size_t jobIngestParkedLines(FILE *, size_t count); + LineBatch jobIngestParkedLines(const std::filesystem::path &); + LineBatch jobIngestParkedLines(FILE *, size_t count); static void sigtermHandler(int); void terminate(int); diff --git a/test/test-ingest.cpp b/test/test-ingest.cpp index 4f9c7ab..3b53529 100644 --- a/test/test-ingest.cpp +++ b/test/test-ingest.cpp @@ -322,7 +322,7 @@ BOOST_AUTO_TEST_CASE(IngestParked, *boost::unit_test::depends_on("I/ParkLogLine" BOOST_REQUIRE(queuedLines.empty()); const auto result = jobIngestParkedLines(); BOOST_REQUIRE(result); - BOOST_CHECK_EQUAL(result(), 1); + BOOST_CHECK_EQUAL(result(), 2); BOOST_CHECK_EQUAL(queuedLines.size(), 2); BOOST_CHECK(std::filesystem::is_empty(settings.fallbackDir)); } -- cgit v1.3