From b72ae9cc0a98d60324c758eee22125e49fce223f Mon Sep 17 00:00:00 2001 From: Dan Goodliffe Date: Sat, 31 Jul 2021 12:53:52 +0100 Subject: Copy table content in a TX as part of add table --- lib/output/pq/sql/insertTable.sql | 4 +-- lib/output/pq/updateDatabase.cpp | 68 +++++++++++++++++++++++---------------- lib/output/pq/updateDatabase.h | 3 +- 3 files changed, 44 insertions(+), 31 deletions(-) (limited to 'lib/output') diff --git a/lib/output/pq/sql/insertTable.sql b/lib/output/pq/sql/insertTable.sql index 4b559d0..a31b1a1 100644 --- a/lib/output/pq/sql/insertTable.sql +++ b/lib/output/pq/sql/insertTable.sql @@ -1,3 +1,3 @@ -INSERT INTO mygrate.tables(table_name, source_id) -VALUES($1, $2) +INSERT INTO mygrate.tables(table_name, source_id, start_file, start_position) +VALUES($1, $2, $3, $4) RETURNING table_id diff --git a/lib/output/pq/updateDatabase.cpp b/lib/output/pq/updateDatabase.cpp index 9db672d..59c5013 100644 --- a/lib/output/pq/updateDatabase.cpp +++ b/lib/output/pq/updateDatabase.cpp @@ -12,6 +12,7 @@ #include #include #include +#include #include #include #include @@ -83,33 +84,44 @@ namespace MyGrate::Output::Pq { void UpdateDatabase::addTable(Input::MySQLConn * conn, const char * tableName) { - auto cols = input::sql::selectColumns::execute(conn, tableName); - verify(cols->rows() > 0, "Table has no rows"); - auto tableDef {std::make_unique()}; - Tx {this}([&] { - const auto table_id = **output::pq::sql::insertTable::execute(this, tableName, source); - std::stringstream ct; - scprintf<"CREATE TABLE %?.%?(">(ct, schema, tableName); - TypeMapper tm; - for (auto col : *cols) { - output::pq::sql::insertColumn::execute(this, col[0], col.currentRow(), table_id); - if (col.currentRow()) { - ct << ','; - } - scprintf<"%? %?">(ct, col[0], tm.map(col[2], scprintf<"%?.%?">(tableName, col[0]))); - if (!col[1]) { - ct << " not null"; - } - if (col[3]) { - ct << " primary key"; - tableDef->keys += 1; + // Assumes a readonly or transaction supporting table + conn->query("SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ"); + conn->query(scprintf<"LOCK TABLE %? READ">(tableName).c_str()); + Tx {conn}([&] { + auto pos = *(*input::sql::showMasterStatus::execute(conn))[0].create(); + conn->query("UNLOCK TABLES"); + // Consistent unlocked view of table during transaction + auto cols = input::sql::selectColumns::execute(conn, tableName); + verify(cols->rows() > 0, "Table has no rows"); + auto tableDef {std::make_unique()}; + Tx {this}([&] { + const auto table_id + = **output::pq::sql::insertTable::execute(this, tableName, source, pos.filename, pos.position); + std::stringstream ct; + scprintf<"CREATE TABLE %?.%?(">(ct, schema, tableName); + TypeMapper tm; + for (auto col : *cols) { + output::pq::sql::insertColumn::execute(this, col[0], col.currentRow(), table_id); + if (col.currentRow()) { + ct << ','; + } + scprintf<"%? %?">(ct, col[0], tm.map(col[2], scprintf<"%?.%?">(tableName, col[0]))); + if (!col[1]) { + ct << " not null"; + } + if (col[3]) { + ct << " primary key"; + tableDef->keys += 1; + } + tableDef->columns.push_back( + std::make_unique(col[0], tableDef->columns.size() + 1, col[3])); } - tableDef->columns.push_back(std::make_unique(col[0], tableDef->columns.size() + 1, col[3])); - } - ct << ")"; - this->query(ct.str().c_str()); + ct << ")"; + this->query(ct.str().c_str()); + this->copyTableContent(conn, tableName, tableDef); + }); + tables.emplace(tableName, std::move(tableDef)); }); - tables.emplace(tableName, std::move(tableDef)); } struct WritePqCopyStream { @@ -206,13 +218,13 @@ namespace MyGrate::Output::Pq { }; void - UpdateDatabase::copyTableContent(Input::MySQLConn * conn, const char * table) + UpdateDatabase::copyTableContent(Input::MySQLConn * conn, const char * table, const TableDefPtr & tableDef) { auto out = beginBulkUpload(schema.c_str(), table); - auto sourceSelect = [this](auto table) { + auto sourceSelect = [&tableDef](auto table) { std::stringstream sf; unsigned int ordinal {0}; - for (const auto & col : tables.at(table)->columns) { + for (const auto & col : tableDef->columns) { scprintf<"%? %?">(sf, !ordinal++ ? "SELECT " : ", ", col->name); } sf << " FROM " << table; diff --git a/lib/output/pq/updateDatabase.h b/lib/output/pq/updateDatabase.h index 12df97e..a7188e3 100644 --- a/lib/output/pq/updateDatabase.h +++ b/lib/output/pq/updateDatabase.h @@ -50,7 +50,6 @@ namespace MyGrate::Output::Pq { const char * db, int sid, const char * sc); void addTable(Input::MySQLConn *, const char * tableName); - void copyTableContent(Input::MySQLConn *, const char * tableName); // Replication events void updateRow(MariaDB_Event_Ptr) override; @@ -74,6 +73,8 @@ namespace MyGrate::Output::Pq { static void copyAll(const Row & r, std::back_insert_iterator> &&); static void copyKeys(const Row & r, const TableDefPtr &, std::back_insert_iterator> &&); + void copyTableContent(Input::MySQLConn *, const char * tableName, const TableDefPtr &); + using Tables = std::map>; Tables tables; Tables::const_iterator selected; -- cgit v1.2.3