diff options
-rw-r--r-- | lib/output/pq/updateDatabase.cpp | 32 | ||||
-rw-r--r-- | lib/output/pq/updateDatabase.h | 3 |
2 files changed, 34 insertions, 1 deletions
diff --git a/lib/output/pq/updateDatabase.cpp b/lib/output/pq/updateDatabase.cpp index 65caf86..ebc3c43 100644 --- a/lib/output/pq/updateDatabase.cpp +++ b/lib/output/pq/updateDatabase.cpp @@ -319,4 +319,36 @@ namespace MyGrate::Output::Pq { verify<ReplicationError>(out->deleteFrom->rows() == 1, "Wrong number of rows deleted."); } } + + void + UpdateDatabase::insertRow(MariaDB_Event_Ptr e) + { + if (selected != tables.end()) { + auto & out = selected->second; + verify<std::runtime_error>( + e->event.rows.column_count == out->columns.size(), "Incorrect number of columns in row data"); + if (!out->insertInto) { + std::stringstream ou; + std::size_t ordinal {0}, kordinal {out->columns.size()}; + + scprintf<"INSERT INTO %?.%? ">(ou, schema, selected->first); + for (const auto & col : out->columns) { + scprintf<"%? %?">(ou, !ordinal ? "(" : ", ", col->name); + } + ou << ") VALUES"; + for (const auto & col : out->columns) { + scprintf<"%? $%?">(ou, !ordinal ? "(" : ", ", col->name); + } + ou << ")"; + + out->insertInto = prepare(ou.str().c_str(), kordinal); + } + std::vector<DbValue> updateValues; + updateValues.reserve(out->columns.size()); + Row rp {e->event.rows, table_map->event.table_map}; + std::copy(rp.begin(), rp.end(), std::back_inserter(updateValues)); + out->insertInto->execute(updateValues); + verify<ReplicationError>(out->insertInto->rows() == 1, "Wrong number of rows updated."); + } + } } diff --git a/lib/output/pq/updateDatabase.h b/lib/output/pq/updateDatabase.h index 7641ada..89d9916 100644 --- a/lib/output/pq/updateDatabase.h +++ b/lib/output/pq/updateDatabase.h @@ -27,7 +27,7 @@ namespace MyGrate::Output::Pq { std::vector<ColumnDefPtr> columns; std::size_t keys; - DbPrepStmtPtr update, deleteFrom; + DbPrepStmtPtr update, deleteFrom, insertInto; }; using TableDefPtr = std::unique_ptr<TableOutput>; @@ -54,6 +54,7 @@ namespace MyGrate::Output::Pq { // Replication events void updateRow(MariaDB_Event_Ptr) override; void deleteRow(MariaDB_Event_Ptr) override; + void insertRow(MariaDB_Event_Ptr) override; void tableMap(MariaDB_Event_Ptr) override; const uint64_t source; |