diff options
Diffstat (limited to 'lib/output/pq')
-rw-r--r-- | lib/output/pq/updateDatabase.cpp | 34 | ||||
-rw-r--r-- | lib/output/pq/updateDatabase.h | 3 |
2 files changed, 36 insertions, 1 deletions
diff --git a/lib/output/pq/updateDatabase.cpp b/lib/output/pq/updateDatabase.cpp index 7036f67..65caf86 100644 --- a/lib/output/pq/updateDatabase.cpp +++ b/lib/output/pq/updateDatabase.cpp @@ -285,4 +285,38 @@ namespace MyGrate::Output::Pq { verify<ReplicationError>(out->update->rows() == 1, "Wrong number of rows updated."); } } + + void + UpdateDatabase::deleteRow(MariaDB_Event_Ptr e) + { + if (selected != tables.end()) { + auto & out = selected->second; + verify<std::runtime_error>( + e->event.rows.column_count == out->columns.size(), "Incorrect number of columns in row data"); + if (!out->deleteFrom) { + std::stringstream ou; + std::size_t kordinal {0}; + + scprintf<"DELETE FROM %?.%? ">(ou, schema, selected->first); + for (const auto & col : out->columns) { + if (col->is_pk) { + scprintf<"%? %? = $%?">( + ou, kordinal == out->columns.size() ? " WHERE " : " AND ", col->name, kordinal + 1); + kordinal++; + } + } + + out->deleteFrom = prepare(ou.str().c_str(), kordinal); + } + std::vector<DbValue> updateValues; + updateValues.reserve(out->keys); + Row rp {e->event.rows, table_map->event.table_map}; + std::copy_if(rp.begin(), rp.end(), std::back_inserter(updateValues), + [c = out->columns.begin()](auto &&) mutable { + return (c++)->get()->is_pk; + }); + out->deleteFrom->execute(updateValues); + verify<ReplicationError>(out->deleteFrom->rows() == 1, "Wrong number of rows deleted."); + } + } } diff --git a/lib/output/pq/updateDatabase.h b/lib/output/pq/updateDatabase.h index e56852b..7641ada 100644 --- a/lib/output/pq/updateDatabase.h +++ b/lib/output/pq/updateDatabase.h @@ -27,7 +27,7 @@ namespace MyGrate::Output::Pq { std::vector<ColumnDefPtr> columns; std::size_t keys; - DbPrepStmtPtr update; + DbPrepStmtPtr update, deleteFrom; }; using TableDefPtr = std::unique_ptr<TableOutput>; @@ -53,6 +53,7 @@ namespace MyGrate::Output::Pq { // Replication events void updateRow(MariaDB_Event_Ptr) override; + void deleteRow(MariaDB_Event_Ptr) override; void tableMap(MariaDB_Event_Ptr) override; const uint64_t source; |