From 9592817df3dd98814c5ae0845dd1617ac20d774d Mon Sep 17 00:00:00 2001 From: Dan Goodliffe Date: Sun, 25 Jul 2021 12:45:15 +0100 Subject: Add before/after event to update position in replication stream --- lib/output/pq/updateDatabase.cpp | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) (limited to 'lib/output/pq/updateDatabase.cpp') diff --git a/lib/output/pq/updateDatabase.cpp b/lib/output/pq/updateDatabase.cpp index b6809c2..c43d8e4 100644 --- a/lib/output/pq/updateDatabase.cpp +++ b/lib/output/pq/updateDatabase.cpp @@ -19,6 +19,7 @@ #include #include #include +#include #include #include #include @@ -247,6 +248,19 @@ namespace MyGrate::Output::Pq { } } + void + UpdateDatabase::beforeEvent(const MariaDB_Event_Ptr &) + { + beginTx(); + } + + void + UpdateDatabase::afterEvent(const MariaDB_Event_Ptr & e) + { + output::pq::sql::updateSourcePosition::execute(this, e->next_event_pos, source); + commitTx(); + } + void UpdateDatabase::verifyRow(const MariaDB_Event_Ptr & e, const TableDefPtr & out) { @@ -294,6 +308,7 @@ namespace MyGrate::Output::Pq { out->update = prepare(ou.str().c_str(), kordinal); } + beforeEvent(e); std::vector updateValues; updateValues.reserve(out->columns.size() + out->keys); RowPair rp {e->event.rows, table_map->event.table_map}; @@ -301,6 +316,7 @@ namespace MyGrate::Output::Pq { copyKeys(rp.first, out, std::back_inserter(updateValues)); out->update->execute(updateValues); verify(out->update->rows() == 1, "Wrong number of rows updated."); + afterEvent(e); } } @@ -324,12 +340,14 @@ namespace MyGrate::Output::Pq { out->deleteFrom = prepare(ou.str().c_str(), kordinal); } + beforeEvent(e); std::vector updateValues; updateValues.reserve(out->keys); Row rp {e->event.rows, table_map->event.table_map}; copyKeys(rp, out, std::back_inserter(updateValues)); out->deleteFrom->execute(updateValues); verify(out->deleteFrom->rows() == 1, "Wrong number of rows deleted."); + afterEvent(e); } } @@ -356,12 +374,14 @@ namespace MyGrate::Output::Pq { out->insertInto = prepare(ou.str().c_str(), out->columns.size()); } + beforeEvent(e); std::vector updateValues; updateValues.reserve(out->columns.size()); Row rp {e->event.rows, table_map->event.table_map}; copyAll(rp, std::back_inserter(updateValues)); out->insertInto->execute(updateValues); verify(out->insertInto->rows() == 1, "Wrong number of rows updated."); + afterEvent(e); } } } -- cgit v1.2.3