summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDan Goodliffe <dan@randomdan.homeip.net>2026-05-18 20:43:51 +0100
committerDan Goodliffe <dan@randomdan.homeip.net>2026-05-18 20:43:51 +0100
commite1a6654bd5e284842ffbc3b93bd390f3bad7a187 (patch)
treea06a68ec6741e3311b74ee347ed8eb2002dba9fa
parente8a0fb7b9c61d5603a86b52e0f2144a91fd1d84e (diff)
downloadwebstat-e1a6654bd5e284842ffbc3b93bd390f3bad7a187.tar.bz2
webstat-e1a6654bd5e284842ffbc3b93bd390f3bad7a187.tar.xz
webstat-e1a6654bd5e284842ffbc3b93bd390f3bad7a187.zip
Add job to retry insertion of log lines which had previously failed
Entities are reparsed and reinserted, and removed on success. A failure to parse updates the entity type to UnparsableLine; a failure to insert again records the reason in the entity's detail.
-rw-r--r--src/ingestor.cpp46
-rw-r--r--src/ingestor.hpp4
-rw-r--r--src/sql.cpp17
-rw-r--r--src/sql.hpp4
-rw-r--r--src/sql/deleteEntity.sql2
-rw-r--r--src/sql/markEntityRetried.sql6
-rw-r--r--src/sql/selectUninsertableLines.sql8
-rw-r--r--src/sql/setEntityType.sql6
-rw-r--r--src/webstat_logger_main.cpp2
-rw-r--r--test/test-ingest.cpp97
10 files changed, 190 insertions, 2 deletions
diff --git a/src/ingestor.cpp b/src/ingestor.cpp
index 9f30263..d9437c9 100644
--- a/src/ingestor.cpp
+++ b/src/ingestor.cpp
@@ -134,7 +134,7 @@ namespace WebStat {
settings {std::move(givenSettings)}, dbpool {std::move(dbpl)},
handleCompleteCurlOps {&Ingestor::jobHandleCompleteCurlOps, &Ingestor::haveCurlOperations},
ingestParkedLines {&Ingestor::jobReadParkedLines}, purgeOldLogs {&Ingestor::jobPurgeOldLogs},
- storeQueueLines {&Ingestor::jobStoreQueuedLines},
+ storeQueueLines {&Ingestor::jobStoreQueuedLines}, retryUninsertableLines {&Ingestor::jobRetryUninsertableLines},
hostnameId {insert(dbpool->get(), SQL::HOST_UPSERT, SQL::HOST_UPSERT_OPTS, host.nodename, host.sysname,
host.release, host.version, host.machine, host.domainname)},
curl {curl_multi_init()}
@@ -338,6 +338,49 @@ namespace WebStat {
return std::make_pair(value->hash, *value->id);
});
+ Ingestor::Job::Result // Job: re-attempts every row returned by SQL::SELECT_UNINSERTABLE; the returned callable yields the number stored
+ Ingestor::jobRetryUninsertableLines()
+ {
+ auto dbh = dbpool->get();
+ auto dbconn = dbh.get();
+ auto lineSelect = dbconn->select(SQL::SELECT_UNINSERTABLE, SQL::SELECT_UNINSERTABLE_OPTS);
+ auto markLineRetried = dbconn->modify(SQL::MARK_ENTITY_RETRIED, SQL::MARK_ENTITY_RETRIED_OPTS);
+ auto deleteLine = dbconn->modify(SQL::DELETE_ENTITY, SQL::DELETE_ENTITY_OPTS);
+ auto setEntityUnparsable = dbconn->modify(SQL::SET_ENTITY_TYPE, SQL::SET_ENTITY_TYPE_OPTS);
+ setEntityUnparsable->bindParamS(0, "unparsable_line"); // new type is bound once here; only the id (param 1) is rebound per row
+
+ unsigned int stored = 0; // count of lines re-inserted and then deleted from entities
+ for (auto [id, line] : lineSelect->as<EntityId, std::string>()) {
+ try {
+ DB::TransactionScope lineTx {*dbconn}; // one scope per line; presumably rolled back if an exception unwinds - confirm against DB::TransactionScope
+ if (auto result = scanLogLine(line)) {
+ auto values = hashScanValues(result->values());
+ auto valuesEntities = entities(values);
+ fillKnownEntities(valuesEntities);
+ storeNewEntities(dbconn, valuesEntities);
+ existingEntities()->insert_range(valuesEntities | ENTITY_IDS); // NOTE(review): cache is updated before storeLogLine succeeds - confirm it cannot keep ids from a rolled back insert
+ storeLogLine(dbconn, values);
+
+ deleteLine->bindParamI(0, id); // line is now stored as a log entry, so its holding entity row is removed
+ deleteLine->execute();
+ stored += 1;
+ }
+ else {
+ // unparsable - was parsable previously, isn't now 🤷
+ setEntityUnparsable->bindParamI(1, id);
+ setEntityUnparsable->execute();
+ }
+ }
+ catch (const std::exception & err) { // a failure on one line is recorded against that line and the loop continues
+ bindMany(markLineRetried, 0, err.what(), id); // presumably binds the error text and id as params 0 and 1 - confirm against bindMany
+ markLineRetried->execute();
+ }
+ }
+ return [stored]() { // result callable reports how many lines were stored
+ return stored;
+ };
+ }
+
template<typename... T>
std::vector<Entity *>
Ingestor::entities(std::tuple<T...> & values)
@@ -462,6 +505,7 @@ namespace WebStat {
runJobAsNeeded(handleCompleteCurlOps, std::chrono::minutes {1});
runJobAsNeeded(ingestParkedLines, settings.freqIngestParkedLines);
runJobAsNeeded(purgeOldLogs, settings.freqPurgeOldLogs);
+ runJobAsNeeded(retryUninsertableLines, settings.freqRetryUninsertableLines); // own setting (job.retryUninsertable.freq), not the purge frequency
}
void
diff --git a/src/ingestor.hpp b/src/ingestor.hpp
index c2a47a4..2050b7c 100644
--- a/src/ingestor.hpp
+++ b/src/ingestor.hpp
@@ -28,6 +28,7 @@ namespace WebStat {
size_t maxBatchSize = 1;
minutes checkJobsAfter = 1min;
minutes freqIngestParkedLines = 30min;
+ minutes freqRetryUninsertableLines = 4h;
minutes freqPurgeOldLogs = 6h;
unsigned int purgeDaysToKeep = 61; // ~2 months
unsigned int purgeDeleteMax = 10'000;
@@ -78,6 +79,7 @@ namespace WebStat {
Job::Result jobReadParkedLines();
Job::Result jobPurgeOldLogs();
Job::Result jobStoreQueuedLines();
+ Job::Result jobRetryUninsertableLines();
template<typename... T> void storeLogLine(DB::Connection *, const std::tuple<T...> &) const;
@@ -109,8 +111,8 @@ namespace WebStat {
Job ingestParkedLines;
Job purgeOldLogs;
Job storeQueueLines;
+ Job retryUninsertableLines;
- private:
template<typename... T> static std::vector<Entity *> entities(std::tuple<T...> &);
void fillKnownEntities(std::span<Entity *>) const;
void storeNewEntities(DB::Connection *, std::span<Entity *>) const;
diff --git a/src/sql.cpp b/src/sql.cpp
index 801a905..a2dac02 100644
--- a/src/sql.cpp
+++ b/src/sql.cpp
@@ -22,6 +22,18 @@ namespace WebStat::SQL {
const std::string HOST_UPSERT {
#embed "sql/hostUpsert.sql"
};
+ const std::string SELECT_UNINSERTABLE { // statement text embedded from sql/selectUninsertableLines.sql
+#embed "sql/selectUninsertableLines.sql"
+ };
+ const std::string DELETE_ENTITY { // statement text embedded from sql/deleteEntity.sql
+#embed "sql/deleteEntity.sql"
+ };
+ const std::string MARK_ENTITY_RETRIED { // statement text embedded from sql/markEntityRetried.sql
+#embed "sql/markEntityRetried.sql"
+ };
+ const std::string SET_ENTITY_TYPE { // statement text embedded from sql/setEntityType.sql
+#embed "sql/setEntityType.sql"
+ };
#define HASH_OPTS(VAR) \
const DB::CommandOptionsPtr VAR##_OPTS \
= std::make_shared<PQ::CommandOptions>(std::hash<std::string> {}(VAR), 35, false)
@@ -30,5 +42,10 @@ namespace WebStat::SQL {
HASH_OPTS(ENTITY_INSERT);
HASH_OPTS(ENTITY_UPDATE_DETAIL);
HASH_OPTS(HOST_UPSERT);
+ const DB::CommandOptionsPtr SELECT_UNINSERTABLE_OPTS // written out instead of HASH_OPTS because the final argument differs (true rather than false)
+ = std::make_shared<PQ::CommandOptions>(std::hash<std::string> {}(SELECT_UNINSERTABLE), 35, true); // NOTE(review): meaning of 35 and the flag is not visible here - confirm against PQ::CommandOptions
+ HASH_OPTS(DELETE_ENTITY);
+ HASH_OPTS(MARK_ENTITY_RETRIED);
+ HASH_OPTS(SET_ENTITY_TYPE);
#undef HASH_OPTS
}
diff --git a/src/sql.hpp b/src/sql.hpp
index 1a12823..ae3559a 100644
--- a/src/sql.hpp
+++ b/src/sql.hpp
@@ -13,5 +13,9 @@ namespace WebStat::SQL {
EMBED_DECLARE(ENTITY_INSERT);
EMBED_DECLARE(ENTITY_UPDATE_DETAIL);
EMBED_DECLARE(HOST_UPSERT);
+ EMBED_DECLARE(SELECT_UNINSERTABLE);
+ EMBED_DECLARE(DELETE_ENTITY);
+ EMBED_DECLARE(MARK_ENTITY_RETRIED);
+ EMBED_DECLARE(SET_ENTITY_TYPE);
#undef EMBED_DECLARE
}
diff --git a/src/sql/deleteEntity.sql b/src/sql/deleteEntity.sql
new file mode 100644
index 0000000..e201384
--- /dev/null
+++ b/src/sql/deleteEntity.sql
@@ -0,0 +1,2 @@
+DELETE FROM entities -- removes the entity row whose id matches the single bound parameter
+WHERE id = ?
diff --git a/src/sql/markEntityRetried.sql b/src/sql/markEntityRetried.sql
new file mode 100644
index 0000000..6ec2263
--- /dev/null
+++ b/src/sql/markEntityRetried.sql
@@ -0,0 +1,6 @@
+UPDATE
+ entities
+SET -- overwrites detail with a JSON object holding the retry time in UTC and the error text (first parameter)
+ detail = jsonb_build_object('retriedAt', CURRENT_TIMESTAMP at time zone 'utc', 'error', ?::text)
+WHERE -- second parameter selects the row
+ id = ?
diff --git a/src/sql/selectUninsertableLines.sql b/src/sql/selectUninsertableLines.sql
new file mode 100644
index 0000000..048726b
--- /dev/null
+++ b/src/sql/selectUninsertableLines.sql
@@ -0,0 +1,8 @@
+SELECT -- id and stored value of each uninsertable-line entity still awaiting a retry
+ id,
+ value
+FROM
+ entities
+WHERE -- detail must be null; markEntityRetried.sql sets detail, so rows it has marked are not selected again
+ type = 'uninsertable_line'
+ AND detail IS NULL
diff --git a/src/sql/setEntityType.sql b/src/sql/setEntityType.sql
new file mode 100644
index 0000000..5c981b9
--- /dev/null
+++ b/src/sql/setEntityType.sql
@@ -0,0 +1,6 @@
+UPDATE
+ entities
+SET -- first parameter is the new type name, cast to the entity type
+ type = ?::entity
+WHERE -- second parameter selects the row
+ id = ?
diff --git a/src/webstat_logger_main.cpp b/src/webstat_logger_main.cpp
index 1d14532..8dd9f52 100644
--- a/src/webstat_logger_main.cpp
+++ b/src/webstat_logger_main.cpp
@@ -77,6 +77,8 @@ main(int argc, char ** argv)
"How often to check for and import parked log lines")
("job.purge.freq", po::value(&settings.freqPurgeOldLogs)->default_value(settings.freqPurgeOldLogs),
"How often to purge old access log entries from the database")
+ ("job.retryUninsertable.freq", po::value(&settings.freqRetryUninsertableLines)->default_value(settings.freqRetryUninsertableLines),
+ "After how long to retry inserting log lines which previously could not be inserted")
("job.purge.days", po::value(&settings.purgeDaysToKeep)->default_value(settings.purgeDaysToKeep),
"How many days of access log entries to keep")
("job.purge.max", po::value(&settings.purgeDeleteMax)->default_value(settings.purgeDeleteMax),
diff --git a/test/test-ingest.cpp b/test/test-ingest.cpp
index c2ac4b3..408e720 100644
--- a/test/test-ingest.cpp
+++ b/test/test-ingest.cpp
@@ -12,9 +12,52 @@
namespace {
using namespace WebStat;
BOOST_GLOBAL_FIXTURE(MockDB);
+
+ constexpr std::array<std::string_view, 9> ENTITY_TYPE_NAMES { // type names as text; toEntityType casts an entry's index to EntityType, so order matters
+ "host",
+ "virtual_host",
+ "path",
+ "query_string",
+ "referrer",
+ "user_agent",
+ "unparsable_line",
+ "uninsertable_line",
+ "content_type",
+ };
+
+ EntityType
+ toEntityType(const std::string_view typeStr)
+ {
+ // The position of the name within ENTITY_TYPE_NAMES is used as the numeric EntityType value.
+ const auto match = std::ranges::find(ENTITY_TYPE_NAMES, typeStr);
+ if (match != ENTITY_TYPE_NAMES.end()) {
+ return static_cast<EntityType>(match - ENTITY_TYPE_NAMES.begin());
+ }
+ // Names missing from the table are reported rather than mapped to a default.
+ throw std::domain_error {std::format("Unknown entity type {}", typeStr)};
+ }
+
+ using EntityWithDetail = std::tuple<EntityId, EntityType, std::string, std::optional<std::string>>; // id, type, value, optional detail text
+
+ std::optional<EntityWithDetail> // the entity row with the given id, or nullopt when the query returns no row
+ getEntityById(DB::Connection * dbconn, EntityId id)
+ {
+ auto select = dbconn->select("SELECT type, value, detail FROM entities WHERE id = ?");
+ select->bindParam(0, id);
+ for (auto [typeStr, value, detail] : select->as<std::string, std::string, std::optional<std::string>>()) {
+ return std::make_optional<EntityWithDetail>(id, toEntityType(typeStr), std::move(value), std::move(detail)); // returns on the first row; toEntityType throws on an unknown type name
+ }
+ return std::nullopt;
+ }
}
namespace std {
+ ostream &
+ operator<<(ostream & strm, const EntityType value)
+ {
+ // Writes "EntityType: <name> (<number>)" so that checks such as BOOST_CHECK_EQUAL can print EntityType values.
+ const auto valueNum = static_cast<size_t>(value);
+ // Guard the lookup: a value with no entry in ENTITY_TYPE_NAMES must not index past the end of the array.
+ const auto name = valueNum < ENTITY_TYPE_NAMES.size() ? ENTITY_TYPE_NAMES[valueNum] : std::string_view {"unknown"};
+ std::print(strm, "EntityType: {} ({})", name, valueNum);
+ return strm;
+ }
+
template<typename T>
ostream &
operator<<(ostream & strm, const std::optional<T> & value)
@@ -426,6 +469,60 @@ BOOST_AUTO_TEST_CASE(PurgeOldJob)
BOOST_CHECK_EQUAL(2, jobPurgeOldLogs()());
}
+BOOST_AUTO_TEST_CASE(RetryUninsertableNone) // expects the job to report zero stored lines when this test has added nothing to retry
+{
+ BOOST_CHECK_EQUAL(0, jobRetryUninsertableLines()());
+}
+
+BOOST_AUTO_TEST_CASE(RetryUninsertableSuccess) // a stored line that scans and inserts is counted once and its entity row is removed
+{
+ auto dbconn = dbpool->get();
+ Entity uninsertable {{}, {}, EntityType::UninsertableLine, LOGLINE1};
+ storeNewEntity(dbconn.get(), uninsertable);
+ BOOST_REQUIRE(uninsertable.id); // storing must have assigned an id
+ BOOST_REQUIRE(getEntityById(dbconn.get(), *uninsertable.id)); // row exists before the job runs
+
+ BOOST_CHECK_EQUAL(1, jobRetryUninsertableLines()());
+ BOOST_REQUIRE(!getEntityById(dbconn.get(), *uninsertable.id)); // row is gone after the retry
+}
+
+BOOST_AUTO_TEST_CASE(RetryUninsertableNowUnparsable) // a stored line that does not scan is kept but re-typed as UnparsableLine
+{
+ auto dbconn = dbpool->get();
+ Entity uninsertable {{}, {}, EntityType::UninsertableLine, "blah"}; // value is deliberately not a scannable log line
+ storeNewEntity(dbconn.get(), uninsertable);
+ BOOST_REQUIRE(uninsertable.id);
+
+ BOOST_CHECK_EQUAL(0, jobRetryUninsertableLines()()); // nothing is counted as stored
+ auto updatedEntity = getEntityById(dbconn.get(), *uninsertable.id);
+ BOOST_REQUIRE(updatedEntity); // row is still present
+ BOOST_CHECK_EQUAL(std::get<1>(*updatedEntity), EntityType::UnparsableLine);
+}
+
+BOOST_AUTO_TEST_CASE(RetryUninsertableStillUninsertable) // a line that scans but fails to insert keeps its type and gains a JSON detail with the error
+{
+ auto dbconn = dbpool->get();
+ constexpr std::string_view LOGLINE_UNINSERTABLE // CAUSEPARSEFAIL appears to sit in the HTTP verb field; the checks below expect an http_verb enum error
+ = R"LOG(git.randomdan.homeip.net 98.82.40.168 1755561576768318 CAUSEPARSEFAIL "/repo/gentoobrowse-api/commit/gentoobrowse-api/unittests/fixtures/756569aa764177340726dd3d40b41d89b11b20c7/app-crypt/pdfcrack/Manifest" "?h=gentoobrowse-api-0.9.1&id=a2ed3fd30333721accd4b697bfcb6cc4165c7714" HTTP/1.1 200 1884 107791 "-" "Mozilla/5.0 AppleWebKit/537.36 (KHTML, like Gecko; compatible; Amazonbot/0.1; +https://developer.amazon.com/support/amazonbot) Chrome/119.0.6045.214 Safari/537.36" "text/plain")LOG";
+ Entity uninsertable {{}, {}, EntityType::UninsertableLine, LOGLINE_UNINSERTABLE};
+ storeNewEntity(dbconn.get(), uninsertable);
+ BOOST_REQUIRE(uninsertable.id);
+
+ BOOST_CHECK_EQUAL(0, jobRetryUninsertableLines()()); // the failed retry is not counted as stored
+ auto updatedEntity = getEntityById(dbconn.get(), *uninsertable.id);
+ BOOST_REQUIRE(updatedEntity); // row is still present
+ BOOST_CHECK_EQUAL(std::get<1>(*updatedEntity), EntityType::UninsertableLine); // type is unchanged
+ const auto & detail = std::get<3>(*updatedEntity);
+ BOOST_REQUIRE(detail); // detail was null when stored and must now be set
+
+ BOOST_TEST_CONTEXT(*detail) {
+ BOOST_CHECK(detail->starts_with("{")); // detail text is a JSON object
+ BOOST_CHECK(detail->contains("invalid input value for enum http_verb"));
+ BOOST_CHECK(detail->contains("retriedAt"));
+ BOOST_CHECK(detail->ends_with("}"));
+ }
+}
+
BOOST_AUTO_TEST_CASE(LogStatsSignal)
{
BOOST_REQUIRE_EQUAL(logsWritten, 0);