diff options
author | Dan Goodliffe <dan@randomdan.homeip.net> | 2021-08-05 23:30:09 +0100 |
---|---|---|
committer | Dan Goodliffe <dan@randomdan.homeip.net> | 2021-08-05 23:30:09 +0100 |
commit | fdc6de263a58480519d683e67bb3dfa262d9b230 (patch) | |
tree | 1b2a5dbe54a02bacd008e38f2ef965ce40c9c8da | |
parent | Add base handlers for gtid/xid events (diff) | |
download | mygrate-fdc6de263a58480519d683e67bb3dfa262d9b230.tar.bz2 mygrate-fdc6de263a58480519d683e67bb3dfa262d9b230.tar.xz mygrate-fdc6de263a58480519d683e67bb3dfa262d9b230.zip |
Handle gtid and xid transaction events
-rw-r--r-- | lib/output/pq/updateDatabase.cpp | 30 | ||||
-rw-r--r-- | lib/output/pq/updateDatabase.h | 3 | ||||
-rw-r--r-- | test/sql/simpleCreate.sql | 5 | ||||
-rw-r--r-- | test/sql/simpleDelete.sql | 2 | ||||
-rw-r--r-- | test/sql/simpleDeleteSome.sql | 2 | ||||
-rw-r--r-- | test/sql/simpleInsert.sql | 1 | ||||
-rw-r--r-- | test/sql/simpleSelectAll.sql | 3 | ||||
-rw-r--r-- | test/sql/simpleUpdate.sql | 1 | ||||
-rw-r--r-- | test/sql/simpleUpdateAll.sql | 1 | ||||
-rw-r--r-- | test/test-e2e.cpp | 51 |
10 files changed, 96 insertions, 3 deletions
diff --git a/lib/output/pq/updateDatabase.cpp b/lib/output/pq/updateDatabase.cpp index 232e8f9..ea8797b 100644 --- a/lib/output/pq/updateDatabase.cpp +++ b/lib/output/pq/updateDatabase.cpp @@ -27,6 +27,8 @@ #include <streamSupport.h> namespace MyGrate::Output::Pq { + constexpr uint8_t STANDALONE {1}; + #ifndef __cpp_aggregate_paren_init ColumnDef::ColumnDef(std::string n, std::size_t o, bool p) : name {std::move(n)}, ordinal(o), is_pk(p) { } #endif @@ -266,14 +268,18 @@ namespace MyGrate::Output::Pq { void UpdateDatabase::beforeEvent(const MariaDB_Event_Ptr &) { - beginTx(); + if (!intx) { + beginTx(); + } } void UpdateDatabase::afterEvent(const MariaDB_Event_Ptr & e) { - output::pq::sql::updateSourcePosition::execute(this, e->next_event_pos, source); - commitTx(); + if (!intx) { + output::pq::sql::updateSourcePosition::execute(this, e->next_event_pos, source); + commitTx(); + } } void @@ -401,4 +407,22 @@ namespace MyGrate::Output::Pq { afterEvent(e); } } + + void + UpdateDatabase::gtid(MariaDB_Event_Ptr e) + { + beforeEvent(e); + if (!(e->event.gtid.flags & STANDALONE)) { + intx = true; + } + afterEvent(e); + } + + void + UpdateDatabase::xid(MariaDB_Event_Ptr e) + { + beforeEvent(e); + intx = false; + afterEvent(e); + } } diff --git a/lib/output/pq/updateDatabase.h b/lib/output/pq/updateDatabase.h index d2b4872..716d7f6 100644 --- a/lib/output/pq/updateDatabase.h +++ b/lib/output/pq/updateDatabase.h @@ -59,6 +59,8 @@ namespace MyGrate::Output::Pq { void insertRow(MariaDB_Event_Ptr) override; void tableMap(MariaDB_Event_Ptr) override; void rotate(MariaDB_Event_Ptr) override; + void gtid(MariaDB_Event_Ptr) override; + void xid(MariaDB_Event_Ptr) override; const uint64_t source; const std::string schema; @@ -81,6 +83,7 @@ namespace MyGrate::Output::Pq { Tables tables; Tables::const_iterator selected; MariaDB_Event_Ptr table_map; + bool intx {false}; }; } diff --git a/test/sql/simpleCreate.sql b/test/sql/simpleCreate.sql new file mode 100644 index 0000000..c2a89e9 --- /dev/null +++ b/test/sql/simpleCreate.sql @@ -0,0 +1,5 @@ +CREATE TABLE test( + id int auto_increment, + val varchar(20), + primary key(id) +) diff --git a/test/sql/simpleDelete.sql b/test/sql/simpleDelete.sql new file mode 100644 index 0000000..1c1bd85 --- /dev/null +++ b/test/sql/simpleDelete.sql @@ -0,0 +1,2 @@ +DELETE FROM test WHERE id = ? + diff --git a/test/sql/simpleDeleteSome.sql b/test/sql/simpleDeleteSome.sql new file mode 100644 index 0000000..ed69fa8 --- /dev/null +++ b/test/sql/simpleDeleteSome.sql @@ -0,0 +1,2 @@ +DELETE FROM test WHERE id < ? + diff --git a/test/sql/simpleInsert.sql b/test/sql/simpleInsert.sql new file mode 100644 index 0000000..f695332 --- /dev/null +++ b/test/sql/simpleInsert.sql @@ -0,0 +1 @@ +INSERT INTO test(val) VALUES(?) diff --git a/test/sql/simpleSelectAll.sql b/test/sql/simpleSelectAll.sql new file mode 100644 index 0000000..9fe07bc --- /dev/null +++ b/test/sql/simpleSelectAll.sql @@ -0,0 +1,3 @@ +SELECT id, val +FROM testout.test +ORDER BY id diff --git a/test/sql/simpleUpdate.sql b/test/sql/simpleUpdate.sql new file mode 100644 index 0000000..700f127 --- /dev/null +++ b/test/sql/simpleUpdate.sql @@ -0,0 +1 @@ +UPDATE test SET val = ? WHERE id = ? diff --git a/test/sql/simpleUpdateAll.sql b/test/sql/simpleUpdateAll.sql new file mode 100644 index 0000000..1ac2f13 --- /dev/null +++ b/test/sql/simpleUpdateAll.sql @@ -0,0 +1 @@ +UPDATE test SET val = ? diff --git a/test/test-e2e.cpp b/test/test-e2e.cpp index 37fa924..31f3b95 100644 --- a/test/test-e2e.cpp +++ b/test/test-e2e.cpp @@ -5,11 +5,19 @@ #include "semaphore.h" #include "testdb-mysql.h" #include "testdb-postgresql.h" +#include <compileTimeFormatter.h> +#include <dbConn.h> #include <input/replStream.h> #include <output/pq/updateDatabase.h> #include <sql/createTestTable.h> #include <sql/fillTestTable.h> #include <sql/selectTestTable.h> +#include <sql/simpleCreate.h> +#include <sql/simpleDeleteSome.h> +#include <sql/simpleInsert.h> +#include <sql/simpleSelectAll.h> +#include <sql/simpleUpdate.h> +#include <sql/simpleUpdateAll.h> #include <thread> class TestUpdateDatabase : public MyGrate::Output::Pq::UpdateDatabase { @@ -76,3 +84,46 @@ BOOST_AUTO_TEST_CASE(e2e, *boost::unit_test::timeout(15)) src->stopEvents(); repl.join(); } + +BOOST_AUTO_TEST_CASE(txns, *boost::unit_test::timeout(15)) +{ + const char * const target_schema {"testout"}; + using namespace MyGrate::Testing; + MySQLDB my; + PqConnDB pq {ROOT "/db/schema.sql"}; + + auto pqm = pq.mock(); + auto mym = my.mock(); + + MyGrate::sql::simpleCreate::execute(&mym); + + TestUpdateDatabase out {pqm.connstr.c_str(), + MyGrate::Output::Pq::UpdateDatabase::createNew(&pqm, MySQLDB::SERVER, MySQLDB::USER, MySQLDB::PASSWORD, + MySQLDB::PORT, my.mockname.c_str(), 100, target_schema) + .source}; + BOOST_CHECK_EQUAL(out.source, 1); + auto src = out.getSource(); + BOOST_REQUIRE(src); + + out.addTable(&mym, "test"); + + std::thread repl {&MyGrate::EventSourceBase::readEvents, src.get(), std::ref(out)}; + MyGrate::Tx {&mym}([&]() { + for (unsigned int x = 0; x < 10; x += 1) { + MyGrate::sql::simpleInsert::execute(&mym, MyGrate::scprintf<"some string %?">(x)); + } + MyGrate::sql::simpleUpdateAll::execute(&mym, "Same"); + MyGrate::sql::simpleDeleteSome::execute(&mym, 5); + }); + out.waitFor(14); + + src->stopEvents(); + repl.join(); + + auto recs = MyGrate::sql::simpleSelectAll::execute(&pqm); + BOOST_REQUIRE_EQUAL(recs->rows(), 6); + BOOST_CHECK_EQUAL(recs->at(0, 0).get<int>(), 5); + BOOST_CHECK_EQUAL(recs->at(5, 0).get<int>(), 10); + BOOST_CHECK_EQUAL(recs->at(1, 1).get<std::string_view>(), "Same"); + BOOST_CHECK_EQUAL(recs->at(3, 1).get<std::string_view>(), "Same"); +} |