summaryrefslogtreecommitdiff
path: root/lib/output
diff options
context:
space:
mode:
Diffstat (limited to 'lib/output')
-rw-r--r--lib/output/pq/updateDatabase.cpp45
-rw-r--r--lib/output/pq/updateDatabase.h5
2 files changed, 34 insertions, 16 deletions
diff --git a/lib/output/pq/updateDatabase.cpp b/lib/output/pq/updateDatabase.cpp
index 4830fd5..b6809c2 100644
--- a/lib/output/pq/updateDatabase.cpp
+++ b/lib/output/pq/updateDatabase.cpp
@@ -248,12 +248,33 @@ namespace MyGrate::Output::Pq {
}
void
+ UpdateDatabase::verifyRow(const MariaDB_Event_Ptr & e, const TableDefPtr & out)
+ {
+ verify<std::runtime_error>(
+ e->event.rows.column_count == out->columns.size(), "Incorrect number of columns in row data");
+ }
+
+ void
+ UpdateDatabase::copyAll(const Row & r, std::back_insert_iterator<std::vector<DbValue>> && out)
+ {
+ std::copy(r.begin(), r.end(), out);
+ }
+
+ void
+ UpdateDatabase::copyKeys(
+ const Row & r, const TableDefPtr & td, std::back_insert_iterator<std::vector<DbValue>> && out)
+ {
+ std::copy_if(r.begin(), r.end(), out, [c = td->columns.begin()](auto &&) mutable {
+ return (c++)->get()->is_pk;
+ });
+ }
+
+ void
UpdateDatabase::updateRow(MariaDB_Event_Ptr e)
{
if (selected != tables.end()) {
auto & out = selected->second;
- verify<std::runtime_error>(
- e->event.rows.column_count == out->columns.size(), "Incorrect number of columns in row data");
+ verifyRow(e, out);
if (!out->update) {
std::stringstream ou;
std::size_t ordinal {0}, kordinal {out->columns.size()};
@@ -276,11 +297,8 @@ namespace MyGrate::Output::Pq {
std::vector<DbValue> updateValues;
updateValues.reserve(out->columns.size() + out->keys);
RowPair rp {e->event.rows, table_map->event.table_map};
- std::copy(rp.second.begin(), rp.second.end(), std::back_inserter(updateValues));
- std::copy_if(rp.first.begin(), rp.first.end(), std::back_inserter(updateValues),
- [c = out->columns.begin()](auto &&) mutable {
- return (c++)->get()->is_pk;
- });
+ copyAll(rp.second, std::back_inserter(updateValues));
+ copyKeys(rp.first, out, std::back_inserter(updateValues));
out->update->execute(updateValues);
verify<ReplicationError>(out->update->rows() == 1, "Wrong number of rows updated.");
}
@@ -291,8 +309,7 @@ namespace MyGrate::Output::Pq {
{
if (selected != tables.end()) {
auto & out = selected->second;
- verify<std::runtime_error>(
- e->event.rows.column_count == out->columns.size(), "Incorrect number of columns in row data");
+ verifyRow(e, out);
if (!out->deleteFrom) {
std::stringstream ou;
std::size_t kordinal {0};
@@ -310,10 +327,7 @@ namespace MyGrate::Output::Pq {
std::vector<DbValue> updateValues;
updateValues.reserve(out->keys);
Row rp {e->event.rows, table_map->event.table_map};
- std::copy_if(rp.begin(), rp.end(), std::back_inserter(updateValues),
- [c = out->columns.begin()](auto &&) mutable {
- return (c++)->get()->is_pk;
- });
+ copyKeys(rp, out, std::back_inserter(updateValues));
out->deleteFrom->execute(updateValues);
verify<ReplicationError>(out->deleteFrom->rows() == 1, "Wrong number of rows deleted.");
}
@@ -324,8 +338,7 @@ namespace MyGrate::Output::Pq {
{
if (selected != tables.end()) {
auto & out = selected->second;
- verify<std::runtime_error>(
- e->event.rows.column_count == out->columns.size(), "Incorrect number of columns in row data");
+ verifyRow(e, out);
if (!out->insertInto) {
std::stringstream ou;
std::size_t ordinal {0}, vordinal {0};
@@ -346,7 +359,7 @@ namespace MyGrate::Output::Pq {
std::vector<DbValue> updateValues;
updateValues.reserve(out->columns.size());
Row rp {e->event.rows, table_map->event.table_map};
- std::copy(rp.begin(), rp.end(), std::back_inserter(updateValues));
+ copyAll(rp, std::back_inserter(updateValues));
out->insertInto->execute(updateValues);
verify<ReplicationError>(out->insertInto->rows() == 1, "Wrong number of rows updated.");
}
diff --git a/lib/output/pq/updateDatabase.h b/lib/output/pq/updateDatabase.h
index 89d9916..970c6d5 100644
--- a/lib/output/pq/updateDatabase.h
+++ b/lib/output/pq/updateDatabase.h
@@ -6,6 +6,7 @@
#include <cstdint>
#include <eventHandlerBase.h>
#include <eventSourceBase.h>
+#include <row.h>
namespace MyGrate::Input {
class MySQLConn;
@@ -65,6 +66,10 @@ namespace MyGrate::Output::Pq {
UpdateDatabase(PqConn &&, uint64_t source);
UpdateDatabase(PqConn &&, uint64_t source, RecordSetPtr cfg);
+ static void verifyRow(const MariaDB_Event_Ptr & e, const TableDefPtr &);
+ static void copyAll(const Row & r, std::back_insert_iterator<std::vector<DbValue>> &&);
+ static void copyKeys(const Row & r, const TableDefPtr &, std::back_insert_iterator<std::vector<DbValue>> &&);
+
using Tables = std::map<std::string, TableDefPtr, std::less<>>;
Tables tables;
Tables::const_iterator selected;