summaryrefslogtreecommitdiff
path: root/lib/output/pq
diff options
context:
space:
mode:
authorDan Goodliffe <dan@randomdan.homeip.net>2021-07-31 12:53:52 +0100
committerDan Goodliffe <dan@randomdan.homeip.net>2021-07-31 12:53:52 +0100
commitb72ae9cc0a98d60324c758eee22125e49fce223f (patch)
treeb4540e363e6464b9ecc9a952f90434e168c755d8 /lib/output/pq
parentSupport the rotate event to move to the next binlog file (diff)
downloadmygrate-b72ae9cc0a98d60324c758eee22125e49fce223f.tar.bz2
mygrate-b72ae9cc0a98d60324c758eee22125e49fce223f.tar.xz
mygrate-b72ae9cc0a98d60324c758eee22125e49fce223f.zip
Copy table content in a TX as part of add table
Diffstat (limited to 'lib/output/pq')
-rw-r--r--lib/output/pq/sql/insertTable.sql4
-rw-r--r--lib/output/pq/updateDatabase.cpp68
-rw-r--r--lib/output/pq/updateDatabase.h3
3 files changed, 44 insertions, 31 deletions
diff --git a/lib/output/pq/sql/insertTable.sql b/lib/output/pq/sql/insertTable.sql
index 4b559d0..a31b1a1 100644
--- a/lib/output/pq/sql/insertTable.sql
+++ b/lib/output/pq/sql/insertTable.sql
@@ -1,3 +1,3 @@
-INSERT INTO mygrate.tables(table_name, source_id)
-VALUES($1, $2)
+INSERT INTO mygrate.tables(table_name, source_id, start_file, start_position)
+VALUES($1, $2, $3, $4)
RETURNING table_id
diff --git a/lib/output/pq/updateDatabase.cpp b/lib/output/pq/updateDatabase.cpp
index 9db672d..59c5013 100644
--- a/lib/output/pq/updateDatabase.cpp
+++ b/lib/output/pq/updateDatabase.cpp
@@ -12,6 +12,7 @@
#include <input/sql/selectColumns.h>
#include <input/sql/showMasterStatus.h>
#include <memory>
+#include <mysql_types.h>
#include <output/pq/sql/insertColumn.h>
#include <output/pq/sql/insertSource.h>
#include <output/pq/sql/insertTable.h>
@@ -83,33 +84,44 @@ namespace MyGrate::Output::Pq {
void
UpdateDatabase::addTable(Input::MySQLConn * conn, const char * tableName)
{
- auto cols = input::sql::selectColumns::execute(conn, tableName);
- verify<std::logic_error>(cols->rows() > 0, "Table has no rows");
- auto tableDef {std::make_unique<TableOutput>()};
- Tx {this}([&] {
- const auto table_id = **output::pq::sql::insertTable::execute(this, tableName, source);
- std::stringstream ct;
- scprintf<"CREATE TABLE %?.%?(">(ct, schema, tableName);
- TypeMapper tm;
- for (auto col : *cols) {
- output::pq::sql::insertColumn::execute(this, col[0], col.currentRow(), table_id);
- if (col.currentRow()) {
- ct << ',';
- }
- scprintf<"%? %?">(ct, col[0], tm.map(col[2], scprintf<"%?.%?">(tableName, col[0])));
- if (!col[1]) {
- ct << " not null";
- }
- if (col[3]) {
- ct << " primary key";
- tableDef->keys += 1;
+ // Assumes a readonly or transaction supporting table
+ conn->query("SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ");
+ conn->query(scprintf<"LOCK TABLE %? READ">(tableName).c_str());
+ Tx {conn}([&] {
+ auto pos = *(*input::sql::showMasterStatus::execute(conn))[0].create<MySQL::ReplicationPosition, 2>();
+ conn->query("UNLOCK TABLES");
+ // Consistent unlocked view of table during transaction
+ auto cols = input::sql::selectColumns::execute(conn, tableName);
+ verify<std::logic_error>(cols->rows() > 0, "Table has no rows");
+ auto tableDef {std::make_unique<TableOutput>()};
+ Tx {this}([&] {
+ const auto table_id
+ = **output::pq::sql::insertTable::execute(this, tableName, source, pos.filename, pos.position);
+ std::stringstream ct;
+ scprintf<"CREATE TABLE %?.%?(">(ct, schema, tableName);
+ TypeMapper tm;
+ for (auto col : *cols) {
+ output::pq::sql::insertColumn::execute(this, col[0], col.currentRow(), table_id);
+ if (col.currentRow()) {
+ ct << ',';
+ }
+ scprintf<"%? %?">(ct, col[0], tm.map(col[2], scprintf<"%?.%?">(tableName, col[0])));
+ if (!col[1]) {
+ ct << " not null";
+ }
+ if (col[3]) {
+ ct << " primary key";
+ tableDef->keys += 1;
+ }
+ tableDef->columns.push_back(
+ std::make_unique<ColumnDef>(col[0], tableDef->columns.size() + 1, col[3]));
}
- tableDef->columns.push_back(std::make_unique<ColumnDef>(col[0], tableDef->columns.size() + 1, col[3]));
- }
- ct << ")";
- this->query(ct.str().c_str());
+ ct << ")";
+ this->query(ct.str().c_str());
+ this->copyTableContent(conn, tableName, tableDef);
+ });
+ tables.emplace(tableName, std::move(tableDef));
});
- tables.emplace(tableName, std::move(tableDef));
}
struct WritePqCopyStream {
@@ -206,13 +218,13 @@ namespace MyGrate::Output::Pq {
};
void
- UpdateDatabase::copyTableContent(Input::MySQLConn * conn, const char * table)
+ UpdateDatabase::copyTableContent(Input::MySQLConn * conn, const char * table, const TableDefPtr & tableDef)
{
auto out = beginBulkUpload(schema.c_str(), table);
- auto sourceSelect = [this](auto table) {
+ auto sourceSelect = [&tableDef](auto table) {
std::stringstream sf;
unsigned int ordinal {0};
- for (const auto & col : tables.at(table)->columns) {
+ for (const auto & col : tableDef->columns) {
scprintf<"%? %?">(sf, !ordinal++ ? "SELECT " : ", ", col->name);
}
sf << " FROM " << table;
diff --git a/lib/output/pq/updateDatabase.h b/lib/output/pq/updateDatabase.h
index 12df97e..a7188e3 100644
--- a/lib/output/pq/updateDatabase.h
+++ b/lib/output/pq/updateDatabase.h
@@ -50,7 +50,6 @@ namespace MyGrate::Output::Pq {
const char * db, int sid, const char * sc);
void addTable(Input::MySQLConn *, const char * tableName);
- void copyTableContent(Input::MySQLConn *, const char * tableName);
// Replication events
void updateRow(MariaDB_Event_Ptr) override;
@@ -74,6 +73,8 @@ namespace MyGrate::Output::Pq {
static void copyAll(const Row & r, std::back_insert_iterator<std::vector<DbValue>> &&);
static void copyKeys(const Row & r, const TableDefPtr &, std::back_insert_iterator<std::vector<DbValue>> &&);
+ void copyTableContent(Input::MySQLConn *, const char * tableName, const TableDefPtr &);
+
using Tables = std::map<std::string, TableDefPtr, std::less<>>;
Tables tables;
Tables::const_iterator selected;