From b72ae9cc0a98d60324c758eee22125e49fce223f Mon Sep 17 00:00:00 2001
From: Dan Goodliffe <dan@randomdan.homeip.net>
Date: Sat, 31 Jul 2021 12:53:52 +0100
Subject: Copy table content in a TX as part of add table

---
 lib/mysql_types.h                 |  5 +++
 lib/output/pq/sql/insertTable.sql |  4 +--
 lib/output/pq/updateDatabase.cpp  | 68 +++++++++++++++++++++++----------------
 lib/output/pq/updateDatabase.h    |  3 +-
 4 files changed, 49 insertions(+), 31 deletions(-)

(limited to 'lib')

diff --git a/lib/mysql_types.h b/lib/mysql_types.h
index 4f5355c..d99e5a6 100644
--- a/lib/mysql_types.h
+++ b/lib/mysql_types.h
@@ -80,6 +80,11 @@ namespace MyGrate {
 #undef DEFINE_ITYPE
 #undef DEFINE_USTYPE
 #undef DEFINE_TYPE
+
+		struct ReplicationPosition {
+			std::string filename;
+			uint64_t position;
+		};
 	}
 }
 
diff --git a/lib/output/pq/sql/insertTable.sql b/lib/output/pq/sql/insertTable.sql
index 4b559d0..a31b1a1 100644
--- a/lib/output/pq/sql/insertTable.sql
+++ b/lib/output/pq/sql/insertTable.sql
@@ -1,3 +1,3 @@
-INSERT INTO mygrate.tables(table_name, source_id)
-VALUES($1, $2)
+INSERT INTO mygrate.tables(table_name, source_id, start_file, start_position)
+VALUES($1, $2, $3, $4)
 RETURNING table_id
diff --git a/lib/output/pq/updateDatabase.cpp b/lib/output/pq/updateDatabase.cpp
index 9db672d..59c5013 100644
--- a/lib/output/pq/updateDatabase.cpp
+++ b/lib/output/pq/updateDatabase.cpp
@@ -12,6 +12,7 @@
 #include <input/sql/selectColumns.h>
 #include <input/sql/showMasterStatus.h>
 #include <memory>
+#include <mysql_types.h>
 #include <output/pq/sql/insertColumn.h>
 #include <output/pq/sql/insertSource.h>
 #include <output/pq/sql/insertTable.h>
@@ -83,33 +84,44 @@ namespace MyGrate::Output::Pq {
 	void
 	UpdateDatabase::addTable(Input::MySQLConn * conn, const char * tableName)
 	{
-		auto cols = input::sql::selectColumns::execute(conn, tableName);
-		verify<std::logic_error>(cols->rows() > 0, "Table has no rows");
-		auto tableDef {std::make_unique<TableOutput>()};
-		Tx {this}([&] {
-			const auto table_id = **output::pq::sql::insertTable::execute(this, tableName, source);
-			std::stringstream ct;
-			scprintf<"CREATE TABLE %?.%?(">(ct, schema, tableName);
-			TypeMapper tm;
-			for (auto col : *cols) {
-				output::pq::sql::insertColumn::execute(this, col[0], col.currentRow(), table_id);
-				if (col.currentRow()) {
-					ct << ',';
-				}
-				scprintf<"%? %?">(ct, col[0], tm.map(col[2], scprintf<"%?.%?">(tableName, col[0])));
-				if (!col[1]) {
-					ct << " not null";
-				}
-				if (col[3]) {
-					ct << " primary key";
-					tableDef->keys += 1;
+		// Assumes a readonly or transaction supporting table
+		conn->query("SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ");
+		conn->query(scprintf<"LOCK TABLE %? READ">(tableName).c_str());
+		Tx {conn}([&] {
+			auto pos = *(*input::sql::showMasterStatus::execute(conn))[0].create<MySQL::ReplicationPosition, 2>();
+			conn->query("UNLOCK TABLES");
+			// Consistent unlocked view of table during transaction
+			auto cols = input::sql::selectColumns::execute(conn, tableName);
+			verify<std::logic_error>(cols->rows() > 0, "Table has no rows");
+			auto tableDef {std::make_unique<TableOutput>()};
+			Tx {this}([&] {
+				const auto table_id
+						= **output::pq::sql::insertTable::execute(this, tableName, source, pos.filename, pos.position);
+				std::stringstream ct;
+				scprintf<"CREATE TABLE %?.%?(">(ct, schema, tableName);
+				TypeMapper tm;
+				for (auto col : *cols) {
+					output::pq::sql::insertColumn::execute(this, col[0], col.currentRow(), table_id);
+					if (col.currentRow()) {
+						ct << ',';
+					}
+					scprintf<"%? %?">(ct, col[0], tm.map(col[2], scprintf<"%?.%?">(tableName, col[0])));
+					if (!col[1]) {
+						ct << " not null";
+					}
+					if (col[3]) {
+						ct << " primary key";
+						tableDef->keys += 1;
+					}
+					tableDef->columns.push_back(
+							std::make_unique<ColumnDef>(col[0], tableDef->columns.size() + 1, col[3]));
 				}
-				tableDef->columns.push_back(std::make_unique<ColumnDef>(col[0], tableDef->columns.size() + 1, col[3]));
-			}
-			ct << ")";
-			this->query(ct.str().c_str());
+				ct << ")";
+				this->query(ct.str().c_str());
+				this->copyTableContent(conn, tableName, tableDef);
+			});
+			tables.emplace(tableName, std::move(tableDef));
 		});
-		tables.emplace(tableName, std::move(tableDef));
 	}
 
 	struct WritePqCopyStream {
@@ -206,13 +218,13 @@ namespace MyGrate::Output::Pq {
 	};
 
 	void
-	UpdateDatabase::copyTableContent(Input::MySQLConn * conn, const char * table)
+	UpdateDatabase::copyTableContent(Input::MySQLConn * conn, const char * table, const TableDefPtr & tableDef)
 	{
 		auto out = beginBulkUpload(schema.c_str(), table);
-		auto sourceSelect = [this](auto table) {
+		auto sourceSelect = [&tableDef](auto table) {
 			std::stringstream sf;
 			unsigned int ordinal {0};
-			for (const auto & col : tables.at(table)->columns) {
+			for (const auto & col : tableDef->columns) {
 				scprintf<"%? %?">(sf, !ordinal++ ? "SELECT " : ", ", col->name);
 			}
 			sf << " FROM " << table;
diff --git a/lib/output/pq/updateDatabase.h b/lib/output/pq/updateDatabase.h
index 12df97e..a7188e3 100644
--- a/lib/output/pq/updateDatabase.h
+++ b/lib/output/pq/updateDatabase.h
@@ -50,7 +50,6 @@ namespace MyGrate::Output::Pq {
 				const char * db, int sid, const char * sc);
 
 		void addTable(Input::MySQLConn *, const char * tableName);
-		void copyTableContent(Input::MySQLConn *, const char * tableName);
 
 		// Replication events
 		void updateRow(MariaDB_Event_Ptr) override;
@@ -74,6 +73,8 @@ namespace MyGrate::Output::Pq {
 		static void copyAll(const Row & r, std::back_insert_iterator<std::vector<DbValue>> &&);
 		static void copyKeys(const Row & r, const TableDefPtr &, std::back_insert_iterator<std::vector<DbValue>> &&);
 
+		void copyTableContent(Input::MySQLConn *, const char * tableName, const TableDefPtr &);
+
 		using Tables = std::map<std::string, TableDefPtr, std::less<>>;
 		Tables tables;
 		Tables::const_iterator selected;
-- 
cgit v1.2.3