From 5f0e260cfbdd43556c5f867861216cba3692d8e4 Mon Sep 17 00:00:00 2001 From: Dan Goodliffe Date: Fri, 23 Jul 2021 01:13:47 +0100 Subject: First cut replicating row inserts --- lib/output/pq/updateDatabase.cpp | 32 ++++++++++++++++++++++++++++++++ lib/output/pq/updateDatabase.h | 3 ++- 2 files changed, 34 insertions(+), 1 deletion(-) (limited to 'lib') diff --git a/lib/output/pq/updateDatabase.cpp b/lib/output/pq/updateDatabase.cpp index 65caf86..ebc3c43 100644 --- a/lib/output/pq/updateDatabase.cpp +++ b/lib/output/pq/updateDatabase.cpp @@ -319,4 +319,36 @@ namespace MyGrate::Output::Pq { verify(out->deleteFrom->rows() == 1, "Wrong number of rows deleted."); } } + + void + UpdateDatabase::insertRow(MariaDB_Event_Ptr e) + { + if (selected != tables.end()) { + auto & out = selected->second; + verify( + e->event.rows.column_count == out->columns.size(), "Incorrect number of columns in row data"); + if (!out->insertInto) { + std::stringstream ou; + std::size_t ordinal {0}, kordinal {out->columns.size()}; + + scprintf<"INSERT INTO %?.%? ">(ou, schema, selected->first); + for (const auto & col : out->columns) { + scprintf<"%? %?">(ou, !ordinal ? "(" : ", ", col->name); + } + ou << ") VALUES"; + for (const auto & col : out->columns) { + scprintf<"%? $%?">(ou, !ordinal ? "(" : ", ", col->name); + } + ou << ")"; + + out->insertInto = prepare(ou.str().c_str(), kordinal); + } + std::vector updateValues; + updateValues.reserve(out->columns.size()); + Row rp {e->event.rows, table_map->event.table_map}; + std::copy(rp.begin(), rp.end(), std::back_inserter(updateValues)); + out->insertInto->execute(updateValues); + verify(out->insertInto->rows() == 1, "Wrong number of rows updated."); + } + } } diff --git a/lib/output/pq/updateDatabase.h b/lib/output/pq/updateDatabase.h index 7641ada..89d9916 100644 --- a/lib/output/pq/updateDatabase.h +++ b/lib/output/pq/updateDatabase.h @@ -27,7 +27,7 @@ namespace MyGrate::Output::Pq { std::vector columns; std::size_t keys; - DbPrepStmtPtr update, deleteFrom; + DbPrepStmtPtr update, deleteFrom, insertInto; }; using TableDefPtr = std::unique_ptr; @@ -54,6 +54,7 @@ namespace MyGrate::Output::Pq { // Replication events void updateRow(MariaDB_Event_Ptr) override; void deleteRow(MariaDB_Event_Ptr) override; + void insertRow(MariaDB_Event_Ptr) override; void tableMap(MariaDB_Event_Ptr) override; const uint64_t source; -- cgit v1.2.3