diff options
-rw-r--r-- | src/ingestor.cpp | 25 | ||||
-rw-r--r-- | src/ingestor.hpp | 13 | ||||
-rw-r--r-- | src/webstat_logger_main.cpp | 4 | ||||
-rw-r--r-- | test/perf-ingest.cpp | 2 | ||||
-rw-r--r-- | test/test-ingest.cpp | 3 |
5 files changed, 25 insertions, 22 deletions
diff --git a/src/ingestor.cpp b/src/ingestor.cpp index f824c80..5c47ab7 100644 --- a/src/ingestor.cpp +++ b/src/ingestor.cpp @@ -53,12 +53,13 @@ namespace WebStat { } } - Ingestor::Ingestor(const std::string_view hostname, DB::ConnectionPtr dbconn) : - hostnameId {crc32(hostname)}, dbconn {std::move(dbconn)} + Ingestor::Ingestor(const std::string_view hostname, DB::ConnectionPoolPtr dbpl) : + hostnameId {crc32(hostname)}, dbpool {std::move(dbpl)} { - storeEntities({ - std::make_pair(hostnameId, hostname), - }); + storeEntities(dbpool->get().get(), + { + std::make_pair(hostnameId, hostname), + }); } Ingestor::ScanResult @@ -85,12 +86,12 @@ namespace WebStat { { while (auto line = scn::scan<std::string>(input, "{:[^\n]}\n")) { linesRead++; - ingestLogLine(line->value()); + ingestLogLine(dbpool->get().get(), line->value()); } } void - Ingestor::ingestLogLine(const std::string_view line) + Ingestor::ingestLogLine(DB::Connection * dbconn, const std::string_view line) { if (auto result = scanLogLine(line)) { linesParsed++; @@ -98,9 +99,9 @@ namespace WebStat { std::optional<DB::TransactionScope> dbtx; if (const auto newEnts = newEntities(values); newEnts.front()) { dbtx.emplace(*dbconn); - storeEntities(newEnts); + storeEntities(dbconn, newEnts); } - storeLogLine(values); + storeLogLine(dbconn, values); } else { syslog(LOG_WARNING, "Discarded line: [%.*s]", static_cast<int>(line.length()), line.data()); @@ -135,10 +136,10 @@ namespace WebStat { } void - Ingestor::storeEntities(const std::span<const std::optional<Entity>> values) const + Ingestor::storeEntities(DB::Connection * dbconn, const std::span<const std::optional<Entity>> values) const { std::ranges::for_each( - values | std::views::take_while(&std::optional<Entity>::has_value), [this](auto && entity) { + values | std::views::take_while(&std::optional<Entity>::has_value), [this, dbconn](auto && entity) { auto insert = dbconn->modify(SQL::ENTITY_INSERT, SQL::ENTITY_INSERT_OPTS); insert->bindParamI(0, entity->first); insert->bindParamS(1, entity->second); @@ -149,7 +150,7 @@ namespace WebStat { template<typename... T> void - Ingestor::storeLogLine(const std::tuple<T...> & values) const + Ingestor::storeLogLine(DB::Connection * dbconn, const std::tuple<T...> & values) const { auto insert = dbconn->modify(SQL::ACCESS_LOG_INSERT, SQL::ACCESS_LOG_INSERT_OPTS); diff --git a/src/ingestor.hpp b/src/ingestor.hpp index 040331f..ef3ba19 100644 --- a/src/ingestor.hpp +++ b/src/ingestor.hpp @@ -2,6 +2,7 @@ #include "logTypes.hpp" #include <c++11Helpers.h> +#include <connectionPool.h> #include <connection_fwd.h> #include <cstdio> #include <flat_set> @@ -11,10 +12,10 @@ namespace WebStat { class Ingestor { public: - Ingestor(std::string_view hostname, DB::ConnectionPtr dbconn); + Ingestor(std::string_view hostname, DB::ConnectionPoolPtr); virtual ~Ingestor() = default; - SPECIAL_MEMBERS_DEFAULT_MOVE_NO_COPY(Ingestor); + SPECIAL_MEMBERS_DELETE(Ingestor); using ScanResult = decltype(scn::scan<std::string_view, std::string_view, uint64_t, std::string_view, QuotedString, QueryString, std::string_view, unsigned short, unsigned int, unsigned int, CLFString, @@ -24,9 +25,9 @@ namespace WebStat { [[nodiscard]] static ScanResult scanLogLine(std::string_view); void ingestLog(std::FILE *); - void ingestLogLine(std::string_view); + void ingestLogLine(DB::Connection *, std::string_view); - template<typename... T> void storeLogLine(const std::tuple<T...> &) const; + template<typename... T> void storeLogLine(DB::Connection *, const std::tuple<T...> &) const; protected: size_t linesRead = 0; @@ -35,12 +36,12 @@ namespace WebStat { private: static constexpr size_t MAX_NEW_ENTITIES = 6; - void storeEntities(std::span<const std::optional<Entity>>) const; + void storeEntities(DB::Connection *, std::span<const std::optional<Entity>>) const; using NewEntities = std::array<std::optional<Entity>, MAX_NEW_ENTITIES>; template<typename... T> NewEntities newEntities(const std::tuple<T...> &) const; mutable std::flat_set<Crc32Value> existingEntities; uint32_t hostnameId; - DB::ConnectionPtr dbconn; + DB::ConnectionPoolPtr dbpool; }; } diff --git a/src/webstat_logger_main.cpp b/src/webstat_logger_main.cpp index 0db940a..1a2c8cb 100644 --- a/src/webstat_logger_main.cpp +++ b/src/webstat_logger_main.cpp @@ -46,7 +46,7 @@ main(int argc, char ** argv) } po::notify(optVars); - auto dbconn = std::make_shared<PQ::Connection>("dbname=webstat user=webstat"); - WebStat::Ingestor {getHostname(false), dbconn}.ingestLog(stdin); + auto pool = std::make_shared<DB::ConnectionPool>(1, 1, "postgresql", "dbname=webstat user=webstat"); + WebStat::Ingestor {getHostname(false), pool}.ingestLog(stdin); return EXIT_SUCCESS; } diff --git a/test/perf-ingest.cpp b/test/perf-ingest.cpp index 6de3dae..36a3b49 100644 --- a/test/perf-ingest.cpp +++ b/test/perf-ingest.cpp @@ -102,7 +102,7 @@ namespace { void doIngestFile(benchmark::State & state) { - WebStat::Ingestor ingestor {"perf-hostname", DB::MockDatabase::openConnectionTo("webstat")}; + WebStat::Ingestor ingestor {"perf-hostname", std::make_shared<WebStat::MockDBPool>("webstat")}; for (auto loop : state) { WebStat::FilePtr logFile {fopen(TMP_LOG.c_str(), "r")}; ingestor.ingestLog(logFile.get()); diff --git a/test/test-ingest.cpp b/test/test-ingest.cpp index 4845bf6..fb35915 100644 --- a/test/test-ingest.cpp +++ b/test/test-ingest.cpp @@ -178,5 +178,6 @@ BOOST_DATA_TEST_CASE(StoreLogLine, }), line) { - WebStat::Ingestor {"test-hostname", DB::MockDatabase::openConnectionTo("webstat")}.ingestLogLine(line); + WebStat::Ingestor {"test-hostname", std::make_shared<MockDBPool>("webstat")}.ingestLogLine( + DB::MockDatabase::openConnectionTo("webstat").get(), line); } |