From 7b1aeee4565fe0a2eed4a4fa8695b2a5fb671e06 Mon Sep 17 00:00:00 2001 From: Dan Goodliffe Date: Sun, 17 May 2026 14:32:11 +0100 Subject: Add ThreadSafeT helper Wraps a templated value and a templated mutex (defaults to shared_mutex) and provides safe access, locked with either a shared_lock (const value) or lock_guard (non-const value). Applies this to existingEntities. --- src/ingestor.cpp | 9 +++++---- src/ingestor.hpp | 2 +- src/util.hpp | 49 +++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 55 insertions(+), 5 deletions(-) (limited to 'src') diff --git a/src/ingestor.cpp b/src/ingestor.cpp index 3e6e307..e8dceb8 100644 --- a/src/ingestor.cpp +++ b/src/ingestor.cpp @@ -325,7 +325,7 @@ namespace WebStat { } catch (const std::exception & excp) { log(LOG_ERR, "Unhandled exception: %s, clearing known entity list", excp.what()); - existingEntities.clear(); + existingEntities()->clear(); } auto count = std::distance(processingLines.begin(), storedEnd); processingLines.erase(processingLines.begin(), storedEnd); @@ -372,7 +372,7 @@ namespace WebStat { try { DB::TransactionScope lineTx {*dbconn}; storeNewEntities(dbconn, valuesEntities); - existingEntities.insert_range(valuesEntities | entityIds); + existingEntities()->insert_range(valuesEntities | entityIds); storeLogLine(dbconn, values); } catch (const DB::Error & originalError) { @@ -558,8 +558,9 @@ namespace WebStat { void Ingestor::fillKnownEntities(const std::span entities) const { + auto lockedEntities = existingEntities.shared(); for (const auto entity : entities) { - if (auto existing = existingEntities.find(entity->hash); existing != existingEntities.end()) { + if (auto existing = lockedEntities->find(entity->hash); existing != lockedEntities->end()) { entity->id = existing->second; } } @@ -639,7 +640,7 @@ namespace WebStat { "Statistics: linesQueued %zu, linesRead %zu, linesParsed %zu, linesParseFailed %zu, logsInserted %zu, " "entitiesInserted %zu, entitiesKnown %zu", queuedLines.size(), stats.linesRead, stats.linesParsed, stats.linesParseFailed, stats.logsInserted, - stats.entitiesInserted, existingEntities.size()); + stats.entitiesInserted, existingEntities->size()); } void diff --git a/src/ingestor.hpp b/src/ingestor.hpp index 738357b..c2a47a4 100644 --- a/src/ingestor.hpp +++ b/src/ingestor.hpp @@ -99,7 +99,7 @@ namespace WebStat { DB::ConnectionPoolPtr dbpool; mutable Stats stats {}; - std::map existingEntities; + ThreadSafeT> existingEntities; LineBatch queuedLines, processingLines; bool terminated = false; diff --git a/src/util.hpp b/src/util.hpp index f7254e8..5cac5a3 100644 --- a/src/util.hpp +++ b/src/util.hpp @@ -3,6 +3,7 @@ #include #include #include +#include #include namespace WebStat { @@ -95,4 +96,52 @@ namespace WebStat { } return false; } + + template class ThreadSafeT { + public: + template ThreadSafeT(P &&... params) : value {std::forward

(params)...} { } + + template class Locked { + public: + Locked(LockedValueType & valueRef, MutexType & mutex) : value {valueRef}, lock {mutex} { } + + LockedValueType * + operator->() + { + return &value; + } + + private: + LockedValueType & value; + LockType lock; + }; + + Locked> + shared() const + { + return {value, mutex}; + } + + Locked> + unique() + { + return {value, mutex}; + } + + Locked> + operator->() const + { + return shared(); + } + + Locked> + operator()() + { + return unique(); + } + + private: + ValueType value; + mutable MutexType mutex; + }; } -- cgit v1.3