summaryrefslogtreecommitdiff
path: root/lib/output/pq/updateDatabase.cpp
diff options
context:
space:
mode:
authorDan Goodliffe <dan@randomdan.homeip.net>2021-07-22 23:09:36 +0100
committerDan Goodliffe <dan@randomdan.homeip.net>2021-07-22 23:09:36 +0100
commit8e1977685713c68ff483fe02e010deaa685b453a (patch)
tree2727c320a9ea1b28502633a7009a3180199ab691 /lib/output/pq/updateDatabase.cpp
parentCreate test table before setting up repl (diff)
downloadmygrate-8e1977685713c68ff483fe02e010deaa685b453a.tar.bz2
mygrate-8e1977685713c68ff483fe02e010deaa685b453a.tar.xz
mygrate-8e1977685713c68ff483fe02e010deaa685b453a.zip
First cut replicating row updates
Diffstat (limited to 'lib/output/pq/updateDatabase.cpp')
-rw-r--r--lib/output/pq/updateDatabase.cpp59
1 files changed, 58 insertions, 1 deletions
diff --git a/lib/output/pq/updateDatabase.cpp b/lib/output/pq/updateDatabase.cpp
index 6a0f419..9659cc0 100644
--- a/lib/output/pq/updateDatabase.cpp
+++ b/lib/output/pq/updateDatabase.cpp
@@ -19,6 +19,7 @@
#include <output/pq/sql/selectSource.h>
#include <output/pq/sql/selectSourceSchema.h>
#include <output/pq/sql/selectTables.h>
+#include <row.h>
#include <stdexcept>
#include <streamSupport.h>
@@ -33,7 +34,8 @@ namespace MyGrate::Output::Pq {
}
UpdateDatabase::UpdateDatabase(PqConn && conn, uint64_t s, RecordSetPtr cfg) :
- PqConn {std::move(conn)}, source {s}, schema(cfg->at(0, 0)), database(cfg->at(0, 1))
+ PqConn {std::move(conn)}, source {s}, schema(cfg->at(0, 0)),
+ database(cfg->at(0, 1)), selected {tables.end()}, table_map {nullptr, nullptr}
{
auto trecs = output::pq::sql::selectTables::execute(this, source);
auto crecs = output::pq::sql::selectColumns::execute(this, source);
@@ -227,4 +229,59 @@ namespace MyGrate::Output::Pq {
fclose(out);
}
+
+ void
+ UpdateDatabase::tableMap(MariaDB_Event_Ptr e)
+ {
+ if (*e->event.table_map.database == database) {
+ selected = tables.find(*e->event.table_map.table);
+ table_map = std::move(e);
+ }
+ else {
+ selected = tables.end();
+ table_map.reset();
+ }
+ }
+
+ void
+ UpdateDatabase::updateRow(MariaDB_Event_Ptr e)
+ {
+ if (selected != tables.end()) {
+ auto & out = selected->second;
+ verify<std::runtime_error>(
+ e->event.rows.column_count == out->columns.size(), "Incorrect number of columns in row data");
+ if (!out->update) {
+ std::stringstream ou;
+ std::size_t ordinal {0}, kordinal {out->columns.size()};
+
+ scprintf<"UPDATE %?.%? ">(ou, schema, selected->first);
+ for (const auto & col : out->columns) {
+ scprintf<"%? %? = $%?">(ou, !ordinal ? " SET " : ", ", col->name, ordinal + 1);
+ ordinal++;
+ }
+ for (const auto & col : out->columns) {
+ if (col->is_pk) {
+ scprintf<"%? %? = $%?">(
+ ou, kordinal == out->columns.size() ? " WHERE " : " AND ", col->name, kordinal + 1);
+ kordinal++;
+ }
+ }
+
+ out->update = prepare(ou.str().c_str(), kordinal);
+ }
+ std::vector<DbValue> updateValues;
+ updateValues.reserve(
+ out->columns.size() + std::count_if(out->columns.begin(), out->columns.end(), [](auto && c) {
+ return c->is_pk;
+ }));
+ RowPair rp {e->event.rows, table_map->event.table_map};
+ std::copy(rp.second.begin(), rp.second.end(), std::back_inserter(updateValues));
+ std::copy_if(rp.first.begin(), rp.first.end(), std::back_inserter(updateValues),
+ [c = out->columns.begin()](auto &&) mutable {
+ return (c++)->get()->is_pk;
+ });
+ out->update->execute(updateValues);
+ verify<ReplicationError>(out->update->rows() == 1, "Wrong number of rows updated.");
+ }
+ }
}