diff options
author | Dan Goodliffe <dan@randomdan.homeip.net> | 2021-07-31 12:53:52 +0100 |
---|---|---|
committer | Dan Goodliffe <dan@randomdan.homeip.net> | 2021-07-31 12:53:52 +0100 |
commit | b72ae9cc0a98d60324c758eee22125e49fce223f (patch) | |
tree | b4540e363e6464b9ecc9a952f90434e168c755d8 /lib/output/pq | |
parent | Support the rotate event to move to the next binlog file (diff) | |
download | mygrate-b72ae9cc0a98d60324c758eee22125e49fce223f.tar.bz2 mygrate-b72ae9cc0a98d60324c758eee22125e49fce223f.tar.xz mygrate-b72ae9cc0a98d60324c758eee22125e49fce223f.zip |
Copy table content in a TX as part of add table
Diffstat (limited to 'lib/output/pq')
-rw-r--r-- | lib/output/pq/sql/insertTable.sql | 4 | ||||
-rw-r--r-- | lib/output/pq/updateDatabase.cpp | 68 | ||||
-rw-r--r-- | lib/output/pq/updateDatabase.h | 3 |
3 files changed, 44 insertions, 31 deletions
diff --git a/lib/output/pq/sql/insertTable.sql b/lib/output/pq/sql/insertTable.sql index 4b559d0..a31b1a1 100644 --- a/lib/output/pq/sql/insertTable.sql +++ b/lib/output/pq/sql/insertTable.sql @@ -1,3 +1,3 @@ -INSERT INTO mygrate.tables(table_name, source_id) -VALUES($1, $2) +INSERT INTO mygrate.tables(table_name, source_id, start_file, start_position) +VALUES($1, $2, $3, $4) RETURNING table_id diff --git a/lib/output/pq/updateDatabase.cpp b/lib/output/pq/updateDatabase.cpp index 9db672d..59c5013 100644 --- a/lib/output/pq/updateDatabase.cpp +++ b/lib/output/pq/updateDatabase.cpp @@ -12,6 +12,7 @@ #include <input/sql/selectColumns.h> #include <input/sql/showMasterStatus.h> #include <memory> +#include <mysql_types.h> #include <output/pq/sql/insertColumn.h> #include <output/pq/sql/insertSource.h> #include <output/pq/sql/insertTable.h> @@ -83,33 +84,44 @@ namespace MyGrate::Output::Pq { void UpdateDatabase::addTable(Input::MySQLConn * conn, const char * tableName) { - auto cols = input::sql::selectColumns::execute(conn, tableName); - verify<std::logic_error>(cols->rows() > 0, "Table has no rows"); - auto tableDef {std::make_unique<TableOutput>()}; - Tx {this}([&] { - const auto table_id = **output::pq::sql::insertTable::execute(this, tableName, source); - std::stringstream ct; - scprintf<"CREATE TABLE %?.%?(">(ct, schema, tableName); - TypeMapper tm; - for (auto col : *cols) { - output::pq::sql::insertColumn::execute(this, col[0], col.currentRow(), table_id); - if (col.currentRow()) { - ct << ','; - } - scprintf<"%? %?">(ct, col[0], tm.map(col[2], scprintf<"%?.%?">(tableName, col[0]))); - if (!col[1]) { - ct << " not null"; - } - if (col[3]) { - ct << " primary key"; - tableDef->keys += 1; + // Assumes a readonly or transaction supporting table + conn->query("SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ"); + conn->query(scprintf<"LOCK TABLE %? READ">(tableName).c_str()); + Tx {conn}([&] { + auto pos = *(*input::sql::showMasterStatus::execute(conn))[0].create<MySQL::ReplicationPosition, 2>(); + conn->query("UNLOCK TABLES"); + // Consistent unlocked view of table during transaction + auto cols = input::sql::selectColumns::execute(conn, tableName); + verify<std::logic_error>(cols->rows() > 0, "Table has no rows"); + auto tableDef {std::make_unique<TableOutput>()}; + Tx {this}([&] { + const auto table_id + = **output::pq::sql::insertTable::execute(this, tableName, source, pos.filename, pos.position); + std::stringstream ct; + scprintf<"CREATE TABLE %?.%?(">(ct, schema, tableName); + TypeMapper tm; + for (auto col : *cols) { + output::pq::sql::insertColumn::execute(this, col[0], col.currentRow(), table_id); + if (col.currentRow()) { + ct << ','; + } + scprintf<"%? %?">(ct, col[0], tm.map(col[2], scprintf<"%?.%?">(tableName, col[0]))); + if (!col[1]) { + ct << " not null"; + } + if (col[3]) { + ct << " primary key"; + tableDef->keys += 1; + } + tableDef->columns.push_back( + std::make_unique<ColumnDef>(col[0], tableDef->columns.size() + 1, col[3])); } - tableDef->columns.push_back(std::make_unique<ColumnDef>(col[0], tableDef->columns.size() + 1, col[3])); - } - ct << ")"; - this->query(ct.str().c_str()); + ct << ")"; + this->query(ct.str().c_str()); + this->copyTableContent(conn, tableName, tableDef); + }); + tables.emplace(tableName, std::move(tableDef)); }); - tables.emplace(tableName, std::move(tableDef)); } struct WritePqCopyStream { @@ -206,13 +218,13 @@ namespace MyGrate::Output::Pq { }; void - UpdateDatabase::copyTableContent(Input::MySQLConn * conn, const char * table) + UpdateDatabase::copyTableContent(Input::MySQLConn * conn, const char * table, const TableDefPtr & tableDef) { auto out = beginBulkUpload(schema.c_str(), table); - auto sourceSelect = [this](auto table) { + auto sourceSelect = [&tableDef](auto table) { std::stringstream sf; unsigned int ordinal {0}; - for (const auto & col : tables.at(table)->columns) { + for (const auto & col : tableDef->columns) { scprintf<"%? %?">(sf, !ordinal++ ? "SELECT " : ", ", col->name); } sf << " FROM " << table; diff --git a/lib/output/pq/updateDatabase.h b/lib/output/pq/updateDatabase.h index 12df97e..a7188e3 100644 --- a/lib/output/pq/updateDatabase.h +++ b/lib/output/pq/updateDatabase.h @@ -50,7 +50,6 @@ namespace MyGrate::Output::Pq { const char * db, int sid, const char * sc); void addTable(Input::MySQLConn *, const char * tableName); - void copyTableContent(Input::MySQLConn *, const char * tableName); // Replication events void updateRow(MariaDB_Event_Ptr) override; @@ -74,6 +73,8 @@ namespace MyGrate::Output::Pq { static void copyAll(const Row & r, std::back_insert_iterator<std::vector<DbValue>> &&); static void copyKeys(const Row & r, const TableDefPtr &, std::back_insert_iterator<std::vector<DbValue>> &&); + void copyTableContent(Input::MySQLConn *, const char * tableName, const TableDefPtr &); + using Tables = std::map<std::string, TableDefPtr, std::less<>>; Tables tables; Tables::const_iterator selected; |