From 8e1977685713c68ff483fe02e010deaa685b453a Mon Sep 17 00:00:00 2001 From: Dan Goodliffe Date: Thu, 22 Jul 2021 23:09:36 +0100 Subject: First cut replicating row updates --- lib/output/pq/updateDatabase.cpp | 59 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 58 insertions(+), 1 deletion(-) (limited to 'lib/output/pq/updateDatabase.cpp') diff --git a/lib/output/pq/updateDatabase.cpp b/lib/output/pq/updateDatabase.cpp index 6a0f419..9659cc0 100644 --- a/lib/output/pq/updateDatabase.cpp +++ b/lib/output/pq/updateDatabase.cpp @@ -19,6 +19,7 @@ #include #include #include +#include #include #include @@ -33,7 +34,8 @@ namespace MyGrate::Output::Pq { } UpdateDatabase::UpdateDatabase(PqConn && conn, uint64_t s, RecordSetPtr cfg) : - PqConn {std::move(conn)}, source {s}, schema(cfg->at(0, 0)), database(cfg->at(0, 1)) + PqConn {std::move(conn)}, source {s}, schema(cfg->at(0, 0)), + database(cfg->at(0, 1)), selected {tables.end()}, table_map {nullptr, nullptr} { auto trecs = output::pq::sql::selectTables::execute(this, source); auto crecs = output::pq::sql::selectColumns::execute(this, source); @@ -227,4 +229,59 @@ namespace MyGrate::Output::Pq { fclose(out); } + + void + UpdateDatabase::tableMap(MariaDB_Event_Ptr e) + { + if (*e->event.table_map.database == database) { + selected = tables.find(*e->event.table_map.table); + table_map = std::move(e); + } + else { + selected = tables.end(); + table_map.reset(); + } + } + + void + UpdateDatabase::updateRow(MariaDB_Event_Ptr e) + { + if (selected != tables.end()) { + auto & out = selected->second; + verify( + e->event.rows.column_count == out->columns.size(), "Incorrect number of columns in row data"); + if (!out->update) { + std::stringstream ou; + std::size_t ordinal {0}, kordinal {out->columns.size()}; + + scprintf<"UPDATE %?.%? ">(ou, schema, selected->first); + for (const auto & col : out->columns) { + scprintf<"%? %? = $%?">(ou, !ordinal ? " SET " : ", ", col->name, ordinal + 1); + ordinal++; + } + for (const auto & col : out->columns) { + if (col->is_pk) { + scprintf<"%? %? = $%?">( + ou, kordinal == out->columns.size() ? " WHERE " : " AND ", col->name, kordinal + 1); + kordinal++; + } + } + + out->update = prepare(ou.str().c_str(), kordinal); + } + std::vector updateValues; + updateValues.reserve( + out->columns.size() + std::count_if(out->columns.begin(), out->columns.end(), [](auto && c) { + return c->is_pk; + })); + RowPair rp {e->event.rows, table_map->event.table_map}; + std::copy(rp.second.begin(), rp.second.end(), std::back_inserter(updateValues)); + std::copy_if(rp.first.begin(), rp.first.end(), std::back_inserter(updateValues), + [c = out->columns.begin()](auto &&) mutable { + return (c++)->get()->is_pk; + }); + out->update->execute(updateValues); + verify(out->update->rows() == 1, "Wrong number of rows updated."); + } + } } -- cgit v1.2.3