summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/ingestor.cpp131
-rw-r--r--src/ingestor.hpp15
-rw-r--r--src/webstat_logger_main.cpp2
-rw-r--r--test/perf-ingest.cpp3
-rw-r--r--test/test-ingest.cpp51
5 files changed, 117 insertions, 85 deletions
diff --git a/src/ingestor.cpp b/src/ingestor.cpp
index e2018ad..1de9ab9 100644
--- a/src/ingestor.cpp
+++ b/src/ingestor.cpp
@@ -5,7 +5,6 @@
#include <connection.h>
#include <csignal>
#include <dbTypes.h>
-#include <fstream>
#include <modifycommand.h>
#include <ranges>
#include <scn/scan.h>
@@ -92,6 +91,7 @@ namespace WebStat {
assert(!currentIngestor);
currentIngestor = this;
signal(SIGTERM, &sigtermHandler);
+ queuedLines.reserve(settings.maxBatchSize);
}
Ingestor::~Ingestor()
@@ -171,12 +171,18 @@ namespace WebStat {
if (logIn.revents) {
if (auto line = scn::scan<std::string>(input, "{:[^\n]}\n")) {
linesRead++;
- ingestLogLine(line->value());
+ queuedLines.emplace_back(std::move(line->value()));
+ if (queuedLines.size() >= settings.maxBatchSize) {
+ tryIngestQueuedLogLines();
+ }
}
else {
break;
}
}
+ else {
+ tryIngestQueuedLogLines();
+ }
if (expiredThenSet(lastCheckedJobs, settings.checkJobsAfter)) {
runJobsAsNeeded();
}
@@ -184,68 +190,84 @@ namespace WebStat {
handleCurlOperations();
}
}
+ tryIngestQueuedLogLines();
+ parkQueuedLogLines();
while (!curlOperations.empty() && curl_multi_poll(curl.get(), nullptr, 0, INT_MAX, nullptr) == CURLM_OK) {
handleCurlOperations();
}
}
void
- Ingestor::ingestLogLine(const std::string_view line)
+ Ingestor::tryIngestQueuedLogLines()
{
try {
- ingestLogLine(dbpool->get().get(), line);
+ ingestLogLines(dbpool->get().get(), queuedLines);
+ queuedLines.clear();
}
catch (const std::exception &) {
- parkLogLine(line);
+ existingEntities.clear();
}
}
void
- Ingestor::ingestLogLine(DB::Connection * dbconn, const std::string_view line)
+ Ingestor::ingestLogLines(DB::Connection * dbconn, const LineBatch & lines)
{
- auto rememberNewEntityIds = [this](const auto & ids) {
- existingEntities.insert_range(ids | std::views::take_while(&std::optional<Crc32Value>::has_value)
- | std::views::transform([](auto && value) {
- return *value;
- }));
- };
- if (auto result = scanLogLine(line)) {
- linesParsed++;
- const auto values = crc32ScanValues(result->values());
- NewEntityIds ids;
- try {
- {
- std::optional<DB::TransactionScope> dbtx;
+ auto nonNullEntityIds = std::views::take_while(&std::optional<Crc32Value>::has_value)
+ | std::views::transform([](auto && value) {
+ return *value;
+ });
+
+ DB::TransactionScope batchTx {*dbconn};
+ for (const auto & line : lines) {
+ if (auto result = scanLogLine(line)) {
+ linesParsed++;
+ const auto values = crc32ScanValues(result->values());
+ try {
+ DB::TransactionScope dbtx {*dbconn};
if (const auto newEnts = newEntities(values); newEnts.front()) {
- dbtx.emplace(*dbconn);
- ids = storeEntities(dbconn, newEnts);
+ existingEntities.insert_range(storeEntities(dbconn, newEnts) | nonNullEntityIds);
}
storeLogLine(dbconn, values);
}
- rememberNewEntityIds(ids);
- }
- catch (const DB::Error & originalError) {
- try {
- const auto uninsertableLine = ToEntity<EntityType::UninsertableLine> {}(line);
- rememberNewEntityIds(storeEntities(dbconn, {uninsertableLine}));
- }
- catch (const std::exception &) {
- throw originalError;
+ catch (const DB::Error & originalError) {
+ try {
+ DB::TransactionScope dbtx {*dbconn};
+ const auto uninsertableLine = ToEntity<EntityType::UninsertableLine> {}(line);
+ storeEntities(dbconn, {uninsertableLine});
+ }
+ catch (const std::exception &) {
+ throw originalError;
+ }
}
}
- }
- else {
- linesDiscarded++;
- const auto unparsableLine = ToEntity<EntityType::UnparsableLine> {}(line);
- rememberNewEntityIds(storeEntities(dbconn, {unparsableLine}));
+ else {
+ linesDiscarded++;
+ const auto unparsableLine = ToEntity<EntityType::UnparsableLine> {}(line);
+ storeEntities(dbconn, {unparsableLine});
+ }
}
}
void
- Ingestor::parkLogLine(std::string_view line)
+ Ingestor::parkQueuedLogLines()
{
- std::ofstream {settings.fallbackDir / std::format("parked-{}.log", crc32(line))} << line;
- linesParked++;
+ if (queuedLines.empty()) {
+ return;
+ }
+ std::string path {settings.fallbackDir / std::format("parked-{}.log", crc32(queuedLines.front()))};
+ if (auto parked = FilePtr(fopen(path.c_str(), "w"))) {
+ fprintf(parked.get(), "%zu\n", queuedLines.size());
+ for (const auto & line : queuedLines) {
+ fprintf(parked.get(), "%.*s\n", static_cast<int>(line.length()), line.data());
+ }
+ if (fflush(parked.get()) == 0) {
+ linesParked += queuedLines.size();
+ queuedLines.clear();
+ }
+ else {
+ std::filesystem::remove(path);
+ }
+ }
}
void
@@ -280,7 +302,7 @@ namespace WebStat {
for (auto pathIter = std::filesystem::directory_iterator {settings.fallbackDir};
pathIter != std::filesystem::directory_iterator {}; ++pathIter) {
if (scn::scan<Crc32Value>(pathIter->path().filename().string(), "parked-{}.log")) {
- jobIngestParkedLine(pathIter);
+ jobIngestParkedLines(pathIter->path());
count += 1;
}
}
@@ -288,29 +310,30 @@ namespace WebStat {
}
void
- Ingestor::jobIngestParkedLine(const std::filesystem::directory_iterator & pathIter)
+ Ingestor::jobIngestParkedLines(const std::filesystem::path & path)
{
- jobIngestParkedLine(pathIter->path(), pathIter->file_size());
+ if (auto parked = FilePtr(fopen(path.c_str(), "r"))) {
+ if (auto count = scn::scan<size_t>(parked.get(), "{}\n")) {
+ jobIngestParkedLines(parked.get(), count->value());
+ std::filesystem::remove(path);
+ return;
+ }
+ }
+ throw std::system_error {errno, std::generic_category(), strerror(errno)};
}
void
- Ingestor::jobIngestParkedLine(const std::filesystem::path & path, uintmax_t size)
+ Ingestor::jobIngestParkedLines(FILE * lines, size_t count)
{
- if (std::ifstream parked {path}) {
- std::string line;
- line.resize_and_overwrite(size, [&parked](char * content, size_t size) {
- parked.read(content, static_cast<std::streamsize>(size));
- return static_cast<size_t>(parked.tellg());
- });
- if (line.length() < size) {
+ for (size_t line = 0; line < count; ++line) {
+ if (auto line = scn::scan<std::string>(lines, "{:[^\n]}\n")) {
+ linesRead++;
+ queuedLines.emplace_back(std::move(line->value()));
+ }
+ else {
throw std::system_error {errno, std::generic_category(), "Short read of parked file"};
}
- ingestLogLine(dbpool->get().get(), line);
- }
- else {
- throw std::system_error {errno, std::generic_category(), strerror(errno)};
}
- std::filesystem::remove(path);
}
unsigned int
diff --git a/src/ingestor.hpp b/src/ingestor.hpp
index 2ae2936..94e0d5c 100644
--- a/src/ingestor.hpp
+++ b/src/ingestor.hpp
@@ -24,6 +24,7 @@ namespace WebStat {
std::filesystem::path fallbackDir = "/var/log/webstat";
unsigned int dbMax = 4;
unsigned int dbKeep = 2;
+ size_t maxBatchSize = 1;
minutes checkJobsAfter = 1min;
minutes freqIngestParkedLines = 30min;
minutes freqPurgeOldLogs = 6h;
@@ -36,6 +37,7 @@ namespace WebStat {
class Ingestor {
public:
+ using LineBatch = std::vector<std::string>;
Ingestor(const utsname &, IngestorSettings);
Ingestor(const utsname &, DB::ConnectionPoolPtr, IngestorSettings);
@@ -50,9 +52,9 @@ namespace WebStat {
[[nodiscard]] static ScanResult scanLogLine(std::string_view);
void ingestLog(std::FILE *);
- void ingestLogLine(std::string_view);
- void ingestLogLine(DB::Connection *, std::string_view);
- void parkLogLine(std::string_view);
+ void tryIngestQueuedLogLines();
+ void ingestLogLines(DB::Connection *, const LineBatch & lines);
+ void parkQueuedLogLines();
void runJobsAsNeeded();
unsigned int jobIngestParkedLines();
@@ -70,7 +72,8 @@ namespace WebStat {
size_t linesParsed = 0;
size_t linesDiscarded = 0;
size_t linesParked = 0;
- mutable std::flat_set<Crc32Value> existingEntities;
+ std::flat_set<Crc32Value> existingEntities;
+ LineBatch queuedLines;
bool terminated = false;
@@ -98,8 +101,8 @@ namespace WebStat {
void onNewUserAgent(const Entity &) const;
void handleCurlOperations();
- void jobIngestParkedLine(const std::filesystem::directory_iterator &);
- void jobIngestParkedLine(const std::filesystem::path &, uintmax_t size);
+ void jobIngestParkedLines(const std::filesystem::path &);
+ void jobIngestParkedLines(FILE *, size_t count);
static void sigtermHandler(int);
void terminate(int);
diff --git a/src/webstat_logger_main.cpp b/src/webstat_logger_main.cpp
index 6d3aeda..39d794a 100644
--- a/src/webstat_logger_main.cpp
+++ b/src/webstat_logger_main.cpp
@@ -53,6 +53,8 @@ main(int argc, char ** argv)
"Maximum number of concurrent write/read write DB connections")
("db.wr.keep", po::value(&settings.dbKeep)->default_value(settings.dbKeep),
"Number of write/read write DB connections to keep open")
+ ("db.maxBatchSize", po::value(&settings.maxBatchSize)->default_value(settings.maxBatchSize),
+ "Maximum number of access log entries to hold in memory before writing them to the DB")
("fallback.dir", po::value(&settings.fallbackDir)->default_value(settings.fallbackDir),
"Path to write access logs to when the database is unavailable")
("jobs.check", po::value(&settings.checkJobsAfter)->default_value(settings.checkJobsAfter),
diff --git a/test/perf-ingest.cpp b/test/perf-ingest.cpp
index 69212de..c403349 100644
--- a/test/perf-ingest.cpp
+++ b/test/perf-ingest.cpp
@@ -25,6 +25,7 @@ namespace {
std::make_shared<WebStat::MockDBPool>("webstat"),
{
.userAgentAPI = {},
+ .maxBatchSize = static_cast<size_t>(state.range(0)),
}};
for (auto loop : state) {
WebStat::FilePtr logFile {fopen(TMP_LOG.c_str(), "r")};
@@ -33,6 +34,6 @@ namespace {
}
}
-BENCHMARK(doIngestFile)->Setup(setup);
+BENCHMARK_RANGE(doIngestFile, 1, 1024)->Setup(setup);
BENCHMARK_MAIN();
diff --git a/test/test-ingest.cpp b/test/test-ingest.cpp
index be3be56..a1dc5e9 100644
--- a/test/test-ingest.cpp
+++ b/test/test-ingest.cpp
@@ -230,11 +230,10 @@ BOOST_DATA_TEST_CASE(StoreLogLine,
}),
line)
{
- ingestLogLine(DB::MockDatabase::openConnectionTo("webstat").get(), line);
+ ingestLogLines(DB::MockDatabase::openConnectionTo("webstat").get(), {std::string {line}});
BOOST_CHECK_EQUAL(linesRead, 0);
BOOST_CHECK_EQUAL(linesParsed, 1);
BOOST_CHECK_EQUAL(linesDiscarded, 0);
- BOOST_CHECK_EQUAL(linesParked, 0);
BOOST_CHECK_EQUAL(existingEntities.size(), 5);
}
@@ -264,34 +263,33 @@ BOOST_AUTO_TEST_CASE(TerminateHandler, *boost::unit_test::timeout(5))
BOOST_AUTO_TEST_CASE(ParkLogLine)
{
- parkLogLine(LOGLINE1);
- BOOST_CHECK_EQUAL(linesParked, 1);
+ queuedLines.emplace_back(LOGLINE1);
+ queuedLines.emplace_back(LOGLINE2);
+ parkQueuedLogLines();
const auto path = settings.fallbackDir / LOGLINE1_PARKED;
BOOST_TEST_INFO(path);
BOOST_REQUIRE(std::filesystem::exists(path));
- BOOST_CHECK_EQUAL(std::filesystem::file_size(path), LOGLINE1.length());
+ BOOST_CHECK_EQUAL(std::filesystem::file_size(path), LOGLINE1.length() + LOGLINE2.length() + 4);
}
-BOOST_TEST_DECORATOR(*boost::unit_test::depends_on("I/ParkLogLine"))
-
-BOOST_AUTO_TEST_CASE(ParkLogLineOnError)
+BOOST_AUTO_TEST_CASE(ParkLogLineOnError, *boost::unit_test::depends_on("I/ParkLogLine"))
{
BOOST_REQUIRE(existingEntities.empty());
constexpr std::string_view LOGLINE_BAD_VERB
= R"LOG(git.randomdan.homeip.net 98.82.40.168 1755561576768318 CAUSEPARK "/repo/gentoobrowse-api/commit/gentoobrowse-api/unittests/fixtures/756569aa764177340726dd3d40b41d89b11b20c7/app-crypt/pdfcrack/Manifest" "?h=gentoobrowse-api-0.9.1&id=a2ed3fd30333721accd4b697bfcb6cc4165c7714" HTTP/1.1 200 1884 107791 "-" "Mozilla/5.0 AppleWebKit/537.36 (KHTML, like Gecko; compatible; Amazonbot/0.1; +https://developer.amazon.com/support/amazonbot) Chrome/119.0.6045.214 Safari/537.36")LOG";
- BOOST_REQUIRE_NO_THROW(ingestLogLine(LOGLINE_BAD_VERB));
+ BOOST_REQUIRE_NO_THROW(ingestLogLines(dbpool->get().get(), {std::string {LOGLINE_BAD_VERB}}));
BOOST_CHECK_EQUAL(linesParked, 0);
- BOOST_CHECK_EQUAL(existingEntities.size(), 1);
+ BOOST_CHECK_EQUAL(linesDiscarded, 1);
}
BOOST_AUTO_TEST_CASE(IngestParked, *boost::unit_test::depends_on("I/ParkLogLine"))
{
- parkLogLine(LOGLINE1);
- BOOST_REQUIRE_EQUAL(linesParked, 1);
- BOOST_REQUIRE_EQUAL(linesParsed, 0);
+ queuedLines.emplace_back(LOGLINE1);
+ queuedLines.emplace_back(LOGLINE2);
+ parkQueuedLogLines();
+ BOOST_REQUIRE(queuedLines.empty());
jobIngestParkedLines();
- BOOST_CHECK_EQUAL(linesParsed, 1);
- BOOST_CHECK_EQUAL(linesDiscarded, 0);
+ BOOST_CHECK_EQUAL(queuedLines.size(), 2);
BOOST_CHECK(!std::filesystem::exists(settings.fallbackDir / LOGLINE1_PARKED));
}
@@ -307,18 +305,22 @@ BOOST_AUTO_TEST_CASE(IngestParkedJob,
{
const auto now = Job::LastRunTime::clock::now();
ingestParkedLines.lastRun = now - 1s;
- parkLogLine(LOGLINE1);
+ queuedLines.emplace_back(LOGLINE1);
+ parkQueuedLogLines();
+ BOOST_REQUIRE(queuedLines.empty());
+ BOOST_REQUIRE(std::filesystem::exists(settings.fallbackDir / LOGLINE1_PARKED));
runJobsAsNeeded();
BOOST_REQUIRE(!ingestParkedLines.currentRun);
- BOOST_REQUIRE_EQUAL(linesParked, 1);
- BOOST_REQUIRE_EQUAL(linesParsed, 0);
+ BOOST_CHECK(queuedLines.empty());
+ BOOST_CHECK(std::filesystem::exists(settings.fallbackDir / LOGLINE1_PARKED));
BOOST_CHECK_EQUAL(ingestParkedLines.lastRun, now - 1s);
ingestParkedLines.lastRun = now - settings.freqIngestParkedLines + 2s;
+ runJobsAsNeeded();
BOOST_REQUIRE(!ingestParkedLines.currentRun);
- BOOST_REQUIRE_EQUAL(linesParked, 1);
- BOOST_REQUIRE_EQUAL(linesParsed, 0);
+ BOOST_CHECK(queuedLines.empty());
+ BOOST_CHECK(std::filesystem::exists(settings.fallbackDir / LOGLINE1_PARKED));
BOOST_CHECK_EQUAL(ingestParkedLines.lastRun, now - settings.freqIngestParkedLines + 2s);
ingestParkedLines.lastRun = now - settings.freqIngestParkedLines - 1s;
@@ -327,8 +329,7 @@ BOOST_AUTO_TEST_CASE(IngestParkedJob,
ingestParkedLines.currentRun->wait();
runJobsAsNeeded();
BOOST_REQUIRE(!ingestParkedLines.currentRun);
- BOOST_CHECK_EQUAL(linesParsed, 1);
- BOOST_CHECK_EQUAL(linesDiscarded, 0);
+ BOOST_CHECK_EQUAL(queuedLines.size(), 1);
BOOST_CHECK_GE(ingestParkedLines.lastRun, now);
BOOST_CHECK(!std::filesystem::exists(settings.fallbackDir / LOGLINE1_PARKED));
}
@@ -337,7 +338,8 @@ BOOST_AUTO_TEST_CASE(JobErrorRescheduler, *boost::unit_test::depends_on("I/Inges
{
const auto now = Job::LastRunTime::clock::now();
ingestParkedLines.lastRun = now - settings.freqIngestParkedLines - 1s;
- parkLogLine(LOGLINE1);
+ queuedLines.emplace_back(LOGLINE1);
+ parkQueuedLogLines();
std::filesystem::permissions(settings.fallbackDir / LOGLINE1_PARKED, std::filesystem::perms::owner_write);
runJobsAsNeeded();
BOOST_REQUIRE(ingestParkedLines.currentRun);
@@ -366,7 +368,8 @@ BOOST_AUTO_TEST_CASE(FetchMockUserAgentDetail)
BOOST_AUTO_TEST_CASE(DiscardUnparsable)
{
- BOOST_REQUIRE_NO_THROW(ingestLogLine("does not parse"));
+ queuedLines.emplace_back("does not parse");
+ BOOST_REQUIRE_NO_THROW(tryIngestQueuedLogLines());
auto dbconn = dbpool->get();
auto select = dbconn->select("SELECT id::bigint, value FROM entities WHERE type = 'unparsable_line'");
constexpr std::array<std::tuple<Crc32Value, std::string_view>, 1> EXPECTED {{