summaryrefslogtreecommitdiff
path: root/lib/output
diff options
context:
space:
mode:
Diffstat (limited to 'lib/output')
-rw-r--r--lib/output/pq/updateDatabase.cpp32
-rw-r--r--lib/output/pq/updateDatabase.h3
2 files changed, 34 insertions, 1 deletions
diff --git a/lib/output/pq/updateDatabase.cpp b/lib/output/pq/updateDatabase.cpp
index 65caf86..ebc3c43 100644
--- a/lib/output/pq/updateDatabase.cpp
+++ b/lib/output/pq/updateDatabase.cpp
@@ -319,4 +319,36 @@ namespace MyGrate::Output::Pq {
verify<ReplicationError>(out->deleteFrom->rows() == 1, "Wrong number of rows deleted.");
}
}
+
+ void
+ UpdateDatabase::insertRow(MariaDB_Event_Ptr e)
+ {
+ if (selected != tables.end()) {
+ auto & out = selected->second;
+ verify<std::runtime_error>(
+ e->event.rows.column_count == out->columns.size(), "Incorrect number of columns in row data");
+ if (!out->insertInto) {
+ std::stringstream ou;
+ std::size_t ordinal {0}, kordinal {out->columns.size()};
+
+ scprintf<"INSERT INTO %?.%? ">(ou, schema, selected->first);
+ for (const auto & col : out->columns) {
+ scprintf<"%? %?">(ou, !ordinal ? "(" : ", ", col->name);
+ }
+ ou << ") VALUES";
+ for (const auto & col : out->columns) {
+ scprintf<"%? $%?">(ou, !ordinal ? "(" : ", ", col->name);
+ }
+ ou << ")";
+
+ out->insertInto = prepare(ou.str().c_str(), kordinal);
+ }
+ std::vector<DbValue> updateValues;
+ updateValues.reserve(out->columns.size());
+ Row rp {e->event.rows, table_map->event.table_map};
+ std::copy(rp.begin(), rp.end(), std::back_inserter(updateValues));
+ out->insertInto->execute(updateValues);
+ verify<ReplicationError>(out->insertInto->rows() == 1, "Wrong number of rows updated.");
+ }
+ }
}
diff --git a/lib/output/pq/updateDatabase.h b/lib/output/pq/updateDatabase.h
index 7641ada..89d9916 100644
--- a/lib/output/pq/updateDatabase.h
+++ b/lib/output/pq/updateDatabase.h
@@ -27,7 +27,7 @@ namespace MyGrate::Output::Pq {
std::vector<ColumnDefPtr> columns;
std::size_t keys;
- DbPrepStmtPtr update, deleteFrom;
+ DbPrepStmtPtr update, deleteFrom, insertInto;
};
using TableDefPtr = std::unique_ptr<TableOutput>;
@@ -54,6 +54,7 @@ namespace MyGrate::Output::Pq {
// Replication events
void updateRow(MariaDB_Event_Ptr) override;
void deleteRow(MariaDB_Event_Ptr) override;
+ void insertRow(MariaDB_Event_Ptr) override;
void tableMap(MariaDB_Event_Ptr) override;
const uint64_t source;