summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
-rw-r--r-- src/ingestor.cpp 41
-rw-r--r-- src/ingestor.hpp 1
-rw-r--r-- src/sql.cpp 2
-rw-r--r-- src/sql/accessLogPurgeOld.sql 2
-rw-r--r-- src/sql/selectUninsertableLines.sql 1
-rw-r--r-- src/util.cpp 11
-rw-r--r-- src/util.hpp 4
-rw-r--r-- src/webstat_logger_main.cpp 2
8 files changed, 35 insertions, 29 deletions
diff --git a/src/ingestor.cpp b/src/ingestor.cpp
index 954f872..16638eb 100644
--- a/src/ingestor.cpp
+++ b/src/ingestor.cpp
@@ -5,7 +5,9 @@
#include <connection.h>
#include <csignal>
#include <dbTypes.h>
+#include <fstream>
#include <modifycommand.h>
+#include <print>
#include <ranges>
#include <scn/scan.h>
#include <selectcommand.h>
@@ -26,19 +28,13 @@ namespace DB {
namespace WebStat {
namespace {
- using ByteArrayView = std::span<const uint8_t>;
-
- auto
- bytesToHexRange(const ByteArrayView bytes)
- {
+ constexpr auto TO_HEX_RANGE = std::views::transform([](auto byte) {
constexpr auto HEXN = 16ZU;
- return bytes | std::views::transform([](auto byte) {
- return std::array {byte / HEXN, byte % HEXN};
- }) | std::views::join
- | std::views::transform([](auto nibble) {
- return "0123456789abcdef"[nibble];
- });
- }
+ return std::array {byte / HEXN, byte % HEXN};
+ }) | std::views::join
+ | std::views::transform([](auto nibble) {
+ return "0123456789abcdef"[nibble];
+ });
EntityHash
makeHash(const std::string_view value)
@@ -457,18 +453,18 @@ namespace WebStat {
if (lines.empty()) {
return std::unexpected(0);
}
- const std::filesystem::path path {
- settings.fallbackDir / std::format("parked-{:s}.short", bytesToHexRange(makeHash(lines.front())))};
- if (auto parked = FilePtr(fopen(path.c_str(), "w"))) {
- fprintf(parked.get(), "%zu\n", lines.size());
+ const std::filesystem::path path
+ = settings.fallbackDir / std::format("parked-{:s}.short", makeHash(lines.front()) | TO_HEX_RANGE);
+ if (std::ofstream parked {path}; parked.good()) {
+ std::println(parked, "{}", lines.size());
for (const auto & line : lines) {
- fprintf(parked.get(), "%.*s\n", static_cast<int>(line.length()), line.data());
+ std::println(parked, "{}", line);
}
- if (fflush(parked.get()) == 0) {
+ if (parked.flush().good()) {
lines.clear();
- auto finalPath = std::filesystem::path {path}.replace_extension(".log");
- parked.reset();
- if (rename(path.c_str(), finalPath.c_str()) == 0) {
+ auto finalPath = auto {path}.replace_extension(".log");
+ parked.close();
+ if (!renameNoExcept(path, finalPath)) {
return finalPath;
}
}
@@ -590,12 +586,11 @@ namespace WebStat {
Ingestor::jobPurgeOldLogs()
{
auto dbconn = dbpool->get();
- const auto stopAt = Job::LastRunTime::clock::now() + settings.purgeDeleteMaxTime;
const auto purge = dbconn->modify(SQL::ACCESS_LOG_PURGE_OLD, SQL::ACCESS_LOG_PURGE_OLD_OPTS);
purge->bindParam(0, std::format("{} days", settings.purgeDaysToKeep));
purge->bindParam(1, settings.purgeDeleteMax);
unsigned int purgedTotal {};
- while (!terminated && stopAt > Job::LastRunTime::clock::now()) {
+ while (!terminated) {
const auto purged = purge->execute();
purgedTotal += purged;
if (purged < settings.purgeDeleteMax) {
diff --git a/src/ingestor.hpp b/src/ingestor.hpp
index 2050b7c..8808ed8 100644
--- a/src/ingestor.hpp
+++ b/src/ingestor.hpp
@@ -32,7 +32,6 @@ namespace WebStat {
minutes freqPurgeOldLogs = 6h;
unsigned int purgeDaysToKeep = 61; // ~2 months
unsigned int purgeDeleteMax = 10'000;
- minutes purgeDeleteMaxTime = 5min;
seconds purgeDeletePause = 3s;
// NOLINTEND(readability-magic-numbers)
};
diff --git a/src/sql.cpp b/src/sql.cpp
index a2dac02..63e946c 100644
--- a/src/sql.cpp
+++ b/src/sql.cpp
@@ -43,7 +43,7 @@ namespace WebStat::SQL {
HASH_OPTS(ENTITY_UPDATE_DETAIL);
HASH_OPTS(HOST_UPSERT);
const DB::CommandOptionsPtr SELECT_UNINSERTABLE_OPTS
- = std::make_shared<PQ::CommandOptions>(std::hash<std::string> {}(SELECT_UNINSERTABLE), 35, true);
+ = std::make_shared<PQ::CommandOptions>(std::hash<std::string> {}(SELECT_UNINSERTABLE), 35, false);
HASH_OPTS(DELETE_ENTITY);
HASH_OPTS(MARK_ENTITY_RETRIED);
HASH_OPTS(SET_ENTITY_TYPE);
diff --git a/src/sql/accessLogPurgeOld.sql b/src/sql/accessLogPurgeOld.sql
index 00e55b5..616b3a6 100644
--- a/src/sql/accessLogPurgeOld.sql
+++ b/src/sql/accessLogPurgeOld.sql
@@ -5,8 +5,6 @@ WITH delete_batch AS (
access_log
WHERE
request_time < CURRENT_DATE - ?::interval
- ORDER BY
- request_time
FOR UPDATE
LIMIT ?)
DELETE FROM access_log AS al USING delete_batch AS del
diff --git a/src/sql/selectUninsertableLines.sql b/src/sql/selectUninsertableLines.sql
index 5c07791..894ab67 100644
--- a/src/sql/selectUninsertableLines.sql
+++ b/src/sql/selectUninsertableLines.sql
@@ -9,3 +9,4 @@ WHERE
ORDER BY
id
LIMIT ?
+FOR UPDATE
diff --git a/src/util.cpp b/src/util.cpp
new file mode 100644
index 0000000..cdce30e
--- /dev/null
+++ b/src/util.cpp
@@ -0,0 +1,11 @@
+#include "util.hpp"
+
+namespace WebStat {
+ std::error_code
+ renameNoExcept(const std::filesystem::path & path, const std::filesystem::path & finalPath) noexcept
+ {
+ std::error_code error;
+ std::filesystem::rename(path, finalPath, error);
+ return error;
+ }
+}
diff --git a/src/util.hpp b/src/util.hpp
index 5cac5a3..d5e2482 100644
--- a/src/util.hpp
+++ b/src/util.hpp
@@ -2,6 +2,7 @@
#include <chrono>
#include <command.h>
+#include <filesystem>
#include <scn/scan.h>
#include <shared_mutex>
#include <tuple>
@@ -144,4 +145,7 @@ namespace WebStat {
ValueType value;
mutable MutexType mutex;
};
+
+ std::error_code renameNoExcept(
+ const std::filesystem::path & path, const std::filesystem::path & finalPath) noexcept;
}
diff --git a/src/webstat_logger_main.cpp b/src/webstat_logger_main.cpp
index 8dd9f52..963e318 100644
--- a/src/webstat_logger_main.cpp
+++ b/src/webstat_logger_main.cpp
@@ -83,8 +83,6 @@ main(int argc, char ** argv)
"How many days of access log entries to keep")
("job.purge.max", po::value(&settings.purgeDeleteMax)->default_value(settings.purgeDeleteMax),
"Maximum number of access log entries to delete in a single operation")
- ("job.purge.time", po::value(&settings.purgeDeleteMaxTime)->default_value(settings.purgeDeleteMaxTime),
- "Maximum amount of time to spending purging old access log entries before continuing to ingest")
("job.purge.pause", po::value(&settings.purgeDeletePause)->default_value(settings.purgeDeletePause),
"Time to pause for between repeated exections of a delete operation")
;