summaryrefslogtreecommitdiff
path: root/lib/output/pq/updateDatabase.cpp
blob: 3829da9d18e941893fbef126a9c70e5b21f1bbac (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
#include "updateDatabase.h"
#include "pqConn.h"
#include "typeMapper.h"
#include <compileTimeFormatter.h>
#include <cstdint>
#include <dbRecordSet.h>
#include <eventSourceBase.h>
#include <helpers.h>
#include <input/mysqlConn.h>
#include <input/replStream.h>
#include <input/sql/selectColumns.h>
#include <input/sql/showMasterStatus.h>
#include <memory>
#include <output/pq/sql/insertColumn.h>
#include <output/pq/sql/insertSource.h>
#include <output/pq/sql/insertTable.h>
#include <output/pq/sql/selectColumns.h>
#include <output/pq/sql/selectSource.h>
#include <output/pq/sql/selectSourceSchema.h>
#include <output/pq/sql/selectTables.h>
#include <stdexcept>
#include <streamSupport.h>

namespace MyGrate::Output::Pq {
	// Builds a column description from its name, its ordinal and its
	// primary-key flag. The name is taken by value and moved into the member.
	ColumnDef::ColumnDef(std::string columnName, std::size_t position, bool primaryKey) :
		name {std::move(columnName)}, ordinal(position), is_pk(primaryKey)
	{
	}

	// Constructs the updater for source id `s`.
	//
	// `str` is handed to the PqConn base. The schema name is read through
	// selectSourceSchema, then the table and column listings for the source
	// are fetched and one TableDef per listed table is stored in `tables`,
	// keyed by the value in field 0 of the table listing.
	UpdateDatabase::UpdateDatabase(const char * const str, uint64_t s) :
		PqConn {str}, source {s}, schema(**output::pq::sql::selectSourceSchema::execute(this, s))
	{
		auto tableRows = output::pq::sql::selectTables::execute(this, source);
		auto columnRows = output::pq::sql::selectColumns::execute(this, source);
		for (unsigned int row = 0; row < tableRows->rows(); row++) {
			// Each TableDef picks its own columns out of the shared column listing.
			auto definition = std::make_unique<TableDef>(*columnRows, tableRows->at(row, 0));
			tables.emplace(tableRows->at(row, 0), std::move(definition));
		}
	}

	// Creates the replication event source configured for this updater's
	// source id.
	//
	// Throws ConfigError (via verify) unless the selectSource query yields
	// exactly one row. The Input::ReplicationStream is built from that row;
	// the `7` is presumably the number of leading fields passed to its
	// constructor - confirm against the row type's create<>().
	EventSourceBasePtr
	UpdateDatabase::getSource()
	{
		auto sourceConfig = output::pq::sql::selectSource::execute(this, source);
		const bool exactlyOneRow = (sourceConfig->rows() == 1);
		verify<ConfigError>(exactlyOneRow, "Wrong number of source config rows");
		auto && configRow = (*sourceConfig)[0];
		return configRow.create<Input::ReplicationStream, 7>();
	}

	// Collects the column definitions belonging to table `name`.
	//
	// Every row of `crecs` whose field 0 equals `name` contributes one
	// ColumnDef, appended to `columns` in record-set order. The ColumnDef is
	// built by the row's create<ColumnDef, 3, 1>(); presumably that means
	// three fields starting at field 1 - confirm against the row type.
	TableDef::TableDef(const RecordSet & crecs, std::string_view name)
	{
		for (unsigned int row = 0; row < crecs.rows(); row++) {
			if (!(crecs.at(row, 0) == name)) {
				continue; // row belongs to a different table
			}
			columns.emplace_back(crecs[row].create<ColumnDef, 3, 1>());
		}
	}

	// Registers a new replication source and returns an UpdateDatabase bound
	// to it.
	//
	// A MySQL connection is made with the given host/username/password/port
	// and the master status is queried. Then, inside one transaction on `pq`,
	// the target schema is created if missing and a source row is inserted
	// holding the connection details, fields 0 and 1 of the master status
	// row, the server id `sid` and the schema name. The value produced by the
	// insert is used as the new source id.
	//
	// Throws std::runtime_error (via verify) if the master status query
	// returns no rows.
	UpdateDatabase
	UpdateDatabase::createNew(PqConn * pq, const char * host, const char * username, const char * password,
			unsigned short port, const char * db, int sid, const char * schema)
	{
		Input::MySQLConn my {host, username, password, port};
		auto ms = input::sql::showMasterStatus::execute(&my);
		// Row 0 is read below. A server without binary logging reports an
		// empty master status, so fail clearly before touching PostgreSQL.
		verify<std::runtime_error>(
				ms->rows() > 0, "Source server returned no master status; is binary logging enabled?");
		auto source_id = Tx {pq}([&]() {
			pq->query(scprintf<"CREATE SCHEMA IF NOT EXISTS %?">(schema).c_str());
			return **output::pq::sql::insertSource::execute(
					pq, host, username, password, port, db, ms->at(0, 0), ms->at(0, 1), sid, schema);
		});
		// The returned object is built from pq's connection string rather
		// than from pq itself.
		return UpdateDatabase(pq->connstr.c_str(), source_id);
	}

	// Adds the MySQL table `tableName` to this source.
	//
	// Column metadata is read over `conn`. Then, inside one transaction on
	// this connection, a table row and one column row per column are
	// inserted, and a matching CREATE TABLE statement is built and executed
	// in this source's schema. Once the transaction has run, the new
	// TableDef is stored in `tables` under `tableName`.
	//
	// Throws std::logic_error (via verify) if the metadata query returns no
	// rows, i.e. no columns were found for the table.
	void
	UpdateDatabase::addTable(Input::MySQLConn * conn, const char * tableName)
	{
		auto cols = input::sql::selectColumns::execute(conn, tableName);
		// Each result row describes one column of the table.
		verify<std::logic_error>(cols->rows() > 0, "Table has no columns");
		auto tableDef {std::make_unique<TableDef>()};
		Tx {this}([&] {
			const auto table_id = **output::pq::sql::insertTable::execute(this, tableName, source);
			std::stringstream ct;
			scprintf<"CREATE TABLE %?.%?(">(ct, schema, tableName);
			// Primary key column names, comma separated. They are emitted as a
			// single table-level constraint because PostgreSQL rejects more
			// than one column-level "primary key" in a table.
			std::stringstream pk;
			bool havePk {false};
			TypeMapper tm;
			for (auto col : *cols) {
				// NOTE(review): the ordinal stored here is currentRow(), while the
				// in-memory ColumnDef below uses columns.size() + 1 - confirm which
				// base is intended.
				output::pq::sql::insertColumn::execute(this, col[0], col.currentRow(), table_id);
				if (col.currentRow())
					ct << ',';
				scprintf<"%? %?">(ct, col[0], tm.map(col[2], scprintf<"%?.%?">(tableName, col[0])));
				if (!col[1])
					ct << " not null";
				if (col[3]) {
					if (havePk)
						pk << ',';
					scprintf<"%?">(pk, col[0]);
					havePk = true;
				}
				tableDef->columns.push_back(std::make_unique<ColumnDef>(col[0], tableDef->columns.size() + 1, col[3]));
			}
			if (havePk)
				ct << ", primary key(" << pk.str() << ')';
			ct << ")";
			this->query(ct.str().c_str());
		});
		tables.emplace(tableName, std::move(tableDef));
	}
}