diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/output/pq/sql/selectColumns.sql | 10 | ||||
-rw-r--r-- | lib/output/pq/sql/selectSource.sql | 2 | ||||
-rw-r--r-- | lib/output/pq/sql/selectTables.sql | 9 | ||||
-rw-r--r-- | lib/output/pq/sql/updateSourcePosition.sql | 2 | ||||
-rw-r--r-- | lib/output/pq/updateDatabase.cpp | 20 | ||||
-rw-r--r-- | lib/output/pq/updateDatabase.h | 16 |
6 files changed, 45 insertions, 14 deletions
diff --git a/lib/output/pq/sql/selectColumns.sql b/lib/output/pq/sql/selectColumns.sql index e9cd107..9c1cbab 100644 --- a/lib/output/pq/sql/selectColumns.sql +++ b/lib/output/pq/sql/selectColumns.sql @@ -1,6 +1,6 @@ -SELECT t.table_name, STRING_AGG(c.column_name, '|' ORDER BY c.ordinal_position) +SELECT t.table_name, c.column_name, c.mysql_ordinal, k.column_name IS NOT NULL is_pk FROM mygrate.source s - JOIN information_schema.tables t USING(table_schema) - LEFT OUTER JOIN information_schema.columns c USING(table_schema, table_name) -WHERE s.id = $1 -GROUP BY t.table_name + JOIN mygrate.tables t USING(source_id) + JOIN mygrate.table_columns c USING(table_id) + LEFT OUTER JOIN information_schema.key_column_usage k USING(table_schema, table_name, column_name) +WHERE s.source_id = $1 diff --git a/lib/output/pq/sql/selectSource.sql b/lib/output/pq/sql/selectSource.sql index db76b4a..32b413d 100644 --- a/lib/output/pq/sql/selectSource.sql +++ b/lib/output/pq/sql/selectSource.sql @@ -1,3 +1,3 @@ SELECT host, username, password, port, filename, position, serverid FROM mygrate.source s -WHERE s.id = $1 +WHERE s.source_id = $1 diff --git a/lib/output/pq/sql/selectTables.sql b/lib/output/pq/sql/selectTables.sql index 389dfe2..ac372b6 100644 --- a/lib/output/pq/sql/selectTables.sql +++ b/lib/output/pq/sql/selectTables.sql @@ -1,6 +1,3 @@ -SELECT t.table_name, STRING_AGG(k.column_name, '|' ORDER BY k.ordinal_position) pk_cols -FROM mygrate.source s - JOIN information_schema.tables t USING(table_schema) - LEFT OUTER JOIN information_schema.key_column_usage k USING(table_schema, table_name) -WHERE s.id = $1 -GROUP BY t.table_name +SELECT t.table_name +FROM mygrate.tables t +WHERE t.source_id = $1 diff --git a/lib/output/pq/sql/updateSourcePosition.sql b/lib/output/pq/sql/updateSourcePosition.sql index 3d3d665..74e22fc 100644 --- a/lib/output/pq/sql/updateSourcePosition.sql +++ b/lib/output/pq/sql/updateSourcePosition.sql @@ -1,4 +1,4 @@ UPDATE mygrate.source SET filename = $1, position = $2 -WHERE id = $3 +WHERE source_id = $3 diff --git a/lib/output/pq/updateDatabase.cpp b/lib/output/pq/updateDatabase.cpp index 352517e..8b4d5e3 100644 --- a/lib/output/pq/updateDatabase.cpp +++ b/lib/output/pq/updateDatabase.cpp @@ -6,11 +6,20 @@ #include <helpers.h> #include <input/replStream.h> #include <memory> +#include <output/pq/sql/selectColumns.h> #include <output/pq/sql/selectSource.h> +#include <output/pq/sql/selectTables.h> #include <stdexcept> namespace MyGrate::Output::Pq { - UpdateDatabase::UpdateDatabase(const char * const str, uint64_t s) : PqConn {str}, source {s} { } + UpdateDatabase::UpdateDatabase(const char * const str, uint64_t s) : PqConn {str}, source {s} + { + auto trecs = output::pq::sql::selectTables::execute(this, source); + auto crecs = output::pq::sql::selectColumns::execute(this, source); + for (auto t {0U}; t < trecs->rows(); t++) { + tables.emplace(trecs->at(t, 0), std::make_unique<TableDef>(*crecs, trecs->at(t, 0))); + } + } EventSourceBasePtr UpdateDatabase::getSource() @@ -19,4 +28,13 @@ namespace MyGrate::Output::Pq { verify<std::runtime_error>(srcrec->rows() == 1, "Wrong number of source config rows"); return (*srcrec)[0].create<Input::ReplicationStream, 7>(); } + + TableDef::TableDef(const RecordSet & crecs, std::string_view name) + { + for (auto c {0U}; c < crecs.rows(); c++) { + if (crecs.at(c, 0) == name) { + columns.emplace_back(crecs[c].create<ColumnDef, 3, 1>()); + } + } + } } diff --git a/lib/output/pq/updateDatabase.h b/lib/output/pq/updateDatabase.h index 4e01d9c..2e48dd6 100644 --- a/lib/output/pq/updateDatabase.h +++ b/lib/output/pq/updateDatabase.h @@ -7,6 +7,21 @@ #include <eventSourceBase.h> namespace MyGrate::Output::Pq { + struct ColumnDef { + std::string name; + std::size_t ordinal; + bool is_pk; + }; + using ColumnDefPtr = std::unique_ptr<ColumnDef>; + + class TableDef { + public: + TableDef(const RecordSet &, std::string_view name); + + std::vector<ColumnDefPtr> columns; + }; + using TableDefPtr = std::unique_ptr<TableDef>; + class UpdateDatabase : public PqConn, public EventHandlerBase { public: UpdateDatabase(const char * const str, uint64_t source); @@ -15,6 +30,7 @@ namespace MyGrate::Output::Pq { private: uint64_t source; + std::map<std::string, TableDefPtr, std::less<>> tables; }; } |