diff options
-rw-r--r-- | lib/output/pq/updateDatabase.cpp | 59 | ||||
-rw-r--r-- | lib/output/pq/updateDatabase.h | 16 | ||||
-rw-r--r-- | test/test-e2e.cpp | 12 |
3 files changed, 85 insertions, 2 deletions
diff --git a/lib/output/pq/updateDatabase.cpp b/lib/output/pq/updateDatabase.cpp index 6a0f419..9659cc0 100644 --- a/lib/output/pq/updateDatabase.cpp +++ b/lib/output/pq/updateDatabase.cpp @@ -19,6 +19,7 @@ #include <output/pq/sql/selectSource.h> #include <output/pq/sql/selectSourceSchema.h> #include <output/pq/sql/selectTables.h> +#include <row.h> #include <stdexcept> #include <streamSupport.h> @@ -33,7 +34,8 @@ namespace MyGrate::Output::Pq { } UpdateDatabase::UpdateDatabase(PqConn && conn, uint64_t s, RecordSetPtr cfg) : - PqConn {std::move(conn)}, source {s}, schema(cfg->at(0, 0)), database(cfg->at(0, 1)) + PqConn {std::move(conn)}, source {s}, schema(cfg->at(0, 0)), + database(cfg->at(0, 1)), selected {tables.end()}, table_map {nullptr, nullptr} { auto trecs = output::pq::sql::selectTables::execute(this, source); auto crecs = output::pq::sql::selectColumns::execute(this, source); @@ -227,4 +229,59 @@ namespace MyGrate::Output::Pq { fclose(out); } + + void + UpdateDatabase::tableMap(MariaDB_Event_Ptr e) + { + if (*e->event.table_map.database == database) { + selected = tables.find(*e->event.table_map.table); + table_map = std::move(e); + } + else { + selected = tables.end(); + table_map.reset(); + } + } + + void + UpdateDatabase::updateRow(MariaDB_Event_Ptr e) + { + if (selected != tables.end()) { + auto & out = selected->second; + verify<std::runtime_error>( + e->event.rows.column_count == out->columns.size(), "Incorrect number of columns in row data"); + if (!out->update) { + std::stringstream ou; + std::size_t ordinal {0}, kordinal {out->columns.size()}; + + scprintf<"UPDATE %?.%? ">(ou, schema, selected->first); + for (const auto & col : out->columns) { + scprintf<"%? %? = $%?">(ou, !ordinal ? " SET " : ", ", col->name, ordinal + 1); + ordinal++; + } + for (const auto & col : out->columns) { + if (col->is_pk) { + scprintf<"%? %? = $%?">( + ou, kordinal == out->columns.size() ? " WHERE " : " AND ", col->name, kordinal + 1); + kordinal++; + } + } + + out->update = prepare(ou.str().c_str(), kordinal); + } + std::vector<DbValue> updateValues; + updateValues.reserve( + out->columns.size() + std::count_if(out->columns.begin(), out->columns.end(), [](auto && c) { + return c->is_pk; + })); + RowPair rp {e->event.rows, table_map->event.table_map}; + std::copy(rp.second.begin(), rp.second.end(), std::back_inserter(updateValues)); + std::copy_if(rp.first.begin(), rp.first.end(), std::back_inserter(updateValues), + [c = out->columns.begin()](auto &&) mutable { + return (c++)->get()->is_pk; + }); + out->update->execute(updateValues); + verify<ReplicationError>(out->update->rows() == 1, "Wrong number of rows updated."); + } + } } diff --git a/lib/output/pq/updateDatabase.h b/lib/output/pq/updateDatabase.h index 79df9a9..aa50e90 100644 --- a/lib/output/pq/updateDatabase.h +++ b/lib/output/pq/updateDatabase.h @@ -2,6 +2,7 @@ #define MYGRATE_OUTPUT_PQ_UPDATEDATABASE_H #include "pqConn.h" +#include "pqStmt.h" #include <cstdint> #include <eventHandlerBase.h> #include <eventSourceBase.h> @@ -25,6 +26,7 @@ namespace MyGrate::Output::Pq { TableOutput(const RecordSet &, std::string_view name); std::vector<ColumnDefPtr> columns; + DbPrepStmtPtr update; }; using TableDefPtr = std::unique_ptr<TableOutput>; @@ -32,6 +34,10 @@ namespace MyGrate::Output::Pq { using std::runtime_error::runtime_error; }; + class ReplicationError : public std::runtime_error { + using std::runtime_error::runtime_error; + }; + class UpdateDatabase : public PqConn, public EventHandlerBase { public: UpdateDatabase(const char * const str, uint64_t source); @@ -44,6 +50,10 @@ namespace MyGrate::Output::Pq { void addTable(Input::MySQLConn *, const char * tableName); void copyTableContent(Input::MySQLConn *, const char * tableName); + // Replication events + void updateRow(MariaDB_Event_Ptr) override; + void tableMap(MariaDB_Event_Ptr) override; + const uint64_t source; const std::string schema; const std::string database; @@ -51,7 +61,11 @@ namespace MyGrate::Output::Pq { private: UpdateDatabase(PqConn &&, uint64_t source); UpdateDatabase(PqConn &&, uint64_t source, RecordSetPtr cfg); - std::map<std::string, TableDefPtr, std::less<>> tables; + + using Tables = std::map<std::string, TableDefPtr, std::less<>>; + Tables tables; + Tables::const_iterator selected; + MariaDB_Event_Ptr table_map; }; } diff --git a/test/test-e2e.cpp b/test/test-e2e.cpp index 9394e13..60dca86 100644 --- a/test/test-e2e.cpp +++ b/test/test-e2e.cpp @@ -4,10 +4,12 @@ #include "testdb-mysql.h" #include "testdb-postgresql.h" +#include <input/replStream.h> #include <output/pq/updateDatabase.h> #include <sql/createTestTable.h> #include <sql/fillTestTable.h> #include <sql/selectTestTable.h> +#include <thread> BOOST_AUTO_TEST_CASE(e2e) { @@ -32,4 +34,14 @@ BOOST_AUTO_TEST_CASE(e2e) out.copyTableContent(&mym, "session"); BOOST_CHECK_EQUAL(MyGrate::sql::selectTestTable::execute(&pqm)->rows(), 1); + + std::thread repl {&MyGrate::EventSourceBase::readEvents, src.get(), std::ref(out)}; + + auto upd = mym.prepare("UPDATE session SET session_id = ? WHERE id = ?", 2); + upd->execute(std::array<MyGrate::DbValue, 2> {"food", 1}); + + sleep(1); + + src->stopEvents(); + repl.join(); } |