summaryrefslogtreecommitdiff
path: root/src/ingestor.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/ingestor.cpp')
-rw-r--r--src/ingestor.cpp27
1 files changed, 19 insertions, 8 deletions
diff --git a/src/ingestor.cpp b/src/ingestor.cpp
index eb3be09..1b670fb 100644
--- a/src/ingestor.cpp
+++ b/src/ingestor.cpp
@@ -262,7 +262,8 @@ namespace WebStat {
if (queuedLines.empty()) {
return;
}
- std::string path {settings.fallbackDir / std::format("parked-{}.log", crc32(queuedLines.front()))};
+ const std::filesystem::path path {
+ settings.fallbackDir / std::format("parked-{}.short", crc32(queuedLines.front()))};
if (auto parked = FilePtr(fopen(path.c_str(), "w"))) {
fprintf(parked.get(), "%zu\n", queuedLines.size());
for (const auto & line : queuedLines) {
@@ -271,9 +272,11 @@ namespace WebStat {
if (fflush(parked.get()) == 0) {
linesParked += queuedLines.size();
queuedLines.clear();
- return;
+ auto finalPath = auto {path}.replace_extension(".log");
+ if (rename(path.c_str(), finalPath.c_str()) == 0) {
+ return;
+ }
}
- std::filesystem::remove(path);
}
log(LOG_ERR, "Failed to park %zu queued lines:", queuedLines.size());
for (const auto & line : queuedLines) {
@@ -326,26 +329,34 @@ namespace WebStat {
{
if (auto parked = FilePtr(fopen(path.c_str(), "r"))) {
if (auto count = scn::scan<size_t>(parked.get(), "{}\n")) {
- jobIngestParkedLines(parked.get(), count->value());
- std::filesystem::remove(path);
+ if (jobIngestParkedLines(parked.get(), count->value()) < count->value()) {
+ auto failPath = auto {path}.replace_extension(".short");
+ rename(path.c_str(), failPath.c_str());
+ throw std::system_error {errno, std::generic_category(), "Short read of parked file"};
+ }
+ unlink(path.c_str());
return;
}
}
throw std::system_error {errno, std::generic_category(), strerror(errno)};
}
- void
+ size_t
Ingestor::jobIngestParkedLines(FILE * lines, size_t count)
{
+ LineBatch parkedLines;
+ parkedLines.reserve(count);
for (size_t lineNo = 0; lineNo < count; ++lineNo) {
if (auto line = scn::scan<std::string>(lines, "{:[^\n]}\n")) {
linesRead++;
- queuedLines.emplace_back(std::move(line->value()));
+ parkedLines.emplace_back(std::move(line->value()));
}
else {
- throw std::system_error {errno, std::generic_category(), "Short read of parked file"};
+ return lineNo;
}
}
+ queuedLines.append_range(std::move(parkedLines));
+ return count;
}
unsigned int