summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lib/output/pq/updateDatabase.cpp59
-rw-r--r--lib/output/pq/updateDatabase.h16
-rw-r--r--test/test-e2e.cpp12
3 files changed, 85 insertions, 2 deletions
diff --git a/lib/output/pq/updateDatabase.cpp b/lib/output/pq/updateDatabase.cpp
index 6a0f419..9659cc0 100644
--- a/lib/output/pq/updateDatabase.cpp
+++ b/lib/output/pq/updateDatabase.cpp
@@ -19,6 +19,7 @@
#include <output/pq/sql/selectSource.h>
#include <output/pq/sql/selectSourceSchema.h>
#include <output/pq/sql/selectTables.h>
+#include <row.h>
#include <stdexcept>
#include <streamSupport.h>
@@ -33,7 +34,8 @@ namespace MyGrate::Output::Pq {
}
UpdateDatabase::UpdateDatabase(PqConn && conn, uint64_t s, RecordSetPtr cfg) :
- PqConn {std::move(conn)}, source {s}, schema(cfg->at(0, 0)), database(cfg->at(0, 1))
+ PqConn {std::move(conn)}, source {s}, schema(cfg->at(0, 0)),
+ database(cfg->at(0, 1)), selected {tables.end()}, table_map {nullptr, nullptr}
{
auto trecs = output::pq::sql::selectTables::execute(this, source);
auto crecs = output::pq::sql::selectColumns::execute(this, source);
@@ -227,4 +229,59 @@ namespace MyGrate::Output::Pq {
fclose(out);
}
+
+ void
+ UpdateDatabase::tableMap(MariaDB_Event_Ptr e)
+ {
+ if (*e->event.table_map.database == database) {
+ selected = tables.find(*e->event.table_map.table);
+ table_map = std::move(e);
+ }
+ else {
+ selected = tables.end();
+ table_map.reset();
+ }
+ }
+
+ void
+ UpdateDatabase::updateRow(MariaDB_Event_Ptr e)
+ {
+ if (selected != tables.end()) {
+ auto & out = selected->second;
+ verify<std::runtime_error>(
+ e->event.rows.column_count == out->columns.size(), "Incorrect number of columns in row data");
+ if (!out->update) {
+ std::stringstream ou;
+ std::size_t ordinal {0}, kordinal {out->columns.size()};
+
+ scprintf<"UPDATE %?.%? ">(ou, schema, selected->first);
+ for (const auto & col : out->columns) {
+ scprintf<"%? %? = $%?">(ou, !ordinal ? " SET " : ", ", col->name, ordinal + 1);
+ ordinal++;
+ }
+ for (const auto & col : out->columns) {
+ if (col->is_pk) {
+ scprintf<"%? %? = $%?">(
+ ou, kordinal == out->columns.size() ? " WHERE " : " AND ", col->name, kordinal + 1);
+ kordinal++;
+ }
+ }
+
+ out->update = prepare(ou.str().c_str(), kordinal);
+ }
+ std::vector<DbValue> updateValues;
+ updateValues.reserve(
+ out->columns.size() + std::count_if(out->columns.begin(), out->columns.end(), [](auto && c) {
+ return c->is_pk;
+ }));
+ RowPair rp {e->event.rows, table_map->event.table_map};
+ std::copy(rp.second.begin(), rp.second.end(), std::back_inserter(updateValues));
+ std::copy_if(rp.first.begin(), rp.first.end(), std::back_inserter(updateValues),
+ [c = out->columns.begin()](auto &&) mutable {
+ return (c++)->get()->is_pk;
+ });
+ out->update->execute(updateValues);
+ verify<ReplicationError>(out->update->rows() == 1, "Wrong number of rows updated.");
+ }
+ }
}
diff --git a/lib/output/pq/updateDatabase.h b/lib/output/pq/updateDatabase.h
index 79df9a9..aa50e90 100644
--- a/lib/output/pq/updateDatabase.h
+++ b/lib/output/pq/updateDatabase.h
@@ -2,6 +2,7 @@
#define MYGRATE_OUTPUT_PQ_UPDATEDATABASE_H
#include "pqConn.h"
+#include "pqStmt.h"
#include <cstdint>
#include <eventHandlerBase.h>
#include <eventSourceBase.h>
@@ -25,6 +26,7 @@ namespace MyGrate::Output::Pq {
TableOutput(const RecordSet &, std::string_view name);
std::vector<ColumnDefPtr> columns;
+ DbPrepStmtPtr update;
};
using TableDefPtr = std::unique_ptr<TableOutput>;
@@ -32,6 +34,10 @@ namespace MyGrate::Output::Pq {
using std::runtime_error::runtime_error;
};
+ class ReplicationError : public std::runtime_error {
+ using std::runtime_error::runtime_error;
+ };
+
class UpdateDatabase : public PqConn, public EventHandlerBase {
public:
UpdateDatabase(const char * const str, uint64_t source);
@@ -44,6 +50,10 @@ namespace MyGrate::Output::Pq {
void addTable(Input::MySQLConn *, const char * tableName);
void copyTableContent(Input::MySQLConn *, const char * tableName);
+ // Replication events
+ void updateRow(MariaDB_Event_Ptr) override;
+ void tableMap(MariaDB_Event_Ptr) override;
+
const uint64_t source;
const std::string schema;
const std::string database;
@@ -51,7 +61,11 @@ namespace MyGrate::Output::Pq {
private:
UpdateDatabase(PqConn &&, uint64_t source);
UpdateDatabase(PqConn &&, uint64_t source, RecordSetPtr cfg);
- std::map<std::string, TableDefPtr, std::less<>> tables;
+
+ using Tables = std::map<std::string, TableDefPtr, std::less<>>;
+ Tables tables;
+ Tables::const_iterator selected;
+ MariaDB_Event_Ptr table_map;
};
}
diff --git a/test/test-e2e.cpp b/test/test-e2e.cpp
index 9394e13..60dca86 100644
--- a/test/test-e2e.cpp
+++ b/test/test-e2e.cpp
@@ -4,10 +4,12 @@
#include "testdb-mysql.h"
#include "testdb-postgresql.h"
+#include <input/replStream.h>
#include <output/pq/updateDatabase.h>
#include <sql/createTestTable.h>
#include <sql/fillTestTable.h>
#include <sql/selectTestTable.h>
+#include <thread>
BOOST_AUTO_TEST_CASE(e2e)
{
@@ -32,4 +34,14 @@ BOOST_AUTO_TEST_CASE(e2e)
out.copyTableContent(&mym, "session");
BOOST_CHECK_EQUAL(MyGrate::sql::selectTestTable::execute(&pqm)->rows(), 1);
+
+ std::thread repl {&MyGrate::EventSourceBase::readEvents, src.get(), std::ref(out)};
+
+ auto upd = mym.prepare("UPDATE session SET session_id = ? WHERE id = ?", 2);
+ upd->execute(std::array<MyGrate::DbValue, 2> {"food", 1});
+
+ sleep(1);
+
+ src->stopEvents();
+ repl.join();
}