diff options
| author | Dan Goodliffe <dan@randomdan.homeip.net> | 2026-05-17 14:32:11 +0100 |
|---|---|---|
| committer | Dan Goodliffe <dan@randomdan.homeip.net> | 2026-05-17 16:18:08 +0100 |
| commit | 7b1aeee4565fe0a2eed4a4fa8695b2a5fb671e06 (patch) | |
| tree | 1ea4e2565a8f8f57e85d27291810cad07cd3b6be | |
| parent | 29f458117184af5b1507cac01b48b41bfbad568a (diff) | |
| download | webstat-7b1aeee4565fe0a2eed4a4fa8695b2a5fb671e06.tar.bz2 webstat-7b1aeee4565fe0a2eed4a4fa8695b2a5fb671e06.tar.xz webstat-7b1aeee4565fe0a2eed4a4fa8695b2a5fb671e06.zip | |
Add ThreadSafeT helper
ThreadSafeT wraps a templated value together with a templated mutex (defaults to
std::shared_mutex) and provides locked access to the value: read-only access holds a
std::shared_lock (shared() / operator->), mutable access holds a std::lock_guard
(unique() / operator()).
Applies this to existingEntities.
| -rw-r--r-- | src/ingestor.cpp | 9 | ||||
| -rw-r--r-- | src/ingestor.hpp | 2 | ||||
| -rw-r--r-- | src/util.hpp | 49 | ||||
| -rw-r--r-- | test/test-ingest.cpp | 8 |
4 files changed, 59 insertions, 9 deletions
diff --git a/src/ingestor.cpp b/src/ingestor.cpp index 3e6e307..e8dceb8 100644 --- a/src/ingestor.cpp +++ b/src/ingestor.cpp @@ -325,7 +325,7 @@ namespace WebStat { } catch (const std::exception & excp) { log(LOG_ERR, "Unhandled exception: %s, clearing known entity list", excp.what()); - existingEntities.clear(); + existingEntities()->clear(); } auto count = std::distance(processingLines.begin(), storedEnd); processingLines.erase(processingLines.begin(), storedEnd); @@ -372,7 +372,7 @@ namespace WebStat { try { DB::TransactionScope lineTx {*dbconn}; storeNewEntities(dbconn, valuesEntities); - existingEntities.insert_range(valuesEntities | entityIds); + existingEntities()->insert_range(valuesEntities | entityIds); storeLogLine(dbconn, values); } catch (const DB::Error & originalError) { @@ -558,8 +558,9 @@ namespace WebStat { void Ingestor::fillKnownEntities(const std::span<Entity *> entities) const { + auto lockedEntities = existingEntities.shared(); for (const auto entity : entities) { - if (auto existing = existingEntities.find(entity->hash); existing != existingEntities.end()) { + if (auto existing = lockedEntities->find(entity->hash); existing != lockedEntities->end()) { entity->id = existing->second; } } @@ -639,7 +640,7 @@ namespace WebStat { "Statistics: linesQueued %zu, linesRead %zu, linesParsed %zu, linesParseFailed %zu, logsInserted %zu, " "entitiesInserted %zu, entitiesKnown %zu", queuedLines.size(), stats.linesRead, stats.linesParsed, stats.linesParseFailed, stats.logsInserted, - stats.entitiesInserted, existingEntities.size()); + stats.entitiesInserted, existingEntities->size()); } void diff --git a/src/ingestor.hpp b/src/ingestor.hpp index 738357b..c2a47a4 100644 --- a/src/ingestor.hpp +++ b/src/ingestor.hpp @@ -99,7 +99,7 @@ namespace WebStat { DB::ConnectionPoolPtr dbpool; mutable Stats stats {}; - std::map<EntityHash, EntityId> existingEntities; + ThreadSafeT<std::map<EntityHash, EntityId>> existingEntities; LineBatch queuedLines, processingLines; 
bool terminated = false; diff --git a/src/util.hpp b/src/util.hpp index f7254e8..5cac5a3 100644 --- a/src/util.hpp +++ b/src/util.hpp @@ -3,6 +3,7 @@ #include <chrono> #include <command.h> #include <scn/scan.h> +#include <shared_mutex> #include <tuple> namespace WebStat { @@ -95,4 +96,52 @@ namespace WebStat { } return false; } + + template<typename ValueType, typename MutexType = std::shared_mutex> class ThreadSafeT { + public: + template<typename... P> ThreadSafeT(P &&... params) : value {std::forward<P>(params)...} { } + + template<typename LockedValueType, typename LockType> class Locked { + public: + Locked(LockedValueType & valueRef, MutexType & mutex) : value {valueRef}, lock {mutex} { } + + LockedValueType * + operator->() + { + return &value; + } + + private: + LockedValueType & value; + LockType lock; + }; + + Locked<const ValueType, std::shared_lock<MutexType>> + shared() const + { + return {value, mutex}; + } + + Locked<ValueType, std::lock_guard<MutexType>> + unique() + { + return {value, mutex}; + } + + Locked<const ValueType, std::shared_lock<MutexType>> + operator->() const + { + return shared(); + } + + Locked<ValueType, std::lock_guard<MutexType>> + operator()() + { + return unique(); + } + + private: + ValueType value; + mutable MutexType mutex; + }; } diff --git a/test/test-ingest.cpp b/test/test-ingest.cpp index 0dad6e6..c2ac4b3 100644 --- a/test/test-ingest.cpp +++ b/test/test-ingest.cpp @@ -264,7 +264,7 @@ BOOST_DATA_TEST_CASE(StoreLogLine, BOOST_CHECK_EQUAL(stats.linesParseFailed, 0); BOOST_CHECK_EQUAL(stats.logsInserted, 1); BOOST_CHECK_EQUAL(stats.entitiesInserted, 5); - BOOST_CHECK_EQUAL(existingEntities.size(), 5); + BOOST_CHECK_EQUAL(existingEntities->size(), 5); } BOOST_AUTO_TEST_CASE(StoreLog, *boost::unit_test::depends_on("I/StoreLogLine")) @@ -277,7 +277,7 @@ BOOST_AUTO_TEST_CASE(StoreLog, *boost::unit_test::depends_on("I/StoreLogLine")) BOOST_CHECK_EQUAL(stats.linesParsed, 10); BOOST_CHECK_EQUAL(stats.linesParseFailed, 0); 
BOOST_CHECK_GE(stats.entitiesInserted, 1); - BOOST_CHECK_EQUAL(stats.entitiesInserted, existingEntities.size()); + BOOST_CHECK_EQUAL(stats.entitiesInserted, existingEntities->size()); } BOOST_AUTO_TEST_CASE(TerminateHandler, *boost::unit_test::timeout(5)) @@ -306,7 +306,7 @@ BOOST_AUTO_TEST_CASE(ParkLogLine) BOOST_AUTO_TEST_CASE(ParkLogLineOnError, *boost::unit_test::depends_on("I/ParkLogLine")) { - BOOST_REQUIRE(existingEntities.empty()); + BOOST_REQUIRE(existingEntities->empty()); constexpr std::string_view LOGLINE_BAD_VERB = R"LOG(git.randomdan.homeip.net 98.82.40.168 1755561576768318 CAUSEPARSEFAIL "/repo/gentoobrowse-api/commit/gentoobrowse-api/unittests/fixtures/756569aa764177340726dd3d40b41d89b11b20c7/app-crypt/pdfcrack/Manifest" "?h=gentoobrowse-api-0.9.1&id=a2ed3fd30333721accd4b697bfcb6cc4165c7714" HTTP/1.1 200 1884 107791 "-" "Mozilla/5.0 AppleWebKit/537.36 (KHTML, like Gecko; compatible; Amazonbot/0.1; +https://developer.amazon.com/support/amazonbot) Chrome/119.0.6045.214 Safari/537.36")LOG"; BOOST_REQUIRE_NO_THROW(ingestLogLines(dbpool->get().get(), {std::string {LOGLINE_BAD_VERB}})); @@ -418,7 +418,7 @@ BOOST_AUTO_TEST_CASE(DiscardUnparsable) BOOST_CHECK_EQUAL_COLLECTIONS(rows.begin(), rows.end(), EXPECTED.begin(), EXPECTED.end()); BOOST_CHECK_EQUAL(stats.linesParseFailed, 1); BOOST_CHECK_EQUAL(stats.entitiesInserted, 1); - BOOST_CHECK(existingEntities.empty()); // Don't clutter existing entities with junk logs + BOOST_CHECK(existingEntities->empty()); // Don't clutter existing entities with junk logs } BOOST_AUTO_TEST_CASE(PurgeOldJob) |
