summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDan Goodliffe <dan@randomdan.homeip.net>2026-05-17 14:32:11 +0100
committerDan Goodliffe <dan@randomdan.homeip.net>2026-05-17 16:18:08 +0100
commit7b1aeee4565fe0a2eed4a4fa8695b2a5fb671e06 (patch)
tree1ea4e2565a8f8f57e85d27291810cad07cd3b6be
parent29f458117184af5b1507cac01b48b41bfbad568a (diff)
downloadwebstat-7b1aeee4565fe0a2eed4a4fa8695b2a5fb671e06.tar.bz2
webstat-7b1aeee4565fe0a2eed4a4fa8695b2a5fb671e06.tar.xz
webstat-7b1aeee4565fe0a2eed4a4fa8695b2a5fb671e06.zip
Add ThreadSafeT helper
Wraps a templated value and a templated mutex (defaults to shared_mutex) and provides safe access, locked with either a shared_lock (const value) or lock_guard (non-const value). Applies this to existingEntities.
-rw-r--r--src/ingestor.cpp9
-rw-r--r--src/ingestor.hpp2
-rw-r--r--src/util.hpp49
-rw-r--r--test/test-ingest.cpp8
4 files changed, 59 insertions, 9 deletions
diff --git a/src/ingestor.cpp b/src/ingestor.cpp
index 3e6e307..e8dceb8 100644
--- a/src/ingestor.cpp
+++ b/src/ingestor.cpp
@@ -325,7 +325,7 @@ namespace WebStat {
}
catch (const std::exception & excp) {
log(LOG_ERR, "Unhandled exception: %s, clearing known entity list", excp.what());
- existingEntities.clear();
+ existingEntities()->clear();
}
auto count = std::distance(processingLines.begin(), storedEnd);
processingLines.erase(processingLines.begin(), storedEnd);
@@ -372,7 +372,7 @@ namespace WebStat {
try {
DB::TransactionScope lineTx {*dbconn};
storeNewEntities(dbconn, valuesEntities);
- existingEntities.insert_range(valuesEntities | entityIds);
+ existingEntities()->insert_range(valuesEntities | entityIds);
storeLogLine(dbconn, values);
}
catch (const DB::Error & originalError) {
@@ -558,8 +558,9 @@ namespace WebStat {
void
Ingestor::fillKnownEntities(const std::span<Entity *> entities) const
{
+ auto lockedEntities = existingEntities.shared();
for (const auto entity : entities) {
- if (auto existing = existingEntities.find(entity->hash); existing != existingEntities.end()) {
+ if (auto existing = lockedEntities->find(entity->hash); existing != lockedEntities->end()) {
entity->id = existing->second;
}
}
@@ -639,7 +640,7 @@ namespace WebStat {
"Statistics: linesQueued %zu, linesRead %zu, linesParsed %zu, linesParseFailed %zu, logsInserted %zu, "
"entitiesInserted %zu, entitiesKnown %zu",
queuedLines.size(), stats.linesRead, stats.linesParsed, stats.linesParseFailed, stats.logsInserted,
- stats.entitiesInserted, existingEntities.size());
+ stats.entitiesInserted, existingEntities->size());
}
void
diff --git a/src/ingestor.hpp b/src/ingestor.hpp
index 738357b..c2a47a4 100644
--- a/src/ingestor.hpp
+++ b/src/ingestor.hpp
@@ -99,7 +99,7 @@ namespace WebStat {
DB::ConnectionPoolPtr dbpool;
mutable Stats stats {};
- std::map<EntityHash, EntityId> existingEntities;
+ ThreadSafeT<std::map<EntityHash, EntityId>> existingEntities;
LineBatch queuedLines, processingLines;
bool terminated = false;
diff --git a/src/util.hpp b/src/util.hpp
index f7254e8..5cac5a3 100644
--- a/src/util.hpp
+++ b/src/util.hpp
@@ -3,6 +3,7 @@
#include <chrono>
#include <command.h>
#include <scn/scan.h>
+#include <shared_mutex>
#include <tuple>
namespace WebStat {
@@ -95,4 +96,52 @@ namespace WebStat {
}
return false;
}
+
+ /// Couples a value with a mutex so that the value is only reachable through a handle which holds the lock.
+ ///
+ /// Const access (shared(), operator->) takes a std::shared_lock; mutable access (unique(), operator())
+ /// takes a std::lock_guard. A handle keeps the mutex locked until the handle is destroyed, so pointers,
+ /// references or iterators obtained through a handle must not be used once that handle is gone. In
+ /// particular `obj->find(key)` yields an iterator whose lock was released at the end of the expression;
+ /// keep the result of shared() in a local when the result is needed in later statements.
+ ///
+ /// @tparam ValueType the protected value type.
+ /// @tparam MutexType the mutex type; it must support shared locking if const access is used.
+ template<typename ValueType, typename MutexType = std::shared_mutex> class ThreadSafeT {
+ public:
+     /// Builds the wrapped value by list-initialising it from the forwarded arguments; with no arguments
+     /// the value is value-initialised.
+     template<typename... P> ThreadSafeT(P &&... params) : value {std::forward<P>(params)...} { }
+
+     /// Handle pairing a reference to the value with a lock on the mutex. The lock is acquired on
+     /// construction and released when the handle is destroyed.
+     template<typename LockedValueType, typename LockType> class Locked {
+     public:
+         Locked(LockedValueType & valueRef, MutexType & mutex) : value {valueRef}, lock {mutex} { }
+
+         /// Member access on the locked value. Const-qualified so a handle stored as `const` can still be
+         /// dereferenced; constness of the value itself is decided by LockedValueType.
+         LockedValueType *
+         operator->() const noexcept
+         {
+             return &value;
+         }
+
+         /// The locked value as a whole, e.g. for range-for or for passing to an algorithm.
+         LockedValueType &
+         operator*() const noexcept
+         {
+             return value;
+         }
+
+     private:
+         LockedValueType & value;
+         LockType lock;
+     };
+
+     /// Takes a shared lock and returns a handle giving const access to the value.
+     /// Discarding the handle releases the lock immediately, hence [[nodiscard]].
+     [[nodiscard]] Locked<const ValueType, std::shared_lock<MutexType>>
+     shared() const
+     {
+         return {value, mutex};
+     }
+
+     /// Takes an exclusive lock and returns a handle giving mutable access to the value.
+     /// Discarding the handle releases the lock immediately, hence [[nodiscard]].
+     [[nodiscard]] Locked<ValueType, std::lock_guard<MutexType>>
+     unique()
+     {
+         return {value, mutex};
+     }
+
+     /// Shorthand for shared(): `obj->member` holds the shared lock until the end of the full-expression.
+     Locked<const ValueType, std::shared_lock<MutexType>>
+     operator->() const
+     {
+         return shared();
+     }
+
+     /// Shorthand for unique(): `obj()->member` holds the exclusive lock until the end of the
+     /// full-expression.
+     [[nodiscard]] Locked<ValueType, std::lock_guard<MutexType>>
+     operator()()
+     {
+         return unique();
+     }
+
+ private:
+     ValueType value;
+     mutable MutexType mutex; // mutable so the const accessors can still take the shared lock
+ };
}
diff --git a/test/test-ingest.cpp b/test/test-ingest.cpp
index 0dad6e6..c2ac4b3 100644
--- a/test/test-ingest.cpp
+++ b/test/test-ingest.cpp
@@ -264,7 +264,7 @@ BOOST_DATA_TEST_CASE(StoreLogLine,
BOOST_CHECK_EQUAL(stats.linesParseFailed, 0);
BOOST_CHECK_EQUAL(stats.logsInserted, 1);
BOOST_CHECK_EQUAL(stats.entitiesInserted, 5);
- BOOST_CHECK_EQUAL(existingEntities.size(), 5);
+ BOOST_CHECK_EQUAL(existingEntities->size(), 5);
}
BOOST_AUTO_TEST_CASE(StoreLog, *boost::unit_test::depends_on("I/StoreLogLine"))
@@ -277,7 +277,7 @@ BOOST_AUTO_TEST_CASE(StoreLog, *boost::unit_test::depends_on("I/StoreLogLine"))
BOOST_CHECK_EQUAL(stats.linesParsed, 10);
BOOST_CHECK_EQUAL(stats.linesParseFailed, 0);
BOOST_CHECK_GE(stats.entitiesInserted, 1);
- BOOST_CHECK_EQUAL(stats.entitiesInserted, existingEntities.size());
+ BOOST_CHECK_EQUAL(stats.entitiesInserted, existingEntities->size());
}
BOOST_AUTO_TEST_CASE(TerminateHandler, *boost::unit_test::timeout(5))
@@ -306,7 +306,7 @@ BOOST_AUTO_TEST_CASE(ParkLogLine)
BOOST_AUTO_TEST_CASE(ParkLogLineOnError, *boost::unit_test::depends_on("I/ParkLogLine"))
{
- BOOST_REQUIRE(existingEntities.empty());
+ BOOST_REQUIRE(existingEntities->empty());
constexpr std::string_view LOGLINE_BAD_VERB
= R"LOG(git.randomdan.homeip.net 98.82.40.168 1755561576768318 CAUSEPARSEFAIL "/repo/gentoobrowse-api/commit/gentoobrowse-api/unittests/fixtures/756569aa764177340726dd3d40b41d89b11b20c7/app-crypt/pdfcrack/Manifest" "?h=gentoobrowse-api-0.9.1&id=a2ed3fd30333721accd4b697bfcb6cc4165c7714" HTTP/1.1 200 1884 107791 "-" "Mozilla/5.0 AppleWebKit/537.36 (KHTML, like Gecko; compatible; Amazonbot/0.1; +https://developer.amazon.com/support/amazonbot) Chrome/119.0.6045.214 Safari/537.36")LOG";
BOOST_REQUIRE_NO_THROW(ingestLogLines(dbpool->get().get(), {std::string {LOGLINE_BAD_VERB}}));
@@ -418,7 +418,7 @@ BOOST_AUTO_TEST_CASE(DiscardUnparsable)
BOOST_CHECK_EQUAL_COLLECTIONS(rows.begin(), rows.end(), EXPECTED.begin(), EXPECTED.end());
BOOST_CHECK_EQUAL(stats.linesParseFailed, 1);
BOOST_CHECK_EQUAL(stats.entitiesInserted, 1);
- BOOST_CHECK(existingEntities.empty()); // Don't clutter existing entities with junk logs
+ BOOST_CHECK(existingEntities->empty()); // Don't clutter existing entities with junk logs
}
BOOST_AUTO_TEST_CASE(PurgeOldJob)