summaryrefslogtreecommitdiff
path: root/lib/output/pq
diff options
context:
space:
mode:
Diffstat (limited to 'lib/output/pq')
-rw-r--r--lib/output/pq/updateDatabase.cpp34
-rw-r--r--lib/output/pq/updateDatabase.h3
2 files changed, 36 insertions, 1 deletions
diff --git a/lib/output/pq/updateDatabase.cpp b/lib/output/pq/updateDatabase.cpp
index 7036f67..65caf86 100644
--- a/lib/output/pq/updateDatabase.cpp
+++ b/lib/output/pq/updateDatabase.cpp
@@ -285,4 +285,38 @@ namespace MyGrate::Output::Pq {
verify<ReplicationError>(out->update->rows() == 1, "Wrong number of rows updated.");
}
}
+
+ void
+ UpdateDatabase::deleteRow(MariaDB_Event_Ptr e)
+ {
+ if (selected != tables.end()) {
+ auto & out = selected->second;
+ verify<std::runtime_error>(
+ e->event.rows.column_count == out->columns.size(), "Incorrect number of columns in row data");
+ if (!out->deleteFrom) {
+ std::stringstream ou;
+ std::size_t kordinal {0};
+
+ scprintf<"DELETE FROM %?.%? ">(ou, schema, selected->first);
+ for (const auto & col : out->columns) {
+ if (col->is_pk) {
+ scprintf<"%? %? = $%?">(
+ ou, kordinal == out->columns.size() ? " WHERE " : " AND ", col->name, kordinal + 1);
+ kordinal++;
+ }
+ }
+
+ out->deleteFrom = prepare(ou.str().c_str(), kordinal);
+ }
+ std::vector<DbValue> updateValues;
+ updateValues.reserve(out->keys);
+ Row rp {e->event.rows, table_map->event.table_map};
+ std::copy_if(rp.begin(), rp.end(), std::back_inserter(updateValues),
+ [c = out->columns.begin()](auto &&) mutable {
+ return (c++)->get()->is_pk;
+ });
+ out->deleteFrom->execute(updateValues);
+ verify<ReplicationError>(out->deleteFrom->rows() == 1, "Wrong number of rows deleted.");
+ }
+ }
}
diff --git a/lib/output/pq/updateDatabase.h b/lib/output/pq/updateDatabase.h
index e56852b..7641ada 100644
--- a/lib/output/pq/updateDatabase.h
+++ b/lib/output/pq/updateDatabase.h
@@ -27,7 +27,7 @@ namespace MyGrate::Output::Pq {
std::vector<ColumnDefPtr> columns;
std::size_t keys;
- DbPrepStmtPtr update;
+ DbPrepStmtPtr update, deleteFrom;
};
using TableDefPtr = std::unique_ptr<TableOutput>;
@@ -53,6 +53,7 @@ namespace MyGrate::Output::Pq {
// Replication events
void updateRow(MariaDB_Event_Ptr) override;
+ void deleteRow(MariaDB_Event_Ptr) override;
void tableMap(MariaDB_Event_Ptr) override;
const uint64_t source;