diff options
author | Dan Goodliffe <dan@randomdan.homeip.net> | 2021-07-25 12:45:15 +0100 |
---|---|---|
committer | Dan Goodliffe <dan@randomdan.homeip.net> | 2021-07-25 12:45:15 +0100 |
commit | 9592817df3dd98814c5ae0845dd1617ac20d774d (patch) | |
tree | 607a0fbcb1f1795c579c7f4d85fa240bf32d54fe /lib | |
parent | Add message to parameter count assertion (diff) | |
download | mygrate-9592817df3dd98814c5ae0845dd1617ac20d774d.tar.bz2 mygrate-9592817df3dd98814c5ae0845dd1617ac20d774d.tar.xz mygrate-9592817df3dd98814c5ae0845dd1617ac20d774d.zip |
Add before/after event to update position in replication stream
Diffstat (limited to 'lib')
-rw-r--r-- | lib/output/pq/sql/updateSourcePosition.sql | 5 | ||||
-rw-r--r-- | lib/output/pq/updateDatabase.cpp | 20 | ||||
-rw-r--r-- | lib/output/pq/updateDatabase.h | 3 |
3 files changed, 25 insertions, 3 deletions
diff --git a/lib/output/pq/sql/updateSourcePosition.sql b/lib/output/pq/sql/updateSourcePosition.sql index 74e22fc..09554f4 100644 --- a/lib/output/pq/sql/updateSourcePosition.sql +++ b/lib/output/pq/sql/updateSourcePosition.sql @@ -1,4 +1,3 @@ UPDATE mygrate.source SET - filename = $1, - position = $2 -WHERE source_id = $3 + position = $1 +WHERE source_id = $2 diff --git a/lib/output/pq/updateDatabase.cpp b/lib/output/pq/updateDatabase.cpp index b6809c2..c43d8e4 100644 --- a/lib/output/pq/updateDatabase.cpp +++ b/lib/output/pq/updateDatabase.cpp @@ -19,6 +19,7 @@ #include <output/pq/sql/selectSource.h> #include <output/pq/sql/selectSourceSchema.h> #include <output/pq/sql/selectTables.h> +#include <output/pq/sql/updateSourcePosition.h> #include <row.h> #include <stdexcept> #include <streamSupport.h> @@ -248,6 +249,19 @@ namespace MyGrate::Output::Pq { } void + UpdateDatabase::beforeEvent(const MariaDB_Event_Ptr &) + { + beginTx(); + } + + void + UpdateDatabase::afterEvent(const MariaDB_Event_Ptr & e) + { + output::pq::sql::updateSourcePosition::execute(this, e->next_event_pos, source); + commitTx(); + } + + void UpdateDatabase::verifyRow(const MariaDB_Event_Ptr & e, const TableDefPtr & out) { verify<std::runtime_error>( @@ -294,6 +308,7 @@ namespace MyGrate::Output::Pq { out->update = prepare(ou.str().c_str(), kordinal); } + beforeEvent(e); std::vector<DbValue> updateValues; updateValues.reserve(out->columns.size() + out->keys); RowPair rp {e->event.rows, table_map->event.table_map}; @@ -301,6 +316,7 @@ namespace MyGrate::Output::Pq { copyKeys(rp.first, out, std::back_inserter(updateValues)); out->update->execute(updateValues); verify<ReplicationError>(out->update->rows() == 1, "Wrong number of rows updated."); + afterEvent(e); } } @@ -324,12 +340,14 @@ namespace MyGrate::Output::Pq { out->deleteFrom = prepare(ou.str().c_str(), kordinal); } + beforeEvent(e); std::vector<DbValue> updateValues; updateValues.reserve(out->keys); Row rp {e->event.rows, table_map->event.table_map}; copyKeys(rp, out, std::back_inserter(updateValues)); out->deleteFrom->execute(updateValues); verify<ReplicationError>(out->deleteFrom->rows() == 1, "Wrong number of rows deleted."); + afterEvent(e); } } @@ -356,12 +374,14 @@ namespace MyGrate::Output::Pq { out->insertInto = prepare(ou.str().c_str(), out->columns.size()); } + beforeEvent(e); std::vector<DbValue> updateValues; updateValues.reserve(out->columns.size()); Row rp {e->event.rows, table_map->event.table_map}; copyAll(rp, std::back_inserter(updateValues)); out->insertInto->execute(updateValues); verify<ReplicationError>(out->insertInto->rows() == 1, "Wrong number of rows updated."); + afterEvent(e); } } } diff --git a/lib/output/pq/updateDatabase.h b/lib/output/pq/updateDatabase.h index 970c6d5..9547b90 100644 --- a/lib/output/pq/updateDatabase.h +++ b/lib/output/pq/updateDatabase.h @@ -66,6 +66,9 @@ namespace MyGrate::Output::Pq { UpdateDatabase(PqConn &&, uint64_t source); UpdateDatabase(PqConn &&, uint64_t source, RecordSetPtr cfg); + void beforeEvent(const MariaDB_Event_Ptr & e); + void afterEvent(const MariaDB_Event_Ptr & e); + static void verifyRow(const MariaDB_Event_Ptr & e, const TableDefPtr &); static void copyAll(const Row & r, std::back_insert_iterator<std::vector<DbValue>> &&); static void copyKeys(const Row & r, const TableDefPtr &, std::back_insert_iterator<std::vector<DbValue>> &&); |