From 3c1d15e3bfc2788cd7f6c9f826eafc1a8abc77c1 Mon Sep 17 00:00:00 2001 From: Dan Goodliffe Date: Fri, 23 Jul 2021 00:52:03 +0100 Subject: First cut replicating row deletes --- lib/output/pq/updateDatabase.cpp | 34 ++++++++++++++++++++++++++++++++++ lib/output/pq/updateDatabase.h | 3 ++- 2 files changed, 36 insertions(+), 1 deletion(-) (limited to 'lib') diff --git a/lib/output/pq/updateDatabase.cpp b/lib/output/pq/updateDatabase.cpp index 7036f67..65caf86 100644 --- a/lib/output/pq/updateDatabase.cpp +++ b/lib/output/pq/updateDatabase.cpp @@ -285,4 +285,38 @@ namespace MyGrate::Output::Pq { verify(out->update->rows() == 1, "Wrong number of rows updated."); } } + + void + UpdateDatabase::deleteRow(MariaDB_Event_Ptr e) + { + if (selected != tables.end()) { + auto & out = selected->second; + verify( + e->event.rows.column_count == out->columns.size(), "Incorrect number of columns in row data"); + if (!out->deleteFrom) { + std::stringstream ou; + std::size_t kordinal {0}; + + scprintf<"DELETE FROM %?.%? ">(ou, schema, selected->first); + for (const auto & col : out->columns) { + if (col->is_pk) { + scprintf<"%? %? = $%?">( + ou, kordinal == out->columns.size() ? " WHERE " : " AND ", col->name, kordinal + 1); + kordinal++; + } + } + + out->deleteFrom = prepare(ou.str().c_str(), kordinal); + } + std::vector updateValues; + updateValues.reserve(out->keys); + Row rp {e->event.rows, table_map->event.table_map}; + std::copy_if(rp.begin(), rp.end(), std::back_inserter(updateValues), + [c = out->columns.begin()](auto &&) mutable { + return (c++)->get()->is_pk; + }); + out->deleteFrom->execute(updateValues); + verify(out->deleteFrom->rows() == 1, "Wrong number of rows deleted."); + } + } } diff --git a/lib/output/pq/updateDatabase.h b/lib/output/pq/updateDatabase.h index e56852b..7641ada 100644 --- a/lib/output/pq/updateDatabase.h +++ b/lib/output/pq/updateDatabase.h @@ -27,7 +27,7 @@ namespace MyGrate::Output::Pq { std::vector columns; std::size_t keys; - DbPrepStmtPtr update; + DbPrepStmtPtr update, deleteFrom; }; using TableDefPtr = std::unique_ptr; @@ -53,6 +53,7 @@ namespace MyGrate::Output::Pq { // Replication events void updateRow(MariaDB_Event_Ptr) override; + void deleteRow(MariaDB_Event_Ptr) override; void tableMap(MariaDB_Event_Ptr) override; const uint64_t source; -- cgit v1.2.3