diff options
author | Dan Goodliffe <dan@randomdan.homeip.net> | 2021-07-31 12:53:52 +0100 |
---|---|---|
committer | Dan Goodliffe <dan@randomdan.homeip.net> | 2021-07-31 12:53:52 +0100 |
commit | b72ae9cc0a98d60324c758eee22125e49fce223f (patch) | |
tree | b4540e363e6464b9ecc9a952f90434e168c755d8 | |
parent | Support the rotate event to move to the next binlog file (diff) | |
download | mygrate-b72ae9cc0a98d60324c758eee22125e49fce223f.tar.bz2 mygrate-b72ae9cc0a98d60324c758eee22125e49fce223f.tar.xz mygrate-b72ae9cc0a98d60324c758eee22125e49fce223f.zip |
Copy table content in a TX as part of add table
-rw-r--r-- | db/schema.sql | 4 | ||||
-rw-r--r-- | lib/mysql_types.h | 5 | ||||
-rw-r--r-- | lib/output/pq/sql/insertTable.sql | 4 | ||||
-rw-r--r-- | lib/output/pq/updateDatabase.cpp | 68 | ||||
-rw-r--r-- | lib/output/pq/updateDatabase.h | 3 | ||||
-rw-r--r-- | test/test-e2e.cpp | 1 |
6 files changed, 52 insertions, 33 deletions
diff --git a/db/schema.sql b/db/schema.sql index 11b8259..086dbf8 100644 --- a/db/schema.sql +++ b/db/schema.sql @@ -92,7 +92,9 @@ ALTER TABLE mygrate.table_columns ALTER COLUMN column_id ADD GENERATED ALWAYS AS CREATE TABLE mygrate.tables ( table_id integer NOT NULL, table_name name NOT NULL, - source_id integer NOT NULL + source_id integer NOT NULL, + start_file text NOT NULL, + start_position bigint NOT NULL ); diff --git a/lib/mysql_types.h b/lib/mysql_types.h index 4f5355c..d99e5a6 100644 --- a/lib/mysql_types.h +++ b/lib/mysql_types.h @@ -80,6 +80,11 @@ namespace MyGrate { #undef DEFINE_ITYPE #undef DEFINE_USTYPE #undef DEFINE_TYPE + + struct ReplicationPosition { + std::string filename; + uint64_t position; + }; } } diff --git a/lib/output/pq/sql/insertTable.sql b/lib/output/pq/sql/insertTable.sql index 4b559d0..a31b1a1 100644 --- a/lib/output/pq/sql/insertTable.sql +++ b/lib/output/pq/sql/insertTable.sql @@ -1,3 +1,3 @@ -INSERT INTO mygrate.tables(table_name, source_id) -VALUES($1, $2) +INSERT INTO mygrate.tables(table_name, source_id, start_file, start_position) +VALUES($1, $2, $3, $4) RETURNING table_id diff --git a/lib/output/pq/updateDatabase.cpp b/lib/output/pq/updateDatabase.cpp index 9db672d..59c5013 100644 --- a/lib/output/pq/updateDatabase.cpp +++ b/lib/output/pq/updateDatabase.cpp @@ -12,6 +12,7 @@ #include <input/sql/selectColumns.h> #include <input/sql/showMasterStatus.h> #include <memory> +#include <mysql_types.h> #include <output/pq/sql/insertColumn.h> #include <output/pq/sql/insertSource.h> #include <output/pq/sql/insertTable.h> @@ -83,33 +84,44 @@ namespace MyGrate::Output::Pq { void UpdateDatabase::addTable(Input::MySQLConn * conn, const char * tableName) { - auto cols = input::sql::selectColumns::execute(conn, tableName); - verify<std::logic_error>(cols->rows() > 0, "Table has no rows"); - auto tableDef {std::make_unique<TableOutput>()}; - Tx {this}([&] { - const auto table_id = **output::pq::sql::insertTable::execute(this, tableName, source); - std::stringstream ct; - scprintf<"CREATE TABLE %?.%?(">(ct, schema, tableName); - TypeMapper tm; - for (auto col : *cols) { - output::pq::sql::insertColumn::execute(this, col[0], col.currentRow(), table_id); - if (col.currentRow()) { - ct << ','; - } - scprintf<"%? %?">(ct, col[0], tm.map(col[2], scprintf<"%?.%?">(tableName, col[0]))); - if (!col[1]) { - ct << " not null"; - } - if (col[3]) { - ct << " primary key"; - tableDef->keys += 1; + // Assumes a readonly or transaction supporting table + conn->query("SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ"); + conn->query(scprintf<"LOCK TABLE %? READ">(tableName).c_str()); + Tx {conn}([&] { + auto pos = *(*input::sql::showMasterStatus::execute(conn))[0].create<MySQL::ReplicationPosition, 2>(); + conn->query("UNLOCK TABLES"); + // Consistent unlocked view of table during transaction + auto cols = input::sql::selectColumns::execute(conn, tableName); + verify<std::logic_error>(cols->rows() > 0, "Table has no rows"); + auto tableDef {std::make_unique<TableOutput>()}; + Tx {this}([&] { + const auto table_id + = **output::pq::sql::insertTable::execute(this, tableName, source, pos.filename, pos.position); + std::stringstream ct; + scprintf<"CREATE TABLE %?.%?(">(ct, schema, tableName); + TypeMapper tm; + for (auto col : *cols) { + output::pq::sql::insertColumn::execute(this, col[0], col.currentRow(), table_id); + if (col.currentRow()) { + ct << ','; + } + scprintf<"%? %?">(ct, col[0], tm.map(col[2], scprintf<"%?.%?">(tableName, col[0]))); + if (!col[1]) { + ct << " not null"; + } + if (col[3]) { + ct << " primary key"; + tableDef->keys += 1; + } + tableDef->columns.push_back( + std::make_unique<ColumnDef>(col[0], tableDef->columns.size() + 1, col[3])); } - tableDef->columns.push_back(std::make_unique<ColumnDef>(col[0], tableDef->columns.size() + 1, col[3])); - } - ct << ")"; - this->query(ct.str().c_str()); + ct << ")"; + this->query(ct.str().c_str()); + this->copyTableContent(conn, tableName, tableDef); + }); + tables.emplace(tableName, std::move(tableDef)); }); - tables.emplace(tableName, std::move(tableDef)); } struct WritePqCopyStream { @@ -206,13 +218,13 @@ namespace MyGrate::Output::Pq { }; void - UpdateDatabase::copyTableContent(Input::MySQLConn * conn, const char * table) + UpdateDatabase::copyTableContent(Input::MySQLConn * conn, const char * table, const TableDefPtr & tableDef) { auto out = beginBulkUpload(schema.c_str(), table); - auto sourceSelect = [this](auto table) { + auto sourceSelect = [&tableDef](auto table) { std::stringstream sf; unsigned int ordinal {0}; - for (const auto & col : tables.at(table)->columns) { + for (const auto & col : tableDef->columns) { scprintf<"%? %?">(sf, !ordinal++ ? "SELECT " : ", ", col->name); } sf << " FROM " << table; diff --git a/lib/output/pq/updateDatabase.h b/lib/output/pq/updateDatabase.h index 12df97e..a7188e3 100644 --- a/lib/output/pq/updateDatabase.h +++ b/lib/output/pq/updateDatabase.h @@ -50,7 +50,6 @@ namespace MyGrate::Output::Pq { const char * db, int sid, const char * sc); void addTable(Input::MySQLConn *, const char * tableName); - void copyTableContent(Input::MySQLConn *, const char * tableName); // Replication events void updateRow(MariaDB_Event_Ptr) override; @@ -74,6 +73,8 @@ namespace MyGrate::Output::Pq { static void copyAll(const Row & r, std::back_insert_iterator<std::vector<DbValue>> &&); static void copyKeys(const Row & r, const TableDefPtr &, std::back_insert_iterator<std::vector<DbValue>> &&); + void copyTableContent(Input::MySQLConn *, const char * tableName, const TableDefPtr &); + using Tables = std::map<std::string, TableDefPtr, std::less<>>; Tables tables; Tables::const_iterator selected; diff --git a/test/test-e2e.cpp b/test/test-e2e.cpp index 6d484af..0f5a051 100644 --- a/test/test-e2e.cpp +++ b/test/test-e2e.cpp @@ -31,7 +31,6 @@ BOOST_AUTO_TEST_CASE(e2e) BOOST_REQUIRE(src); out.addTable(&mym, "session"); - out.copyTableContent(&mym, "session"); BOOST_CHECK_EQUAL(MyGrate::sql::selectTestTable::execute(&pqm)->rows(), 1); |