summary | refs | log | tree | commit | diff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r-- src/ingestor.cpp 72
-rw-r--r-- src/ingestor.hpp 6
-rw-r--r-- src/schema.sql 4
-rw-r--r-- src/sql.cpp 17
-rw-r--r-- src/sql.hpp 4
-rw-r--r-- src/sql/deleteEntity.sql 2
-rw-r--r-- src/sql/markEntityRetried.sql 6
-rw-r--r-- src/sql/selectUninsertableLines.sql 11
-rw-r--r-- src/sql/setEntityType.sql 6
-rw-r--r-- src/util.hpp 49
-rw-r--r-- src/webstat_logger_main.cpp 2
11 files changed, 168 insertions, 11 deletions
diff --git a/src/ingestor.cpp b/src/ingestor.cpp
index 3e6e307..954f872 100644
--- a/src/ingestor.cpp
+++ b/src/ingestor.cpp
@@ -134,7 +134,7 @@ namespace WebStat {
settings {std::move(givenSettings)}, dbpool {std::move(dbpl)},
handleCompleteCurlOps {&Ingestor::jobHandleCompleteCurlOps, &Ingestor::haveCurlOperations},
ingestParkedLines {&Ingestor::jobReadParkedLines}, purgeOldLogs {&Ingestor::jobPurgeOldLogs},
- storeQueueLines {&Ingestor::jobStoreQueuedLines},
+ storeQueueLines {&Ingestor::jobStoreQueuedLines}, retryUninsertableLines {&Ingestor::jobRetryUninsertableLines},
hostnameId {insert(dbpool->get(), SQL::HOST_UPSERT, SQL::HOST_UPSERT_OPTS, host.nodename, host.sysname,
host.release, host.version, host.machine, host.domainname)},
curl {curl_multi_init()}
@@ -325,7 +325,7 @@ namespace WebStat {
}
catch (const std::exception & excp) {
log(LOG_ERR, "Unhandled exception: %s, clearing known entity list", excp.what());
- existingEntities.clear();
+ existingEntities()->clear();
}
auto count = std::distance(processingLines.begin(), storedEnd);
processingLines.erase(processingLines.begin(), storedEnd);
@@ -334,6 +334,62 @@ namespace WebStat {
};
}
+ constexpr auto ENTITY_IDS = std::views::transform([](auto && value) { // Range adaptor: maps each element to a (hash, id) pair
+ return std::make_pair(value->hash, *value->id); // id is dereferenced unconditionally - presumably only used on entities whose id is already set; verify
+ });
+
+ Ingestor::Job::Result // Job: re-parses and re-stores the lines returned by SQL::SELECT_UNINSERTABLE; the returned callable yields how many were stored
+ Ingestor::jobRetryUninsertableLines()
+ {
+ auto dbh = dbpool->get();
+ auto dbconn = dbh.get();
+ auto lineSelect = dbconn->select(SQL::SELECT_UNINSERTABLE, SQL::SELECT_UNINSERTABLE_OPTS);
+ lineSelect->bindParamI(0, settings.maxBatchSize); // the only parameter of the select is its LIMIT
+ auto markLineRetried = dbconn->modify(SQL::MARK_ENTITY_RETRIED, SQL::MARK_ENTITY_RETRIED_OPTS);
+ auto deleteLine = dbconn->modify(SQL::DELETE_ENTITY, SQL::DELETE_ENTITY_OPTS);
+ auto setEntityUnparsable = dbconn->modify(SQL::SET_ENTITY_TYPE, SQL::SET_ENTITY_TYPE_OPTS);
+ setEntityUnparsable->bindParamS(0, "unparsable_line"); // the new type is fixed here; only the id (parameter 1) varies per row
+
+ unsigned int stored = 0;
+ while (!terminated) { // one pass per batch; NOTE(review): terminated is declared as a plain bool - confirm this job cannot run concurrently with whoever sets it
+ unsigned int batchSize = 0;
+ DB::TransactionScope batchTx {*dbconn};
+ for (auto [id, line] : lineSelect->as<EntityId, std::string>()) {
+ batchSize += 1;
+ try {
+ DB::TransactionScope lineTx {*dbconn}; // per-line scope nested inside batchTx - presumably isolates one failing line from the rest of the batch; verify
+ if (auto result = scanLogLine(line)) {
+ auto values = hashScanValues(result->values());
+ auto valuesEntities = entities(values);
+ fillKnownEntities(valuesEntities);
+ storeNewEntities(dbconn, valuesEntities);
+ existingEntities()->insert_range(valuesEntities | ENTITY_IDS); // NOTE(review): ids are cached before storeLogLine/deleteLine have succeeded - confirm a failure below cannot leave stale ids in the cache
+ storeLogLine(dbconn, values);
+
+ deleteLine->bindParamI(0, id); // the line is now stored as a log entry, so its parked entities row is removed
+ deleteLine->execute();
+ stored += 1;
+ }
+ else {
+ // unparseable - was parsable previously, isn't now 🤷
+ setEntityUnparsable->bindParamI(1, id);
+ setEntityUnparsable->execute();
+ }
+ }
+ catch (const std::exception & err) {
+ bindMany(markLineRetried, 0, err.what(), id); // stores the error text in detail; the select only returns rows whose detail is NULL, so this row is not picked up again
+ markLineRetried->execute();
+ }
+ }
+ if (batchSize == 0) { // the select returned no rows: nothing left to retry
+ break;
+ }
+ }
+ return [stored]() {
+ return stored;
+ };
+ }
+
template<typename... T>
std::vector<Entity *>
Ingestor::entities(std::tuple<T...> & values)
@@ -358,10 +414,6 @@ namespace WebStat {
void
Ingestor::ingestLogLines(DB::Connection * dbconn, const LinesView lines)
{
- auto entityIds = std::views::transform([](auto && value) {
- return std::make_pair(value->hash, *value->id);
- });
-
DB::TransactionScope batchTx {*dbconn};
for (const auto & line : lines) {
if (auto result = scanLogLine(line)) {
@@ -372,7 +424,7 @@ namespace WebStat {
try {
DB::TransactionScope lineTx {*dbconn};
storeNewEntities(dbconn, valuesEntities);
- existingEntities.insert_range(valuesEntities | entityIds);
+ existingEntities()->insert_range(valuesEntities | ENTITY_IDS);
storeLogLine(dbconn, values);
}
catch (const DB::Error & originalError) {
@@ -462,6 +514,7 @@ namespace WebStat {
runJobAsNeeded(handleCompleteCurlOps, std::chrono::minutes {1});
runJobAsNeeded(ingestParkedLines, settings.freqIngestParkedLines);
runJobAsNeeded(purgeOldLogs, settings.freqPurgeOldLogs);
+ runJobAsNeeded(retryUninsertableLines, settings.freqRetryUninsertableLines);
}
void
@@ -558,8 +611,9 @@ namespace WebStat {
void
Ingestor::fillKnownEntities(const std::span<Entity *> entities) const
{
+ auto lockedEntities = existingEntities.shared();
for (const auto entity : entities) {
- if (auto existing = existingEntities.find(entity->hash); existing != existingEntities.end()) {
+ if (auto existing = lockedEntities->find(entity->hash); existing != lockedEntities->end()) {
entity->id = existing->second;
}
}
@@ -639,7 +693,7 @@ namespace WebStat {
"Statistics: linesQueued %zu, linesRead %zu, linesParsed %zu, linesParseFailed %zu, logsInserted %zu, "
"entitiesInserted %zu, entitiesKnown %zu",
queuedLines.size(), stats.linesRead, stats.linesParsed, stats.linesParseFailed, stats.logsInserted,
- stats.entitiesInserted, existingEntities.size());
+ stats.entitiesInserted, existingEntities->size());
}
void
diff --git a/src/ingestor.hpp b/src/ingestor.hpp
index 738357b..2050b7c 100644
--- a/src/ingestor.hpp
+++ b/src/ingestor.hpp
@@ -28,6 +28,7 @@ namespace WebStat {
size_t maxBatchSize = 1;
minutes checkJobsAfter = 1min;
minutes freqIngestParkedLines = 30min;
+ minutes freqRetryUninsertableLines = 4h;
minutes freqPurgeOldLogs = 6h;
unsigned int purgeDaysToKeep = 61; // ~2 months
unsigned int purgeDeleteMax = 10'000;
@@ -78,6 +79,7 @@ namespace WebStat {
Job::Result jobReadParkedLines();
Job::Result jobPurgeOldLogs();
Job::Result jobStoreQueuedLines();
+ Job::Result jobRetryUninsertableLines();
template<typename... T> void storeLogLine(DB::Connection *, const std::tuple<T...> &) const;
@@ -99,7 +101,7 @@ namespace WebStat {
DB::ConnectionPoolPtr dbpool;
mutable Stats stats {};
- std::map<EntityHash, EntityId> existingEntities;
+ ThreadSafeT<std::map<EntityHash, EntityId>> existingEntities;
LineBatch queuedLines, processingLines;
bool terminated = false;
@@ -109,8 +111,8 @@ namespace WebStat {
Job ingestParkedLines;
Job purgeOldLogs;
Job storeQueueLines;
+ Job retryUninsertableLines;
- private:
template<typename... T> static std::vector<Entity *> entities(std::tuple<T...> &);
void fillKnownEntities(std::span<Entity *>) const;
void storeNewEntities(DB::Connection *, std::span<Entity *>) const;
diff --git a/src/schema.sql b/src/schema.sql
index e09be28..0d9fcb4 100644
--- a/src/schema.sql
+++ b/src/schema.sql
@@ -42,6 +42,10 @@ CREATE TABLE entities(
CREATE UNIQUE INDEX uni_entities_value ON entities(MD5(value));
+CREATE INDEX idx_entities_retryinsert ON entities(id) -- partial index with the same predicate as the WHERE clause in sql/selectUninsertableLines.sql
+WHERE
+ type = 'uninsertable_line' AND detail IS NULL;
+
CREATE OR REPLACE FUNCTION entity(newValue text, newType entity)
RETURNS TABLE(
id integer,
diff --git a/src/sql.cpp b/src/sql.cpp
index 801a905..a2dac02 100644
--- a/src/sql.cpp
+++ b/src/sql.cpp
@@ -22,6 +22,18 @@ namespace WebStat::SQL {
const std::string HOST_UPSERT {
#embed "sql/hostUpsert.sql"
};
+ const std::string SELECT_UNINSERTABLE {
+#embed "sql/selectUninsertableLines.sql"
+ };
+ const std::string DELETE_ENTITY {
+#embed "sql/deleteEntity.sql"
+ };
+ const std::string MARK_ENTITY_RETRIED {
+#embed "sql/markEntityRetried.sql"
+ };
+ const std::string SET_ENTITY_TYPE {
+#embed "sql/setEntityType.sql"
+ };
#define HASH_OPTS(VAR) \
const DB::CommandOptionsPtr VAR##_OPTS \
= std::make_shared<PQ::CommandOptions>(std::hash<std::string> {}(VAR), 35, false)
@@ -30,5 +42,10 @@ namespace WebStat::SQL {
HASH_OPTS(ENTITY_INSERT);
HASH_OPTS(ENTITY_UPDATE_DETAIL);
HASH_OPTS(HOST_UPSERT);
+ const DB::CommandOptionsPtr SELECT_UNINSERTABLE_OPTS
+ = std::make_shared<PQ::CommandOptions>(std::hash<std::string> {}(SELECT_UNINSERTABLE), 35, true);
+ HASH_OPTS(DELETE_ENTITY);
+ HASH_OPTS(MARK_ENTITY_RETRIED);
+ HASH_OPTS(SET_ENTITY_TYPE);
#undef HASH_OPTS
}
diff --git a/src/sql.hpp b/src/sql.hpp
index 1a12823..ae3559a 100644
--- a/src/sql.hpp
+++ b/src/sql.hpp
@@ -13,5 +13,9 @@ namespace WebStat::SQL {
EMBED_DECLARE(ENTITY_INSERT);
EMBED_DECLARE(ENTITY_UPDATE_DETAIL);
EMBED_DECLARE(HOST_UPSERT);
+ EMBED_DECLARE(SELECT_UNINSERTABLE);
+ EMBED_DECLARE(DELETE_ENTITY);
+ EMBED_DECLARE(MARK_ENTITY_RETRIED);
+ EMBED_DECLARE(SET_ENTITY_TYPE);
#undef EMBED_DECLARE
}
diff --git a/src/sql/deleteEntity.sql b/src/sql/deleteEntity.sql
new file mode 100644
index 0000000..e201384
--- /dev/null
+++ b/src/sql/deleteEntity.sql
@@ -0,0 +1,2 @@
+DELETE FROM entities -- removes the entities row matching the single bound id
+WHERE id = ?
diff --git a/src/sql/markEntityRetried.sql b/src/sql/markEntityRetried.sql
new file mode 100644
index 0000000..6ec2263
--- /dev/null
+++ b/src/sql/markEntityRetried.sql
@@ -0,0 +1,6 @@
+UPDATE
+ entities -- records a failed retry on one row by filling its detail column
+SET
+ detail = jsonb_build_object('retriedAt', CURRENT_TIMESTAMP at time zone 'utc', 'error', ?::text)
+WHERE
+ id = ? -- parameters in order: error text, entity id
diff --git a/src/sql/selectUninsertableLines.sql b/src/sql/selectUninsertableLines.sql
new file mode 100644
index 0000000..5c07791
--- /dev/null
+++ b/src/sql/selectUninsertableLines.sql
@@ -0,0 +1,11 @@
+SELECT -- one batch of parked lines, lowest id first, limited to the bound row count
+ id,
+ value
+FROM
+ entities
+WHERE
+ type = 'uninsertable_line'
+ AND detail IS NULL -- markEntityRetried.sql fills detail, so a row that failed a retry is not selected again
+ORDER BY
+ id
+LIMIT ?
diff --git a/src/sql/setEntityType.sql b/src/sql/setEntityType.sql
new file mode 100644
index 0000000..5c981b9
--- /dev/null
+++ b/src/sql/setEntityType.sql
@@ -0,0 +1,6 @@
+UPDATE
+ entities -- changes the type of one row
+SET
+ type = ?::entity
+WHERE
+ id = ? -- parameters in order: new type (cast to the entity type), entity id
diff --git a/src/util.hpp b/src/util.hpp
index f7254e8..5cac5a3 100644
--- a/src/util.hpp
+++ b/src/util.hpp
@@ -3,6 +3,7 @@
#include <chrono>
#include <command.h>
#include <scn/scan.h>
+#include <shared_mutex>
#include <tuple>
namespace WebStat {
@@ -95,4 +96,52 @@ namespace WebStat {
}
return false;
}
+
+ template<typename ValueType, typename MutexType = std::shared_mutex> class ThreadSafeT { // Owns a ValueType and a mutex; the value is only reachable through a Locked object that holds a lock on that mutex
+ public:
+ template<typename... P> ThreadSafeT(P &&... params) : value {std::forward<P>(params)...} { } // forwards all arguments to the ValueType constructor
+
+ template<typename LockedValueType, typename LockType> class Locked { // Pairs a reference to the value with a lock that lives as long as this object
+ public:
+ Locked(LockedValueType & valueRef, MutexType & mutex) : value {valueRef}, lock {mutex} { } // the lock types used below acquire the mutex on construction
+
+ LockedValueType *
+ operator->() // member access to the guarded value while the lock is held
+ {
+ return &value;
+ }
+
+ private:
+ LockedValueType & value;
+ LockType lock;
+ };
+
+ Locked<const ValueType, std::shared_lock<MutexType>> // read-only access under a shared lock
+ shared() const
+ {
+ return {value, mutex};
+ }
+
+ Locked<ValueType, std::lock_guard<MutexType>> // mutable access under an exclusive lock; NOTE(review): std::lock_guard is declared in <mutex> - confirm that header is included
+ unique()
+ {
+ return {value, mutex};
+ }
+
+ Locked<const ValueType, std::shared_lock<MutexType>> // shorthand for shared(): obj->f() runs f() on the const value; the temporary lock lasts until the end of the full expression
+ operator->() const
+ {
+ return shared();
+ }
+
+ Locked<ValueType, std::lock_guard<MutexType>> // shorthand for unique(): obj()->f() runs f() under an exclusive lock
+ operator()()
+ {
+ return unique();
+ }
+
+ private:
+ ValueType value;
+ mutable MutexType mutex; // mutable so the const accessors can still lock it
+ };
}
diff --git a/src/webstat_logger_main.cpp b/src/webstat_logger_main.cpp
index 1d14532..8dd9f52 100644
--- a/src/webstat_logger_main.cpp
+++ b/src/webstat_logger_main.cpp
@@ -77,6 +77,8 @@ main(int argc, char ** argv)
"How often to check for and import parked log lines")
("job.purge.freq", po::value(&settings.freqPurgeOldLogs)->default_value(settings.freqPurgeOldLogs),
"How often to purge old access log entries from the database")
+ ("job.retryUninsertable.freq", po::value(&settings.freqRetryUninsertableLines)->default_value(settings.freqRetryUninsertableLines),
+ "After how long to retry inserting log lines which previously could not be inserted")
("job.purge.days", po::value(&settings.purgeDaysToKeep)->default_value(settings.purgeDaysToKeep),
"How many days of access log entries to keep")
("job.purge.max", po::value(&settings.purgeDeleteMax)->default_value(settings.purgeDeleteMax),