summaryrefslogtreecommitdiff
path: root/lib/output/pq/updateDatabase.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'lib/output/pq/updateDatabase.cpp')
-rw-r--r--lib/output/pq/updateDatabase.cpp31
1 files changed, 31 insertions, 0 deletions
diff --git a/lib/output/pq/updateDatabase.cpp b/lib/output/pq/updateDatabase.cpp
index 52388be..389d47a 100644
--- a/lib/output/pq/updateDatabase.cpp
+++ b/lib/output/pq/updateDatabase.cpp
@@ -1,19 +1,25 @@
#include "updateDatabase.h"
#include "pqConn.h"
+#include "typeMapper.h"
#include <compileTimeFormatter.h>
#include <cstdint>
#include <dbRecordSet.h>
#include <eventSourceBase.h>
#include <helpers.h>
+#include <input/mysqlConn.h>
#include <input/replStream.h>
+#include <input/sql/selectColumns.h>
#include <input/sql/showMasterStatus.h>
#include <memory>
+#include <output/pq/sql/insertColumn.h>
#include <output/pq/sql/insertSource.h>
+#include <output/pq/sql/insertTable.h>
#include <output/pq/sql/selectColumns.h>
#include <output/pq/sql/selectSource.h>
#include <output/pq/sql/selectSourceSchema.h>
#include <output/pq/sql/selectTables.h>
#include <stdexcept>
+#include <streamSupport.h>
namespace MyGrate::Output::Pq {
UpdateDatabase::UpdateDatabase(const char * const str, uint64_t s) :
@@ -56,4 +62,29 @@ namespace MyGrate::Output::Pq {
});
return UpdateDatabase(pq->connstr.c_str(), source_id);
}
+
+ void
+ UpdateDatabase::addTable(Input::MySQLConn * conn, const char * tableName)
+ {
+ auto cols = input::sql::selectColumns::execute(conn, tableName);
+ verify<std::logic_error>(cols->rows() > 0, "Table has no rows");
+ Tx {this}([&] {
+ const auto table_id = **output::pq::sql::insertTable::execute(this, tableName, source);
+ std::stringstream ct;
+ scprintf<"CREATE TABLE %?.%?(">(ct, schema, tableName);
+ TypeMapper tm;
+ for (auto col : *cols) {
+ output::pq::sql::insertColumn::execute(this, col[0], col.currentRow(), table_id);
+ if (col.currentRow())
+ ct << ',';
+ scprintf<"%? %?">(ct, col[0], tm.map(col[2], scprintf<"%?.%?">(tableName, col[0])));
+ if (!col[1])
+ ct << " not null";
+ if (col[3])
+ ct << " primary key";
+ }
+ ct << ")";
+ this->query(ct.str().c_str());
+ });
+ }
}