summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDan Goodliffe <dan@randomdan.homeip.net>2021-08-05 23:30:09 +0100
committerDan Goodliffe <dan@randomdan.homeip.net>2021-08-05 23:30:09 +0100
commitfdc6de263a58480519d683e67bb3dfa262d9b230 (patch)
tree1b2a5dbe54a02bacd008e38f2ef965ce40c9c8da
parentAdd base handlers for gtid/xid events (diff)
downloadmygrate-fdc6de263a58480519d683e67bb3dfa262d9b230.tar.bz2
mygrate-fdc6de263a58480519d683e67bb3dfa262d9b230.tar.xz
mygrate-fdc6de263a58480519d683e67bb3dfa262d9b230.zip
Handle gtid and xid transaction events
-rw-r--r--lib/output/pq/updateDatabase.cpp30
-rw-r--r--lib/output/pq/updateDatabase.h3
-rw-r--r--test/sql/simpleCreate.sql5
-rw-r--r--test/sql/simpleDelete.sql2
-rw-r--r--test/sql/simpleDeleteSome.sql2
-rw-r--r--test/sql/simpleInsert.sql1
-rw-r--r--test/sql/simpleSelectAll.sql3
-rw-r--r--test/sql/simpleUpdate.sql1
-rw-r--r--test/sql/simpleUpdateAll.sql1
-rw-r--r--test/test-e2e.cpp51
10 files changed, 96 insertions, 3 deletions
diff --git a/lib/output/pq/updateDatabase.cpp b/lib/output/pq/updateDatabase.cpp
index 232e8f9..ea8797b 100644
--- a/lib/output/pq/updateDatabase.cpp
+++ b/lib/output/pq/updateDatabase.cpp
@@ -27,6 +27,8 @@
#include <streamSupport.h>
namespace MyGrate::Output::Pq {
+ constexpr uint8_t STANDALONE {1};
+
#ifndef __cpp_aggregate_paren_init
ColumnDef::ColumnDef(std::string n, std::size_t o, bool p) : name {std::move(n)}, ordinal(o), is_pk(p) { }
#endif
@@ -266,14 +268,18 @@ namespace MyGrate::Output::Pq {
void
UpdateDatabase::beforeEvent(const MariaDB_Event_Ptr &)
{
- beginTx();
+ if (!intx) {
+ beginTx();
+ }
}
void
UpdateDatabase::afterEvent(const MariaDB_Event_Ptr & e)
{
- output::pq::sql::updateSourcePosition::execute(this, e->next_event_pos, source);
- commitTx();
+ if (!intx) {
+ output::pq::sql::updateSourcePosition::execute(this, e->next_event_pos, source);
+ commitTx();
+ }
}
void
@@ -401,4 +407,22 @@ namespace MyGrate::Output::Pq {
afterEvent(e);
}
}
+
+ void
+ UpdateDatabase::gtid(MariaDB_Event_Ptr e)
+ {
+ beforeEvent(e);
+ if (!(e->event.gtid.flags & STANDALONE)) {
+ intx = true;
+ }
+ afterEvent(e);
+ }
+
+ void
+ UpdateDatabase::xid(MariaDB_Event_Ptr e)
+ {
+ beforeEvent(e);
+ intx = false;
+ afterEvent(e);
+ }
}
diff --git a/lib/output/pq/updateDatabase.h b/lib/output/pq/updateDatabase.h
index d2b4872..716d7f6 100644
--- a/lib/output/pq/updateDatabase.h
+++ b/lib/output/pq/updateDatabase.h
@@ -59,6 +59,8 @@ namespace MyGrate::Output::Pq {
void insertRow(MariaDB_Event_Ptr) override;
void tableMap(MariaDB_Event_Ptr) override;
void rotate(MariaDB_Event_Ptr) override;
+ void gtid(MariaDB_Event_Ptr) override;
+ void xid(MariaDB_Event_Ptr) override;
const uint64_t source;
const std::string schema;
@@ -81,6 +83,7 @@ namespace MyGrate::Output::Pq {
Tables tables;
Tables::const_iterator selected;
MariaDB_Event_Ptr table_map;
+ bool intx {false};
};
}
diff --git a/test/sql/simpleCreate.sql b/test/sql/simpleCreate.sql
new file mode 100644
index 0000000..c2a89e9
--- /dev/null
+++ b/test/sql/simpleCreate.sql
@@ -0,0 +1,5 @@
+CREATE TABLE test(
+ id int auto_increment,
+ val varchar(20),
+ primary key(id)
+)
diff --git a/test/sql/simpleDelete.sql b/test/sql/simpleDelete.sql
new file mode 100644
index 0000000..1c1bd85
--- /dev/null
+++ b/test/sql/simpleDelete.sql
@@ -0,0 +1,2 @@
+DELETE FROM test WHERE id = ?
+
diff --git a/test/sql/simpleDeleteSome.sql b/test/sql/simpleDeleteSome.sql
new file mode 100644
index 0000000..ed69fa8
--- /dev/null
+++ b/test/sql/simpleDeleteSome.sql
@@ -0,0 +1,2 @@
+DELETE FROM test WHERE id < ?
+
diff --git a/test/sql/simpleInsert.sql b/test/sql/simpleInsert.sql
new file mode 100644
index 0000000..f695332
--- /dev/null
+++ b/test/sql/simpleInsert.sql
@@ -0,0 +1 @@
+INSERT INTO test(val) VALUES(?)
diff --git a/test/sql/simpleSelectAll.sql b/test/sql/simpleSelectAll.sql
new file mode 100644
index 0000000..9fe07bc
--- /dev/null
+++ b/test/sql/simpleSelectAll.sql
@@ -0,0 +1,3 @@
+SELECT id, val
+FROM testout.test
+ORDER BY id
diff --git a/test/sql/simpleUpdate.sql b/test/sql/simpleUpdate.sql
new file mode 100644
index 0000000..700f127
--- /dev/null
+++ b/test/sql/simpleUpdate.sql
@@ -0,0 +1 @@
+UPDATE test SET val = ? WHERE id = ?
diff --git a/test/sql/simpleUpdateAll.sql b/test/sql/simpleUpdateAll.sql
new file mode 100644
index 0000000..1ac2f13
--- /dev/null
+++ b/test/sql/simpleUpdateAll.sql
@@ -0,0 +1 @@
+UPDATE test SET val = ?
diff --git a/test/test-e2e.cpp b/test/test-e2e.cpp
index 37fa924..31f3b95 100644
--- a/test/test-e2e.cpp
+++ b/test/test-e2e.cpp
@@ -5,11 +5,19 @@
#include "semaphore.h"
#include "testdb-mysql.h"
#include "testdb-postgresql.h"
+#include <compileTimeFormatter.h>
+#include <dbConn.h>
#include <input/replStream.h>
#include <output/pq/updateDatabase.h>
#include <sql/createTestTable.h>
#include <sql/fillTestTable.h>
#include <sql/selectTestTable.h>
+#include <sql/simpleCreate.h>
+#include <sql/simpleDeleteSome.h>
+#include <sql/simpleInsert.h>
+#include <sql/simpleSelectAll.h>
+#include <sql/simpleUpdate.h>
+#include <sql/simpleUpdateAll.h>
#include <thread>
class TestUpdateDatabase : public MyGrate::Output::Pq::UpdateDatabase {
@@ -76,3 +84,46 @@ BOOST_AUTO_TEST_CASE(e2e, *boost::unit_test::timeout(15))
src->stopEvents();
repl.join();
}
+
+BOOST_AUTO_TEST_CASE(txns, *boost::unit_test::timeout(15))
+{
+ const char * const target_schema {"testout"};
+ using namespace MyGrate::Testing;
+ MySQLDB my;
+ PqConnDB pq {ROOT "/db/schema.sql"};
+
+ auto pqm = pq.mock();
+ auto mym = my.mock();
+
+ MyGrate::sql::simpleCreate::execute(&mym);
+
+ TestUpdateDatabase out {pqm.connstr.c_str(),
+ MyGrate::Output::Pq::UpdateDatabase::createNew(&pqm, MySQLDB::SERVER, MySQLDB::USER, MySQLDB::PASSWORD,
+ MySQLDB::PORT, my.mockname.c_str(), 100, target_schema)
+ .source};
+ BOOST_CHECK_EQUAL(out.source, 1);
+ auto src = out.getSource();
+ BOOST_REQUIRE(src);
+
+ out.addTable(&mym, "test");
+
+ std::thread repl {&MyGrate::EventSourceBase::readEvents, src.get(), std::ref(out)};
+ MyGrate::Tx {&mym}([&]() {
+ for (unsigned int x = 0; x < 10; x += 1) {
+ MyGrate::sql::simpleInsert::execute(&mym, MyGrate::scprintf<"some string %?">(x));
+ }
+ MyGrate::sql::simpleUpdateAll::execute(&mym, "Same");
+ MyGrate::sql::simpleDeleteSome::execute(&mym, 5);
+ });
+ out.waitFor(14);
+
+ src->stopEvents();
+ repl.join();
+
+ auto recs = MyGrate::sql::simpleSelectAll::execute(&pqm);
+ BOOST_REQUIRE_EQUAL(recs->rows(), 6);
+ BOOST_CHECK_EQUAL(recs->at(0, 0).get<int>(), 5);
+ BOOST_CHECK_EQUAL(recs->at(5, 0).get<int>(), 10);
+ BOOST_CHECK_EQUAL(recs->at(1, 1).get<std::string_view>(), "Same");
+ BOOST_CHECK_EQUAL(recs->at(3, 1).get<std::string_view>(), "Same");
+}