diff options
Diffstat (limited to 'lib/output/pq/updateDatabase.cpp')
-rw-r--r-- | lib/output/pq/updateDatabase.cpp | 20 |
1 files changed, 20 insertions, 0 deletions
diff --git a/lib/output/pq/updateDatabase.cpp b/lib/output/pq/updateDatabase.cpp index b6809c2..c43d8e4 100644 --- a/lib/output/pq/updateDatabase.cpp +++ b/lib/output/pq/updateDatabase.cpp @@ -19,6 +19,7 @@ #include <output/pq/sql/selectSource.h> #include <output/pq/sql/selectSourceSchema.h> #include <output/pq/sql/selectTables.h> +#include <output/pq/sql/updateSourcePosition.h> #include <row.h> #include <stdexcept> #include <streamSupport.h> @@ -248,6 +249,19 @@ namespace MyGrate::Output::Pq { } void + UpdateDatabase::beforeEvent(const MariaDB_Event_Ptr &) + { + beginTx(); + } + + void + UpdateDatabase::afterEvent(const MariaDB_Event_Ptr & e) + { + output::pq::sql::updateSourcePosition::execute(this, e->next_event_pos, source); + commitTx(); + } + + void UpdateDatabase::verifyRow(const MariaDB_Event_Ptr & e, const TableDefPtr & out) { verify<std::runtime_error>( @@ -294,6 +308,7 @@ namespace MyGrate::Output::Pq { out->update = prepare(ou.str().c_str(), kordinal); } + beforeEvent(e); std::vector<DbValue> updateValues; updateValues.reserve(out->columns.size() + out->keys); RowPair rp {e->event.rows, table_map->event.table_map}; @@ -301,6 +316,7 @@ namespace MyGrate::Output::Pq { copyKeys(rp.first, out, std::back_inserter(updateValues)); out->update->execute(updateValues); verify<ReplicationError>(out->update->rows() == 1, "Wrong number of rows updated."); + afterEvent(e); } } @@ -324,12 +340,14 @@ namespace MyGrate::Output::Pq { out->deleteFrom = prepare(ou.str().c_str(), kordinal); } + beforeEvent(e); std::vector<DbValue> updateValues; updateValues.reserve(out->keys); Row rp {e->event.rows, table_map->event.table_map}; copyKeys(rp, out, std::back_inserter(updateValues)); out->deleteFrom->execute(updateValues); verify<ReplicationError>(out->deleteFrom->rows() == 1, "Wrong number of rows deleted."); + afterEvent(e); } } @@ -356,12 +374,14 @@ namespace MyGrate::Output::Pq { out->insertInto = prepare(ou.str().c_str(), out->columns.size()); } + beforeEvent(e); std::vector<DbValue> updateValues; updateValues.reserve(out->columns.size()); Row rp {e->event.rows, table_map->event.table_map}; copyAll(rp, std::back_inserter(updateValues)); out->insertInto->execute(updateValues); verify<ReplicationError>(out->insertInto->rows() == 1, "Wrong number of rows updated."); + afterEvent(e); } } } |