diff options
-rw-r--r-- | src/ingestor.cpp | 32 | ||||
-rw-r--r-- | src/ingestor.hpp | 5 | ||||
-rw-r--r-- | test/test-ingest.cpp | 8 |
3 files changed, 31 insertions, 14 deletions
diff --git a/src/ingestor.cpp b/src/ingestor.cpp index f965d1d..4cd7859 100644 --- a/src/ingestor.cpp +++ b/src/ingestor.cpp @@ -164,20 +164,30 @@ namespace WebStat { void Ingestor::ingestLogLine(DB::Connection * dbconn, const std::string_view line) { + auto rememberNewEntityIds = [this](const auto & ids) { + existingEntities.insert_range(ids | std::views::take_while(&std::optional<Crc32Value>::has_value) + | std::views::transform([](auto && value) { + return *value; + })); + }; if (auto result = scanLogLine(line)) { linesParsed++; const auto values = crc32ScanValues(result->values()); - std::optional<DB::TransactionScope> dbtx; - if (const auto newEnts = newEntities(values); newEnts.front()) { - dbtx.emplace(*dbconn); - storeEntities(dbconn, newEnts); + NewEntityIds ids; + { + std::optional<DB::TransactionScope> dbtx; + if (const auto newEnts = newEntities(values); newEnts.front()) { + dbtx.emplace(*dbconn); + ids = storeEntities(dbconn, newEnts); + } + storeLogLine(dbconn, values); } - storeLogLine(dbconn, values); + rememberNewEntityIds(ids); } else { linesDiscarded++; const auto unparsableLine = toEntity(line, EntityType::UnparsableLine); - storeEntities(dbconn, {unparsableLine}); + rememberNewEntityIds(storeEntities(dbconn, {unparsableLine})); } } @@ -270,15 +280,16 @@ namespace WebStat { return rtn; } - void + Ingestor::NewEntityIds Ingestor::storeEntities(DB::Connection * dbconn, const std::span<const std::optional<Entity>> values) const { static constexpr std::array ENTITY_TYPE_VALUES { "host", "virtual_host", "path", "query_string", "referrer", "user_agent", "unparsable_line"}; auto insert = dbconn->modify(SQL::ENTITY_INSERT, SQL::ENTITY_INSERT_OPTS); - std::ranges::for_each( - values | std::views::take_while(&std::optional<Entity>::has_value), [this, &insert](auto && entity) { + NewEntityIds ids; + std::ranges::transform(values | std::views::take_while(&std::optional<Entity>::has_value), ids.begin(), + [this, &insert](auto && entity) { const auto & [entityId, type, value] = *entity; bindMany(insert, 0, entityId, ENTITY_TYPE_VALUES[std::to_underlying(type)], value); if (insert->execute() > 0) { @@ -293,8 +304,9 @@ namespace WebStat { break; } } - existingEntities.emplace(std::get<0>(*entity)); + return std::get<0>(*entity); }); + return ids; } template<typename... T> diff --git a/src/ingestor.hpp b/src/ingestor.hpp index a20071e..8be7360 100644 --- a/src/ingestor.hpp +++ b/src/ingestor.hpp @@ -60,13 +60,15 @@ namespace WebStat { size_t linesParsed = 0; size_t linesDiscarded = 0; size_t linesParked = 0; + mutable std::flat_set<Crc32Value> existingEntities; using JobLastRunTime = std::chrono::system_clock::time_point; JobLastRunTime lastRunIngestParkedLines; private: static constexpr size_t MAX_NEW_ENTITIES = 6; - void storeEntities(DB::Connection *, std::span<const std::optional<Entity>>) const; + using NewEntityIds = std::array<std::optional<Crc32Value>, MAX_NEW_ENTITIES>; + NewEntityIds storeEntities(DB::Connection *, std::span<const std::optional<Entity>>) const; using NewEntities = std::array<std::optional<Entity>, MAX_NEW_ENTITIES>; template<typename... T> NewEntities newEntities(const std::tuple<T...> &) const; void handleCurlOperations(); @@ -75,7 +77,6 @@ namespace WebStat { void jobIngestParkedLine(const std::filesystem::path &, uintmax_t size); using CurlOperations = std::map<CURL *, std::unique_ptr<CurlOperation>>; - mutable std::flat_set<Crc32Value> existingEntities; uint32_t hostnameId; CurlMultiPtr curl; mutable CurlOperations curlOperations; diff --git a/test/test-ingest.cpp b/test/test-ingest.cpp index 9e401b3..9534a9f 100644 --- a/test/test-ingest.cpp +++ b/test/test-ingest.cpp @@ -231,6 +231,7 @@ BOOST_DATA_TEST_CASE(StoreLogLine, BOOST_CHECK_EQUAL(linesRead, 0); BOOST_CHECK_EQUAL(linesParsed, 1); BOOST_CHECK_EQUAL(linesDiscarded, 0); + BOOST_CHECK_EQUAL(existingEntities.size(), 4); } BOOST_AUTO_TEST_CASE(StoreLog, *boost::unit_test::depends_on("I/StoreLogLine")) @@ -258,9 +259,12 @@ BOOST_TEST_DECORATOR(*boost::unit_test::depends_on("I/ParkLogLine")) BOOST_AUTO_TEST_CASE(ParkLogLineOnError) { - BOOST_REQUIRE_NO_THROW(dbpool->get()->execute("SET search_path = ''")); - BOOST_REQUIRE_NO_THROW(ingestLogLine(LOGLINE1)); + BOOST_REQUIRE(existingEntities.empty()); + constexpr std::string_view LOGLINE_BAD_VERB + = R"LOG(git.randomdan.homeip.net 98.82.40.168 1755561576768318 CAUSEPARK "/repo/gentoobrowse-api/commit/gentoobrowse-api/unittests/fixtures/756569aa764177340726dd3d40b41d89b11b20c7/app-crypt/pdfcrack/Manifest" "?h=gentoobrowse-api-0.9.1&id=a2ed3fd30333721accd4b697bfcb6cc4165c7714" HTTP/1.1 200 1884 107791 "-" "Mozilla/5.0 AppleWebKit/537.36 (KHTML, like Gecko; compatible; Amazonbot/0.1; +https://developer.amazon.com/support/amazonbot) Chrome/119.0.6045.214 Safari/537.36")LOG"; + BOOST_REQUIRE_NO_THROW(ingestLogLine(LOGLINE_BAD_VERB)); BOOST_CHECK_EQUAL(linesParked, 1); + BOOST_CHECK(existingEntities.empty()); } BOOST_AUTO_TEST_CASE(IngestParked, *boost::unit_test::depends_on("I/ParkLogLine")) |