summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/ingestor.cpp25
-rw-r--r--src/ingestor.hpp13
-rw-r--r--src/webstat_logger_main.cpp4
-rw-r--r--test/perf-ingest.cpp2
-rw-r--r--test/test-ingest.cpp3
5 files changed, 25 insertions, 22 deletions
diff --git a/src/ingestor.cpp b/src/ingestor.cpp
index f824c80..5c47ab7 100644
--- a/src/ingestor.cpp
+++ b/src/ingestor.cpp
@@ -53,12 +53,13 @@ namespace WebStat {
}
}
- Ingestor::Ingestor(const std::string_view hostname, DB::ConnectionPtr dbconn) :
- hostnameId {crc32(hostname)}, dbconn {std::move(dbconn)}
+ Ingestor::Ingestor(const std::string_view hostname, DB::ConnectionPoolPtr dbpl) :
+ hostnameId {crc32(hostname)}, dbpool {std::move(dbpl)}
{
- storeEntities({
- std::make_pair(hostnameId, hostname),
- });
+ storeEntities(dbpool->get().get(),
+ {
+ std::make_pair(hostnameId, hostname),
+ });
}
Ingestor::ScanResult
@@ -85,12 +86,12 @@ namespace WebStat {
{
while (auto line = scn::scan<std::string>(input, "{:[^\n]}\n")) {
linesRead++;
- ingestLogLine(line->value());
+ ingestLogLine(dbpool->get().get(), line->value());
}
}
void
- Ingestor::ingestLogLine(const std::string_view line)
+ Ingestor::ingestLogLine(DB::Connection * dbconn, const std::string_view line)
{
if (auto result = scanLogLine(line)) {
linesParsed++;
@@ -98,9 +99,9 @@ namespace WebStat {
std::optional<DB::TransactionScope> dbtx;
if (const auto newEnts = newEntities(values); newEnts.front()) {
dbtx.emplace(*dbconn);
- storeEntities(newEnts);
+ storeEntities(dbconn, newEnts);
}
- storeLogLine(values);
+ storeLogLine(dbconn, values);
}
else {
syslog(LOG_WARNING, "Discarded line: [%.*s]", static_cast<int>(line.length()), line.data());
@@ -135,10 +136,10 @@ namespace WebStat {
}
void
- Ingestor::storeEntities(const std::span<const std::optional<Entity>> values) const
+ Ingestor::storeEntities(DB::Connection * dbconn, const std::span<const std::optional<Entity>> values) const
{
std::ranges::for_each(
- values | std::views::take_while(&std::optional<Entity>::has_value), [this](auto && entity) {
+ values | std::views::take_while(&std::optional<Entity>::has_value), [this, dbconn](auto && entity) {
auto insert = dbconn->modify(SQL::ENTITY_INSERT, SQL::ENTITY_INSERT_OPTS);
insert->bindParamI(0, entity->first);
insert->bindParamS(1, entity->second);
@@ -149,7 +150,7 @@ namespace WebStat {
template<typename... T>
void
- Ingestor::storeLogLine(const std::tuple<T...> & values) const
+ Ingestor::storeLogLine(DB::Connection * dbconn, const std::tuple<T...> & values) const
{
auto insert = dbconn->modify(SQL::ACCESS_LOG_INSERT, SQL::ACCESS_LOG_INSERT_OPTS);
diff --git a/src/ingestor.hpp b/src/ingestor.hpp
index 040331f..ef3ba19 100644
--- a/src/ingestor.hpp
+++ b/src/ingestor.hpp
@@ -2,6 +2,7 @@
#include "logTypes.hpp"
#include <c++11Helpers.h>
+#include <connectionPool.h>
#include <connection_fwd.h>
#include <cstdio>
#include <flat_set>
@@ -11,10 +12,10 @@
namespace WebStat {
class Ingestor {
public:
- Ingestor(std::string_view hostname, DB::ConnectionPtr dbconn);
+ Ingestor(std::string_view hostname, DB::ConnectionPoolPtr);
virtual ~Ingestor() = default;
- SPECIAL_MEMBERS_DEFAULT_MOVE_NO_COPY(Ingestor);
+ SPECIAL_MEMBERS_DELETE(Ingestor);
using ScanResult = decltype(scn::scan<std::string_view, std::string_view, uint64_t, std::string_view,
QuotedString, QueryString, std::string_view, unsigned short, unsigned int, unsigned int, CLFString,
@@ -24,9 +25,9 @@ namespace WebStat {
[[nodiscard]] static ScanResult scanLogLine(std::string_view);
void ingestLog(std::FILE *);
- void ingestLogLine(std::string_view);
+ void ingestLogLine(DB::Connection *, std::string_view);
- template<typename... T> void storeLogLine(const std::tuple<T...> &) const;
+ template<typename... T> void storeLogLine(DB::Connection *, const std::tuple<T...> &) const;
protected:
size_t linesRead = 0;
@@ -35,12 +36,12 @@ namespace WebStat {
private:
static constexpr size_t MAX_NEW_ENTITIES = 6;
- void storeEntities(std::span<const std::optional<Entity>>) const;
+ void storeEntities(DB::Connection *, std::span<const std::optional<Entity>>) const;
using NewEntities = std::array<std::optional<Entity>, MAX_NEW_ENTITIES>;
template<typename... T> NewEntities newEntities(const std::tuple<T...> &) const;
mutable std::flat_set<Crc32Value> existingEntities;
uint32_t hostnameId;
- DB::ConnectionPtr dbconn;
+ DB::ConnectionPoolPtr dbpool;
};
}
diff --git a/src/webstat_logger_main.cpp b/src/webstat_logger_main.cpp
index 0db940a..1a2c8cb 100644
--- a/src/webstat_logger_main.cpp
+++ b/src/webstat_logger_main.cpp
@@ -46,7 +46,7 @@ main(int argc, char ** argv)
}
po::notify(optVars);
- auto dbconn = std::make_shared<PQ::Connection>("dbname=webstat user=webstat");
- WebStat::Ingestor {getHostname(false), dbconn}.ingestLog(stdin);
+ auto pool = std::make_shared<DB::ConnectionPool>(1, 1, "postgresql", "dbname=webstat user=webstat");
+ WebStat::Ingestor {getHostname(false), pool}.ingestLog(stdin);
return EXIT_SUCCESS;
}
diff --git a/test/perf-ingest.cpp b/test/perf-ingest.cpp
index 6de3dae..36a3b49 100644
--- a/test/perf-ingest.cpp
+++ b/test/perf-ingest.cpp
@@ -102,7 +102,7 @@ namespace {
void
doIngestFile(benchmark::State & state)
{
- WebStat::Ingestor ingestor {"perf-hostname", DB::MockDatabase::openConnectionTo("webstat")};
+ WebStat::Ingestor ingestor {"perf-hostname", std::make_shared<WebStat::MockDBPool>("webstat")};
for (auto loop : state) {
WebStat::FilePtr logFile {fopen(TMP_LOG.c_str(), "r")};
ingestor.ingestLog(logFile.get());
diff --git a/test/test-ingest.cpp b/test/test-ingest.cpp
index 4845bf6..fb35915 100644
--- a/test/test-ingest.cpp
+++ b/test/test-ingest.cpp
@@ -178,5 +178,6 @@ BOOST_DATA_TEST_CASE(StoreLogLine,
}),
line)
{
- WebStat::Ingestor {"test-hostname", DB::MockDatabase::openConnectionTo("webstat")}.ingestLogLine(line);
+ WebStat::Ingestor {"test-hostname", std::make_shared<MockDBPool>("webstat")}.ingestLogLine(
+ DB::MockDatabase::openConnectionTo("webstat").get(), line);
}