diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/ingestor.cpp | 46 | ||||
| -rw-r--r-- | src/ingestor.hpp | 4 | ||||
| -rw-r--r-- | src/sql.cpp | 17 | ||||
| -rw-r--r-- | src/sql.hpp | 4 | ||||
| -rw-r--r-- | src/sql/deleteEntity.sql | 2 | ||||
| -rw-r--r-- | src/sql/markEntityRetried.sql | 6 | ||||
| -rw-r--r-- | src/sql/selectUninsertableLines.sql | 8 | ||||
| -rw-r--r-- | src/sql/setEntityType.sql | 6 | ||||
| -rw-r--r-- | src/webstat_logger_main.cpp | 2 |
9 files changed, 93 insertions, 2 deletions
diff --git a/src/ingestor.cpp b/src/ingestor.cpp index 9f30263..d9437c9 100644 --- a/src/ingestor.cpp +++ b/src/ingestor.cpp @@ -134,7 +134,7 @@ namespace WebStat { settings {std::move(givenSettings)}, dbpool {std::move(dbpl)}, handleCompleteCurlOps {&Ingestor::jobHandleCompleteCurlOps, &Ingestor::haveCurlOperations}, ingestParkedLines {&Ingestor::jobReadParkedLines}, purgeOldLogs {&Ingestor::jobPurgeOldLogs}, - storeQueueLines {&Ingestor::jobStoreQueuedLines}, + storeQueueLines {&Ingestor::jobStoreQueuedLines}, retryUninsertableLines {&Ingestor::jobRetryUninsertableLines}, hostnameId {insert(dbpool->get(), SQL::HOST_UPSERT, SQL::HOST_UPSERT_OPTS, host.nodename, host.sysname, host.release, host.version, host.machine, host.domainname)}, curl {curl_multi_init()} @@ -338,6 +338,49 @@ namespace WebStat { return std::make_pair(value->hash, *value->id); }); + Ingestor::Job::Result + Ingestor::jobRetryUninsertableLines() + { + auto dbh = dbpool->get(); + auto dbconn = dbh.get(); + auto lineSelect = dbconn->select(SQL::SELECT_UNINSERTABLE, SQL::SELECT_UNINSERTABLE_OPTS); + auto markLineRetried = dbconn->modify(SQL::MARK_ENTITY_RETRIED, SQL::MARK_ENTITY_RETRIED_OPTS); + auto deleteLine = dbconn->modify(SQL::DELETE_ENTITY, SQL::DELETE_ENTITY_OPTS); + auto setEntityUnparsable = dbconn->modify(SQL::SET_ENTITY_TYPE, SQL::SET_ENTITY_TYPE_OPTS); + setEntityUnparsable->bindParamS(0, "unparsable_line"); + + unsigned int stored = 0; + for (auto [id, line] : lineSelect->as<EntityId, std::string>()) { + try { + DB::TransactionScope lineTx {*dbconn}; + if (auto result = scanLogLine(line)) { + auto values = hashScanValues(result->values()); + auto valuesEntities = entities(values); + fillKnownEntities(valuesEntities); + storeNewEntities(dbconn, valuesEntities); + existingEntities()->insert_range(valuesEntities | ENTITY_IDS); + storeLogLine(dbconn, values); + + deleteLine->bindParamI(0, id); + deleteLine->execute(); + stored += 1; + } + else { + // unparseable - was parsable previously, 
isn't now 🤷 + setEntityUnparsable->bindParamI(1, id); + setEntityUnparsable->execute(); + } + } + catch (const std::exception & err) { + bindMany(markLineRetried, 0, err.what(), id); + markLineRetried->execute(); + } + } + return [stored]() { + return stored; + }; + } + template<typename... T> std::vector<Entity *> Ingestor::entities(std::tuple<T...> & values) @@ -462,6 +505,7 @@ namespace WebStat { runJobAsNeeded(handleCompleteCurlOps, std::chrono::minutes {1}); runJobAsNeeded(ingestParkedLines, settings.freqIngestParkedLines); runJobAsNeeded(purgeOldLogs, settings.freqPurgeOldLogs); + runJobAsNeeded(retryUninsertableLines, settings.freqRetryUninsertableLines); /* use the retry job's own interval setting, not the purge interval */ } void diff --git a/src/ingestor.hpp b/src/ingestor.hpp index c2a47a4..2050b7c 100644 --- a/src/ingestor.hpp +++ b/src/ingestor.hpp @@ -28,6 +28,7 @@ namespace WebStat { size_t maxBatchSize = 1; minutes checkJobsAfter = 1min; minutes freqIngestParkedLines = 30min; + minutes freqRetryUninsertableLines = 4h; minutes freqPurgeOldLogs = 6h; unsigned int purgeDaysToKeep = 61; // ~2 months unsigned int purgeDeleteMax = 10'000; @@ -78,6 +79,7 @@ namespace WebStat { Job::Result jobReadParkedLines(); Job::Result jobPurgeOldLogs(); Job::Result jobStoreQueuedLines(); + Job::Result jobRetryUninsertableLines(); template<typename... T> void storeLogLine(DB::Connection *, const std::tuple<T...> &) const; @@ -109,8 +111,8 @@ namespace WebStat { Job ingestParkedLines; Job purgeOldLogs; Job storeQueueLines; + Job retryUninsertableLines; - private: template<typename... 
T> static std::vector<Entity *> entities(std::tuple<T...> &); void fillKnownEntities(std::span<Entity *>) const; void storeNewEntities(DB::Connection *, std::span<Entity *>) const; diff --git a/src/sql.cpp b/src/sql.cpp index 801a905..a2dac02 100644 --- a/src/sql.cpp +++ b/src/sql.cpp @@ -22,6 +22,18 @@ namespace WebStat::SQL { const std::string HOST_UPSERT { #embed "sql/hostUpsert.sql" }; + const std::string SELECT_UNINSERTABLE { +#embed "sql/selectUninsertableLines.sql" + }; + const std::string DELETE_ENTITY { +#embed "sql/deleteEntity.sql" + }; + const std::string MARK_ENTITY_RETRIED { +#embed "sql/markEntityRetried.sql" + }; + const std::string SET_ENTITY_TYPE { +#embed "sql/setEntityType.sql" + }; #define HASH_OPTS(VAR) \ const DB::CommandOptionsPtr VAR##_OPTS \ = std::make_shared<PQ::CommandOptions>(std::hash<std::string> {}(VAR), 35, false) @@ -30,5 +42,10 @@ namespace WebStat::SQL { HASH_OPTS(ENTITY_INSERT); HASH_OPTS(ENTITY_UPDATE_DETAIL); HASH_OPTS(HOST_UPSERT); + const DB::CommandOptionsPtr SELECT_UNINSERTABLE_OPTS + = std::make_shared<PQ::CommandOptions>(std::hash<std::string> {}(SELECT_UNINSERTABLE), 35, true); + HASH_OPTS(DELETE_ENTITY); + HASH_OPTS(MARK_ENTITY_RETRIED); + HASH_OPTS(SET_ENTITY_TYPE); #undef HASH_OPTS } diff --git a/src/sql.hpp b/src/sql.hpp index 1a12823..ae3559a 100644 --- a/src/sql.hpp +++ b/src/sql.hpp @@ -13,5 +13,9 @@ namespace WebStat::SQL { EMBED_DECLARE(ENTITY_INSERT); EMBED_DECLARE(ENTITY_UPDATE_DETAIL); EMBED_DECLARE(HOST_UPSERT); + EMBED_DECLARE(SELECT_UNINSERTABLE); + EMBED_DECLARE(DELETE_ENTITY); + EMBED_DECLARE(MARK_ENTITY_RETRIED); + EMBED_DECLARE(SET_ENTITY_TYPE); #undef EMBED_DECLARE } diff --git a/src/sql/deleteEntity.sql b/src/sql/deleteEntity.sql new file mode 100644 index 0000000..e201384 --- /dev/null +++ b/src/sql/deleteEntity.sql @@ -0,0 +1,2 @@ +DELETE FROM entities +WHERE id = ? 
diff --git a/src/sql/markEntityRetried.sql b/src/sql/markEntityRetried.sql new file mode 100644 index 0000000..6ec2263 --- /dev/null +++ b/src/sql/markEntityRetried.sql @@ -0,0 +1,6 @@ +UPDATE + entities +SET + detail = jsonb_build_object('retriedAt', CURRENT_TIMESTAMP at time zone 'utc', 'error', ?::text) +WHERE + id = ? diff --git a/src/sql/selectUninsertableLines.sql b/src/sql/selectUninsertableLines.sql new file mode 100644 index 0000000..048726b --- /dev/null +++ b/src/sql/selectUninsertableLines.sql @@ -0,0 +1,8 @@ +SELECT + id, + value +FROM + entities +WHERE + type = 'uninsertable_line' + AND detail IS NULL diff --git a/src/sql/setEntityType.sql b/src/sql/setEntityType.sql new file mode 100644 index 0000000..5c981b9 --- /dev/null +++ b/src/sql/setEntityType.sql @@ -0,0 +1,6 @@ +UPDATE + entities +SET + type = ?::entity +WHERE + id = ? diff --git a/src/webstat_logger_main.cpp b/src/webstat_logger_main.cpp index 1d14532..8dd9f52 100644 --- a/src/webstat_logger_main.cpp +++ b/src/webstat_logger_main.cpp @@ -77,6 +77,8 @@ main(int argc, char ** argv) "How often to check for and import parked log lines") ("job.purge.freq", po::value(&settings.freqPurgeOldLogs)->default_value(settings.freqPurgeOldLogs), "How often to purge old access log entries from the database") + ("job.retryUninsertable.freq", po::value(&settings.freqRetryUninsertableLines)->default_value(settings.freqRetryUninsertableLines), + "After how long to retry inserting log lines which previously could not be inserted") ("job.purge.days", po::value(&settings.purgeDaysToKeep)->default_value(settings.purgeDaysToKeep), "How many days of access log entries to keep") ("job.purge.max", po::value(&settings.purgeDeleteMax)->default_value(settings.purgeDeleteMax), |
