summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/output/pq/sql/selectColumns.sql10
-rw-r--r--lib/output/pq/sql/selectSource.sql2
-rw-r--r--lib/output/pq/sql/selectTables.sql9
-rw-r--r--lib/output/pq/sql/updateSourcePosition.sql2
-rw-r--r--lib/output/pq/updateDatabase.cpp20
-rw-r--r--lib/output/pq/updateDatabase.h16
6 files changed, 45 insertions, 14 deletions
diff --git a/lib/output/pq/sql/selectColumns.sql b/lib/output/pq/sql/selectColumns.sql
index e9cd107..9c1cbab 100644
--- a/lib/output/pq/sql/selectColumns.sql
+++ b/lib/output/pq/sql/selectColumns.sql
@@ -1,6 +1,6 @@
-SELECT t.table_name, STRING_AGG(c.column_name, '|' ORDER BY c.ordinal_position)
+SELECT t.table_name, c.column_name, c.mysql_ordinal, k.column_name IS NOT NULL is_pk
FROM mygrate.source s
- JOIN information_schema.tables t USING(table_schema)
- LEFT OUTER JOIN information_schema.columns c USING(table_schema, table_name)
-WHERE s.id = $1
-GROUP BY t.table_name
+ JOIN mygrate.tables t USING(source_id)
+ JOIN mygrate.table_columns c USING(table_id)
+ LEFT OUTER JOIN information_schema.key_column_usage k USING(table_schema, table_name, column_name)
+WHERE s.source_id = $1
diff --git a/lib/output/pq/sql/selectSource.sql b/lib/output/pq/sql/selectSource.sql
index db76b4a..32b413d 100644
--- a/lib/output/pq/sql/selectSource.sql
+++ b/lib/output/pq/sql/selectSource.sql
@@ -1,3 +1,3 @@
SELECT host, username, password, port, filename, position, serverid
FROM mygrate.source s
-WHERE s.id = $1
+WHERE s.source_id = $1
diff --git a/lib/output/pq/sql/selectTables.sql b/lib/output/pq/sql/selectTables.sql
index 389dfe2..ac372b6 100644
--- a/lib/output/pq/sql/selectTables.sql
+++ b/lib/output/pq/sql/selectTables.sql
@@ -1,6 +1,3 @@
-SELECT t.table_name, STRING_AGG(k.column_name, '|' ORDER BY k.ordinal_position) pk_cols
-FROM mygrate.source s
- JOIN information_schema.tables t USING(table_schema)
- LEFT OUTER JOIN information_schema.key_column_usage k USING(table_schema, table_name)
-WHERE s.id = $1
-GROUP BY t.table_name
+SELECT t.table_name
+FROM mygrate.tables t
+WHERE t.source_id = $1
diff --git a/lib/output/pq/sql/updateSourcePosition.sql b/lib/output/pq/sql/updateSourcePosition.sql
index 3d3d665..74e22fc 100644
--- a/lib/output/pq/sql/updateSourcePosition.sql
+++ b/lib/output/pq/sql/updateSourcePosition.sql
@@ -1,4 +1,4 @@
UPDATE mygrate.source SET
filename = $1,
position = $2
-WHERE id = $3
+WHERE source_id = $3
diff --git a/lib/output/pq/updateDatabase.cpp b/lib/output/pq/updateDatabase.cpp
index 352517e..8b4d5e3 100644
--- a/lib/output/pq/updateDatabase.cpp
+++ b/lib/output/pq/updateDatabase.cpp
@@ -6,11 +6,20 @@
#include <helpers.h>
#include <input/replStream.h>
#include <memory>
+#include <output/pq/sql/selectColumns.h>
#include <output/pq/sql/selectSource.h>
+#include <output/pq/sql/selectTables.h>
#include <stdexcept>
namespace MyGrate::Output::Pq {
- UpdateDatabase::UpdateDatabase(const char * const str, uint64_t s) : PqConn {str}, source {s} { }
+ UpdateDatabase::UpdateDatabase(const char * const str, uint64_t s) : PqConn {str}, source {s}
+ {
+ auto trecs = output::pq::sql::selectTables::execute(this, source);
+ auto crecs = output::pq::sql::selectColumns::execute(this, source);
+ for (auto t {0U}; t < trecs->rows(); t++) {
+ tables.emplace(trecs->at(t, 0), std::make_unique<TableDef>(*crecs, trecs->at(t, 0)));
+ }
+ }
EventSourceBasePtr
UpdateDatabase::getSource()
@@ -19,4 +28,13 @@ namespace MyGrate::Output::Pq {
verify<std::runtime_error>(srcrec->rows() == 1, "Wrong number of source config rows");
return (*srcrec)[0].create<Input::ReplicationStream, 7>();
}
+
+ TableDef::TableDef(const RecordSet & crecs, std::string_view name)
+ {
+ for (auto c {0U}; c < crecs.rows(); c++) {
+ if (crecs.at(c, 0) == name) {
+ columns.emplace_back(crecs[c].create<ColumnDef, 3, 1>());
+ }
+ }
+ }
}
diff --git a/lib/output/pq/updateDatabase.h b/lib/output/pq/updateDatabase.h
index 4e01d9c..2e48dd6 100644
--- a/lib/output/pq/updateDatabase.h
+++ b/lib/output/pq/updateDatabase.h
@@ -7,6 +7,21 @@
#include <eventSourceBase.h>
namespace MyGrate::Output::Pq {
+ struct ColumnDef {
+ std::string name;
+ std::size_t ordinal;
+ bool is_pk;
+ };
+ using ColumnDefPtr = std::unique_ptr<ColumnDef>;
+
+ class TableDef {
+ public:
+ TableDef(const RecordSet &, std::string_view name);
+
+ std::vector<ColumnDefPtr> columns;
+ };
+ using TableDefPtr = std::unique_ptr<TableDef>;
+
class UpdateDatabase : public PqConn, public EventHandlerBase {
public:
UpdateDatabase(const char * const str, uint64_t source);
@@ -15,6 +30,7 @@ namespace MyGrate::Output::Pq {
private:
uint64_t source;
+ std::map<std::string, TableDefPtr, std::less<>> tables;
};
}