diff options
| -rw-r--r-- | src/ingestor.cpp | 41 | ||||
| -rw-r--r-- | src/ingestor.hpp | 1 | ||||
| -rw-r--r-- | src/sql.cpp | 2 | ||||
| -rw-r--r-- | src/sql/accessLogPurgeOld.sql | 2 | ||||
| -rw-r--r-- | src/sql/selectUninsertableLines.sql | 1 | ||||
| -rw-r--r-- | src/util.cpp | 11 | ||||
| -rw-r--r-- | src/util.hpp | 4 | ||||
| -rw-r--r-- | src/webstat_logger_main.cpp | 2 |
8 files changed, 35 insertions, 29 deletions
diff --git a/src/ingestor.cpp b/src/ingestor.cpp index 954f872..16638eb 100644 --- a/src/ingestor.cpp +++ b/src/ingestor.cpp @@ -5,7 +5,9 @@ #include <connection.h> #include <csignal> #include <dbTypes.h> +#include <fstream> #include <modifycommand.h> +#include <print> #include <ranges> #include <scn/scan.h> #include <selectcommand.h> @@ -26,19 +28,13 @@ namespace DB { namespace WebStat { namespace { - using ByteArrayView = std::span<const uint8_t>; - - auto - bytesToHexRange(const ByteArrayView bytes) - { + constexpr auto TO_HEX_RANGE = std::views::transform([](auto byte) { constexpr auto HEXN = 16ZU; - return bytes | std::views::transform([](auto byte) { - return std::array {byte / HEXN, byte % HEXN}; - }) | std::views::join - | std::views::transform([](auto nibble) { - return "0123456789abcdef"[nibble]; - }); - } + return std::array {byte / HEXN, byte % HEXN}; + }) | std::views::join + | std::views::transform([](auto nibble) { + return "0123456789abcdef"[nibble]; + }); EntityHash makeHash(const std::string_view value) @@ -457,18 +453,18 @@ namespace WebStat { if (lines.empty()) { return std::unexpected(0); } - const std::filesystem::path path { - settings.fallbackDir / std::format("parked-{:s}.short", bytesToHexRange(makeHash(lines.front())))}; - if (auto parked = FilePtr(fopen(path.c_str(), "w"))) { - fprintf(parked.get(), "%zu\n", lines.size()); + const std::filesystem::path path + = settings.fallbackDir / std::format("parked-{:s}.short", makeHash(lines.front()) | TO_HEX_RANGE); + if (std::ofstream parked {path}; parked.good()) { + std::println(parked, "{}", lines.size()); for (const auto & line : lines) { - fprintf(parked.get(), "%.*s\n", static_cast<int>(line.length()), line.data()); + std::println(parked, "{}", line); } - if (fflush(parked.get()) == 0) { + if (parked.flush().good()) { lines.clear(); - auto finalPath = std::filesystem::path {path}.replace_extension(".log"); - parked.reset(); - if (rename(path.c_str(), finalPath.c_str()) == 0) { + auto finalPath = auto {path}.replace_extension(".log"); + parked.close(); + if (!renameNoExcept(path, finalPath)) { return finalPath; } } @@ -590,12 +586,11 @@ namespace WebStat { Ingestor::jobPurgeOldLogs() { auto dbconn = dbpool->get(); - const auto stopAt = Job::LastRunTime::clock::now() + settings.purgeDeleteMaxTime; const auto purge = dbconn->modify(SQL::ACCESS_LOG_PURGE_OLD, SQL::ACCESS_LOG_PURGE_OLD_OPTS); purge->bindParam(0, std::format("{} days", settings.purgeDaysToKeep)); purge->bindParam(1, settings.purgeDeleteMax); unsigned int purgedTotal {}; - while (!terminated && stopAt > Job::LastRunTime::clock::now()) { + while (!terminated) { const auto purged = purge->execute(); purgedTotal += purged; if (purged < settings.purgeDeleteMax) { diff --git a/src/ingestor.hpp b/src/ingestor.hpp index 2050b7c..8808ed8 100644 --- a/src/ingestor.hpp +++ b/src/ingestor.hpp @@ -32,7 +32,6 @@ namespace WebStat { minutes freqPurgeOldLogs = 6h; unsigned int purgeDaysToKeep = 61; // ~2 months unsigned int purgeDeleteMax = 10'000; - minutes purgeDeleteMaxTime = 5min; seconds purgeDeletePause = 3s; // NOLINTEND(readability-magic-numbers) }; diff --git a/src/sql.cpp b/src/sql.cpp index a2dac02..63e946c 100644 --- a/src/sql.cpp +++ b/src/sql.cpp @@ -43,7 +43,7 @@ namespace WebStat::SQL { HASH_OPTS(ENTITY_UPDATE_DETAIL); HASH_OPTS(HOST_UPSERT); const DB::CommandOptionsPtr SELECT_UNINSERTABLE_OPTS - = std::make_shared<PQ::CommandOptions>(std::hash<std::string> {}(SELECT_UNINSERTABLE), 35, true); + = std::make_shared<PQ::CommandOptions>(std::hash<std::string> {}(SELECT_UNINSERTABLE), 35, false); HASH_OPTS(DELETE_ENTITY); HASH_OPTS(MARK_ENTITY_RETRIED); HASH_OPTS(SET_ENTITY_TYPE); diff --git a/src/sql/accessLogPurgeOld.sql b/src/sql/accessLogPurgeOld.sql index 00e55b5..616b3a6 100644 --- a/src/sql/accessLogPurgeOld.sql +++ b/src/sql/accessLogPurgeOld.sql @@ -5,8 +5,6 @@ WITH delete_batch AS ( access_log WHERE request_time < CURRENT_DATE - ?::interval - ORDER BY - request_time FOR UPDATE LIMIT ?) DELETE FROM access_log AS al USING delete_batch AS del diff --git a/src/sql/selectUninsertableLines.sql b/src/sql/selectUninsertableLines.sql index 5c07791..894ab67 100644 --- a/src/sql/selectUninsertableLines.sql +++ b/src/sql/selectUninsertableLines.sql @@ -9,3 +9,4 @@ WHERE ORDER BY id LIMIT ? +FOR UPDATE diff --git a/src/util.cpp b/src/util.cpp new file mode 100644 index 0000000..cdce30e --- /dev/null +++ b/src/util.cpp @@ -0,0 +1,11 @@ +#include "util.hpp" + +namespace WebStat { + std::error_code + renameNoExcept(const std::filesystem::path & path, const std::filesystem::path & finalPath) noexcept + { + std::error_code error; + std::filesystem::rename(path, finalPath, error); + return error; + } +} diff --git a/src/util.hpp b/src/util.hpp index 5cac5a3..d5e2482 100644 --- a/src/util.hpp +++ b/src/util.hpp @@ -2,6 +2,7 @@ #include <chrono> #include <command.h> +#include <filesystem> #include <scn/scan.h> #include <shared_mutex> #include <tuple> @@ -144,4 +145,7 @@ namespace WebStat { ValueType value; mutable MutexType mutex; }; + + std::error_code renameNoExcept( + const std::filesystem::path & path, const std::filesystem::path & finalPath) noexcept; } diff --git a/src/webstat_logger_main.cpp b/src/webstat_logger_main.cpp index 8dd9f52..963e318 100644 --- a/src/webstat_logger_main.cpp +++ b/src/webstat_logger_main.cpp @@ -83,8 +83,6 @@ main(int argc, char ** argv) "How many days of access log entries to keep") ("job.purge.max", po::value(&settings.purgeDeleteMax)->default_value(settings.purgeDeleteMax), "Maximum number of access log entries to delete in a single operation") - ("job.purge.time", po::value(&settings.purgeDeleteMaxTime)->default_value(settings.purgeDeleteMaxTime), - "Maximum amount of time to spending purging old access log entries before continuing to ingest") ("job.purge.pause", po::value(&settings.purgeDeletePause)->default_value(settings.purgeDeletePause), "Time to pause for between repeated exections of a delete operation") ; |
