summaryrefslogtreecommitdiff
path: root/src/ingestor.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/ingestor.cpp')
-rw-r--r--src/ingestor.cpp131
1 files changed, 77 insertions, 54 deletions
diff --git a/src/ingestor.cpp b/src/ingestor.cpp
index e2018ad..1de9ab9 100644
--- a/src/ingestor.cpp
+++ b/src/ingestor.cpp
@@ -5,7 +5,6 @@
#include <connection.h>
#include <csignal>
#include <dbTypes.h>
-#include <fstream>
#include <modifycommand.h>
#include <ranges>
#include <scn/scan.h>
@@ -92,6 +91,7 @@ namespace WebStat {
assert(!currentIngestor);
currentIngestor = this;
signal(SIGTERM, &sigtermHandler);
+ queuedLines.reserve(settings.maxBatchSize);
}
Ingestor::~Ingestor()
@@ -171,12 +171,18 @@ namespace WebStat {
if (logIn.revents) {
if (auto line = scn::scan<std::string>(input, "{:[^\n]}\n")) {
linesRead++;
- ingestLogLine(line->value());
+ queuedLines.emplace_back(std::move(line->value()));
+ if (queuedLines.size() >= settings.maxBatchSize) {
+ tryIngestQueuedLogLines();
+ }
}
else {
break;
}
}
+ else {
+ tryIngestQueuedLogLines();
+ }
if (expiredThenSet(lastCheckedJobs, settings.checkJobsAfter)) {
runJobsAsNeeded();
}
@@ -184,68 +190,84 @@ namespace WebStat {
handleCurlOperations();
}
}
+ tryIngestQueuedLogLines();
+ parkQueuedLogLines();
while (!curlOperations.empty() && curl_multi_poll(curl.get(), nullptr, 0, INT_MAX, nullptr) == CURLM_OK) {
handleCurlOperations();
}
}
void
- Ingestor::ingestLogLine(const std::string_view line)
+ Ingestor::tryIngestQueuedLogLines()
{
try {
- ingestLogLine(dbpool->get().get(), line);
+ ingestLogLines(dbpool->get().get(), queuedLines);
+ queuedLines.clear();
}
catch (const std::exception &) {
- parkLogLine(line);
+ existingEntities.clear();
}
}
void
- Ingestor::ingestLogLine(DB::Connection * dbconn, const std::string_view line)
+ Ingestor::ingestLogLines(DB::Connection * dbconn, const LineBatch & lines)
{
- auto rememberNewEntityIds = [this](const auto & ids) {
- existingEntities.insert_range(ids | std::views::take_while(&std::optional<Crc32Value>::has_value)
- | std::views::transform([](auto && value) {
- return *value;
- }));
- };
- if (auto result = scanLogLine(line)) {
- linesParsed++;
- const auto values = crc32ScanValues(result->values());
- NewEntityIds ids;
- try {
- {
- std::optional<DB::TransactionScope> dbtx;
+ auto nonNullEntityIds = std::views::take_while(&std::optional<Crc32Value>::has_value)
+ | std::views::transform([](auto && value) {
+ return *value;
+ });
+
+ DB::TransactionScope batchTx {*dbconn};
+ for (const auto & line : lines) {
+ if (auto result = scanLogLine(line)) {
+ linesParsed++;
+ const auto values = crc32ScanValues(result->values());
+ try {
+ DB::TransactionScope dbtx {*dbconn};
if (const auto newEnts = newEntities(values); newEnts.front()) {
- dbtx.emplace(*dbconn);
- ids = storeEntities(dbconn, newEnts);
+ existingEntities.insert_range(storeEntities(dbconn, newEnts) | nonNullEntityIds);
}
storeLogLine(dbconn, values);
}
- rememberNewEntityIds(ids);
- }
- catch (const DB::Error & originalError) {
- try {
- const auto uninsertableLine = ToEntity<EntityType::UninsertableLine> {}(line);
- rememberNewEntityIds(storeEntities(dbconn, {uninsertableLine}));
- }
- catch (const std::exception &) {
- throw originalError;
+ catch (const DB::Error & originalError) {
+ try {
+ DB::TransactionScope dbtx {*dbconn};
+ const auto uninsertableLine = ToEntity<EntityType::UninsertableLine> {}(line);
+ storeEntities(dbconn, {uninsertableLine});
+ }
+ catch (const std::exception &) {
+ throw originalError;
+ }
}
}
- }
- else {
- linesDiscarded++;
- const auto unparsableLine = ToEntity<EntityType::UnparsableLine> {}(line);
- rememberNewEntityIds(storeEntities(dbconn, {unparsableLine}));
+ else {
+ linesDiscarded++;
+ const auto unparsableLine = ToEntity<EntityType::UnparsableLine> {}(line);
+ storeEntities(dbconn, {unparsableLine});
+ }
}
}
void
- Ingestor::parkLogLine(std::string_view line)
+ Ingestor::parkQueuedLogLines()
{
- std::ofstream {settings.fallbackDir / std::format("parked-{}.log", crc32(line))} << line;
- linesParked++;
+ if (queuedLines.empty()) {
+ return;
+ }
+ std::string path {settings.fallbackDir / std::format("parked-{}.log", crc32(queuedLines.front()))};
+ if (auto parked = FilePtr(fopen(path.c_str(), "w"))) {
+ fprintf(parked.get(), "%zu\n", queuedLines.size());
+ for (const auto & line : queuedLines) {
+ fprintf(parked.get(), "%.*s\n", static_cast<int>(line.length()), line.data());
+ }
+ if (fflush(parked.get()) == 0) {
+ linesParked += queuedLines.size();
+ queuedLines.clear();
+ }
+ else {
+ std::filesystem::remove(path);
+ }
+ }
}
void
@@ -280,7 +302,7 @@ namespace WebStat {
for (auto pathIter = std::filesystem::directory_iterator {settings.fallbackDir};
pathIter != std::filesystem::directory_iterator {}; ++pathIter) {
if (scn::scan<Crc32Value>(pathIter->path().filename().string(), "parked-{}.log")) {
- jobIngestParkedLine(pathIter);
+ jobIngestParkedLines(pathIter->path());
count += 1;
}
}
@@ -288,29 +310,30 @@ namespace WebStat {
}
void
- Ingestor::jobIngestParkedLine(const std::filesystem::directory_iterator & pathIter)
+ Ingestor::jobIngestParkedLines(const std::filesystem::path & path)
{
- jobIngestParkedLine(pathIter->path(), pathIter->file_size());
+ if (auto parked = FilePtr(fopen(path.c_str(), "r"))) {
+ if (auto count = scn::scan<size_t>(parked.get(), "{}\n")) {
+ jobIngestParkedLines(parked.get(), count->value());
+ std::filesystem::remove(path);
+ return;
+ }
+ }
+ throw std::system_error {errno, std::generic_category(), strerror(errno)};
}
void
- Ingestor::jobIngestParkedLine(const std::filesystem::path & path, uintmax_t size)
+ Ingestor::jobIngestParkedLines(FILE * lines, size_t count)
{
- if (std::ifstream parked {path}) {
- std::string line;
- line.resize_and_overwrite(size, [&parked](char * content, size_t size) {
- parked.read(content, static_cast<std::streamsize>(size));
- return static_cast<size_t>(parked.tellg());
- });
- if (line.length() < size) {
+ for (size_t line = 0; line < count; ++line) {
+ if (auto line = scn::scan<std::string>(lines, "{:[^\n]}\n")) {
+ linesRead++;
+ queuedLines.emplace_back(std::move(line->value()));
+ }
+ else {
throw std::system_error {errno, std::generic_category(), "Short read of parked file"};
}
- ingestLogLine(dbpool->get().get(), line);
- }
- else {
- throw std::system_error {errno, std::generic_category(), strerror(errno)};
}
- std::filesystem::remove(path);
}
unsigned int