// PostgreSQL output side of MyGrate: UpdateDatabase and its helper types.
//
// NOTE(review): this text looks extraction-damaged. The bare #include
// directives and several template argument lists (std::forward, std::make_unique,
// create, static_cast) have no angle-bracket contents, and the file is collapsed
// onto a few physical lines with breaks inside string literals. Restore from the
// original file before compiling - TODO confirm.
//
// ColumnDef::ColumnDef(n, o, p)
//   Stores the column name (moved in), its ordinal and its primary-key flag.
//
// UpdateDatabase(const char * str, uint64_t s)
//   Builds a PqConn from str and delegates to the (PqConn &&, uint64_t) overload.
//
// UpdateDatabase(PqConn && conn, uint64_t s)
//   Runs selectSourceSchema on conn for source s and delegates to the
//   three-argument overload with the resulting record set.
//
// UpdateDatabase(PqConn && conn, uint64_t s, RecordSetPtr cfg)
//   Move-constructs the PqConn base, records s as source, and reads schema and
//   database from row 0, columns 0 and 1 of cfg. It then runs selectTables and
//   selectColumns for the source and adds one entry to tables per table row,
//   keyed by column 0 of that row. The mapped object is built from the full
//   column record set plus the table name (presumably a TableOutput, the
//   template argument is missing - TODO confirm).
//
// UpdateDatabase::getSource()
//   Runs selectSource for this source, verifies that exactly one row came back,
//   and returns the object created from that row.
//
// TableOutput::TableOutput(crecs, name)
//   Scans every row of crecs; rows whose column 0 equals name are turned into a
//   column entry appended to columns. keys starts at 0 and is incremented once
//   per appended column whose is_pk is set.
//
// UpdateDatabase::createNew(pq, host, username, password, port, db, sid, schema)
//   Opens a MySQL connection with host/username/password/port and reads the
//   master status. Inside a Tx on pq it issues CREATE SCHEMA IF NOT EXISTS for
//   schema and runs insertSource with the connection details, db, the first two
//   master status fields, sid and schema; the dereferenced result is the new
//   source id. The returned UpdateDatabase is built from pq->connstr and that
//   id, i.e. through the const char * constructor rather than by reusing pq.
//
// UpdateDatabase::addTable(conn, tableName)
//   On the MySQL side: sets REPEATABLE READ, takes a READ lock on the table and,
//   inside a Tx on conn, captures the master status position before issuing
//   UNLOCK TABLES, then reads the column list for tableName.
//   On the PostgreSQL side, inside a Tx on this: runs insertTable with the table
//   name, source and captured position, runs insertColumn per column, and
//   assembles a CREATE TABLE statement in a stringstream. Column field 0 is used
//   as the name, field 2 is passed to TypeMapper::map, a false field 1 adds
//   not null, and a true field 3 adds primary key and increments keys. A comma
//   is written before every column whose currentRow() is non-zero. The statement
//   is executed and copyTableContent is called. After the inner Tx the table
//   definition is moved into tables.
//   NOTE(review): the verify message says the table has no rows, but the value
//   checked is the row count of the column list - confirm intended wording.
//   NOTE(review): primary key is appended inline to every column with field 3
//   set, so a table with a composite key would get several such constraints in
//   one CREATE TABLE - verify against PostgreSQL behaviour.
//
// struct WritePqCopyStream (definition continues past the end of this view)
//   Wraps a FILE * and is neither copyable nor movable. The visible members
//   write a tab in nextField(), a newline in the destructor, backslash-N for
//   nullptr, and printf-formatted text for numeric, Date and Time values; the
//   timespec overload throws std::logic_error.
//   NOTE(review): the int64_t/uint64_t overloads use the ld/lu conversions,
//   which presumably assumes a 64-bit long - confirm target platforms.
#include "updateDatabase.h" #include "pqConn.h" #include "typeMapper.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace MyGrate::Output::Pq { ColumnDef::ColumnDef(std::string n, std::size_t o, bool p) : name {std::move(n)}, ordinal(o), is_pk(p) { } UpdateDatabase::UpdateDatabase(const char * const str, uint64_t s) : UpdateDatabase {PqConn {str}, s} { } UpdateDatabase::UpdateDatabase(PqConn && conn, uint64_t s) : UpdateDatabase {std::forward(conn), s, output::pq::sql::selectSourceSchema::execute(&conn, s)} { } UpdateDatabase::UpdateDatabase(PqConn && conn, uint64_t s, RecordSetPtr cfg) : PqConn {std::move(conn)}, source {s}, schema(cfg->at(0, 0)), database(cfg->at(0, 1)), selected {tables.end()}, table_map {nullptr, nullptr} { auto trecs = output::pq::sql::selectTables::execute(this, source); auto crecs = output::pq::sql::selectColumns::execute(this, source); for (auto t {0U}; t < trecs->rows(); t++) { tables.emplace(trecs->at(t, 0), std::make_unique(*crecs, trecs->at(t, 0))); } } EventSourceBasePtr UpdateDatabase::getSource() { auto srcrec = output::pq::sql::selectSource::execute(this, source); verify(srcrec->rows() == 1, "Wrong number of source config rows"); return (*srcrec)[0].create(); } TableOutput::TableOutput(const RecordSet & crecs, std::string_view name) : keys {0} { for (auto c {0U}; c < crecs.rows(); c++) { if (crecs.at(c, 0) == name) { const auto & cd = columns.emplace_back(crecs[c].create()); if (cd->is_pk) { keys += 1; } } } } UpdateDatabase UpdateDatabase::createNew(PqConn * pq, const char * host, const char * username, const char * password, unsigned short port, const char * db, int sid, const char * schema) { Input::MySQLConn my {host, username, password, port}; auto ms = input::sql::showMasterStatus::execute(&my); auto source_id = Tx {pq}([&]() { pq->query(scprintf<"CREATE 
SCHEMA IF NOT EXISTS %?">(schema).c_str()); return **output::pq::sql::insertSource::execute( pq, host, username, password, port, db, ms->at(0, 0), ms->at(0, 1), sid, schema); }); return UpdateDatabase(pq->connstr.c_str(), source_id); } void UpdateDatabase::addTable(Input::MySQLConn * conn, const char * tableName) { // Assumes a readonly or transaction supporting table conn->query("SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ"); conn->query(scprintf<"LOCK TABLE %? READ">(tableName).c_str()); Tx {conn}([&] { auto pos = *(*input::sql::showMasterStatus::execute(conn))[0].create(); conn->query("UNLOCK TABLES"); // Consistent unlocked view of table during transaction auto cols = input::sql::selectColumns::execute(conn, tableName); verify(cols->rows() > 0, "Table has no rows"); auto tableDef {std::make_unique()}; Tx {this}([&] { const auto table_id = **output::pq::sql::insertTable::execute(this, tableName, source, pos.filename, pos.position); std::stringstream ct; scprintf<"CREATE TABLE %?.%?(">(ct, schema, tableName); TypeMapper tm; for (auto col : *cols) { output::pq::sql::insertColumn::execute(this, col[0], col.currentRow(), table_id); if (col.currentRow()) { ct << ','; } scprintf<"%? 
%?">(ct, col[0], tm.map(col[2], scprintf<"%?.%?">(tableName, col[0]))); if (!col[1]) { ct << " not null"; } if (col[3]) { ct << " primary key"; tableDef->keys += 1; } tableDef->columns.push_back( std::make_unique(col[0], tableDef->columns.size() + 1, col[3])); } ct << ")"; this->query(ct.str().c_str()); this->copyTableContent(conn, tableName, tableDef); }); tables.emplace(tableName, std::move(tableDef)); }); } struct WritePqCopyStream { explicit WritePqCopyStream(FILE * o) : out {o} { } WritePqCopyStream(const WritePqCopyStream &) = delete; WritePqCopyStream(WritePqCopyStream &&) = delete; WritePqCopyStream & operator=(const WritePqCopyStream &) = delete; WritePqCopyStream & operator=(WritePqCopyStream &&) = delete; ~WritePqCopyStream() { fputc('\n', out); } void nextField() { fputc('\t', out); } void operator()(std::nullptr_t) const { fputs("\\N", out); } #define BASIC_PRINT(T, fmt) \ void operator()(T v) const \ { \ fprintf(out, fmt, v); \ } BASIC_PRINT(double, "%f") BASIC_PRINT(float, "%f") BASIC_PRINT(int8_t, "%hhd") BASIC_PRINT(uint8_t, "%hhu") BASIC_PRINT(int16_t, "%hd") BASIC_PRINT(uint16_t, "%hu") BASIC_PRINT(int32_t, "%d") BASIC_PRINT(uint32_t, "%u") BASIC_PRINT(int64_t, "%ld") BASIC_PRINT(uint64_t, "%lu") #undef BASIC_PRINT void operator()(timespec) const { throw std::logic_error("timespec not implemented"); } void operator()(Date v) const { fprintf(out, "%d-%d-%d", v.year, v.month, v.day); } void operator()(Time v) const { fprintf(out, "%d:%d:%d", v.hour, v.minute, v.second); } void operator()(DateTime v) const { operator()(static_cast(v)); fputc('T', out); operator()(static_cast