diff options
Diffstat (limited to 'lib/output/pq')
-rw-r--r-- | lib/output/pq/updateDatabase.cpp | 45 | ||||
-rw-r--r-- | lib/output/pq/updateDatabase.h | 5 |
2 files changed, 34 insertions, 16 deletions
diff --git a/lib/output/pq/updateDatabase.cpp b/lib/output/pq/updateDatabase.cpp index 4830fd5..b6809c2 100644 --- a/lib/output/pq/updateDatabase.cpp +++ b/lib/output/pq/updateDatabase.cpp @@ -248,12 +248,33 @@ namespace MyGrate::Output::Pq { } void + UpdateDatabase::verifyRow(const MariaDB_Event_Ptr & e, const TableDefPtr & out) + { + verify<std::runtime_error>( + e->event.rows.column_count == out->columns.size(), "Incorrect number of columns in row data"); + } + + void + UpdateDatabase::copyAll(const Row & r, std::back_insert_iterator<std::vector<DbValue>> && out) + { + std::copy(r.begin(), r.end(), out); + } + + void + UpdateDatabase::copyKeys( + const Row & r, const TableDefPtr & td, std::back_insert_iterator<std::vector<DbValue>> && out) + { + std::copy_if(r.begin(), r.end(), out, [c = td->columns.begin()](auto &&) mutable { + return (c++)->get()->is_pk; + }); + } + + void UpdateDatabase::updateRow(MariaDB_Event_Ptr e) { if (selected != tables.end()) { auto & out = selected->second; - verify<std::runtime_error>( - e->event.rows.column_count == out->columns.size(), "Incorrect number of columns in row data"); + verifyRow(e, out); if (!out->update) { std::stringstream ou; std::size_t ordinal {0}, kordinal {out->columns.size()}; @@ -276,11 +297,8 @@ namespace MyGrate::Output::Pq { std::vector<DbValue> updateValues; updateValues.reserve(out->columns.size() + out->keys); RowPair rp {e->event.rows, table_map->event.table_map}; - std::copy(rp.second.begin(), rp.second.end(), std::back_inserter(updateValues)); - std::copy_if(rp.first.begin(), rp.first.end(), std::back_inserter(updateValues), - [c = out->columns.begin()](auto &&) mutable { - return (c++)->get()->is_pk; - }); + copyAll(rp.second, std::back_inserter(updateValues)); + copyKeys(rp.first, out, std::back_inserter(updateValues)); out->update->execute(updateValues); verify<ReplicationError>(out->update->rows() == 1, "Wrong number of rows updated."); } @@ -291,8 +309,7 @@ namespace MyGrate::Output::Pq { { if (selected != tables.end()) { auto & out = selected->second; - verify<std::runtime_error>( - e->event.rows.column_count == out->columns.size(), "Incorrect number of columns in row data"); + verifyRow(e, out); if (!out->deleteFrom) { std::stringstream ou; std::size_t kordinal {0}; @@ -310,10 +327,7 @@ namespace MyGrate::Output::Pq { std::vector<DbValue> updateValues; updateValues.reserve(out->keys); Row rp {e->event.rows, table_map->event.table_map}; - std::copy_if(rp.begin(), rp.end(), std::back_inserter(updateValues), - [c = out->columns.begin()](auto &&) mutable { - return (c++)->get()->is_pk; - }); + copyKeys(rp, out, std::back_inserter(updateValues)); out->deleteFrom->execute(updateValues); verify<ReplicationError>(out->deleteFrom->rows() == 1, "Wrong number of rows deleted."); } @@ -324,8 +338,7 @@ namespace MyGrate::Output::Pq { { if (selected != tables.end()) { auto & out = selected->second; - verify<std::runtime_error>( - e->event.rows.column_count == out->columns.size(), "Incorrect number of columns in row data"); + verifyRow(e, out); if (!out->insertInto) { std::stringstream ou; std::size_t ordinal {0}, vordinal {0}; @@ -346,7 +359,7 @@ namespace MyGrate::Output::Pq { std::vector<DbValue> updateValues; updateValues.reserve(out->columns.size()); Row rp {e->event.rows, table_map->event.table_map}; - std::copy(rp.begin(), rp.end(), std::back_inserter(updateValues)); + copyAll(rp, std::back_inserter(updateValues)); out->insertInto->execute(updateValues); verify<ReplicationError>(out->insertInto->rows() == 1, "Wrong number of rows updated."); } diff --git a/lib/output/pq/updateDatabase.h b/lib/output/pq/updateDatabase.h index 89d9916..970c6d5 100644 --- a/lib/output/pq/updateDatabase.h +++ b/lib/output/pq/updateDatabase.h @@ -6,6 +6,7 @@ #include <cstdint> #include <eventHandlerBase.h> #include <eventSourceBase.h> +#include <row.h> namespace MyGrate::Input { class MySQLConn; @@ -65,6 +66,10 @@ namespace MyGrate::Output::Pq { UpdateDatabase(PqConn &&, uint64_t source); UpdateDatabase(PqConn &&, uint64_t source, RecordSetPtr cfg); + static void verifyRow(const MariaDB_Event_Ptr & e, const TableDefPtr &); + static void copyAll(const Row & r, std::back_insert_iterator<std::vector<DbValue>> &&); + static void copyKeys(const Row & r, const TableDefPtr &, std::back_insert_iterator<std::vector<DbValue>> &&); + using Tables = std::map<std::string, TableDefPtr, std::less<>>; Tables tables; Tables::const_iterator selected; |