diff options
author | Dan Goodliffe <dan@randomdan.homeip.net> | 2021-07-04 19:32:42 +0100 |
---|---|---|
committer | Dan Goodliffe <dan@randomdan.homeip.net> | 2021-07-04 19:32:42 +0100 |
commit | 7804cdf8ecbc2bb40231199669444665df597eb6 (patch) | |
tree | 965801ee94a1113494fe9f0d414eb5d60d205cde | |
parent | Support bulk upload to PQ (diff) | |
download | mygrate-7804cdf8ecbc2bb40231199669444665df597eb6.tar.bz2 mygrate-7804cdf8ecbc2bb40231199669444665df597eb6.tar.xz mygrate-7804cdf8ecbc2bb40231199669444665df597eb6.zip |
Support copying exist table data
-rw-r--r-- | lib/output/pq/updateDatabase.cpp | 116 | ||||
-rw-r--r-- | lib/output/pq/updateDatabase.h | 1 | ||||
-rw-r--r-- | test/sql/fillTestTable.sql | 2 | ||||
-rw-r--r-- | test/test-e2e.cpp | 5 |
4 files changed, 123 insertions, 1 deletions
diff --git a/lib/output/pq/updateDatabase.cpp b/lib/output/pq/updateDatabase.cpp index 3829da9..457f744 100644 --- a/lib/output/pq/updateDatabase.cpp +++ b/lib/output/pq/updateDatabase.cpp @@ -7,6 +7,7 @@ #include <eventSourceBase.h> #include <helpers.h> #include <input/mysqlConn.h> +#include <input/mysqlRecordSet.h> #include <input/replStream.h> #include <input/sql/selectColumns.h> #include <input/sql/showMasterStatus.h> @@ -92,4 +93,119 @@ namespace MyGrate::Output::Pq { }); tables.emplace(tableName, std::move(tableDef)); } + + struct WritePqCopyStream { + ~WritePqCopyStream() + { + fputc('\n', out); + } + + void + nextField() + { + fputc('\t', out); + } + + void operator()(std::nullptr_t) const + { + fputs("\\N", out); + } +#define BASIC_PRINT(T, fmt) \ + void operator()(T v) const \ + { \ + fprintf(out, fmt, v); \ + } + BASIC_PRINT(double, "%f") + BASIC_PRINT(float, "%f") + BASIC_PRINT(int8_t, "%hhd") + BASIC_PRINT(uint8_t, "%hhu") + BASIC_PRINT(int16_t, "%hd") + BASIC_PRINT(uint16_t, "%hu") + BASIC_PRINT(int32_t, "%d") + BASIC_PRINT(uint32_t, "%u") + BASIC_PRINT(int64_t, "%ld") + BASIC_PRINT(uint64_t, "%lu") +#undef BASIC_PRINT + void operator()(timespec) const + { + throw std::logic_error("timespec not implemented"); + } + void + operator()(Date v) const + { + fprintf(out, "%d-%d-%d", v.year, v.month, v.day); + } + void + operator()(Time v) const + { + fprintf(out, "%d:%d:%d", v.hour, v.minute, v.second); + } + void + operator()(DateTime v) const + { + operator()(static_cast<Date &>(v)); + fputc('T', out); + operator()(static_cast<Time &>(v)); + } + void + operator()(std::string_view v) const + { + auto pos {v.begin()}; + while (pos != v.end()) { + auto esc = std::find_if(pos, v.end(), [](unsigned char c) { + return std::iscntrl(c); + }); + if (esc != pos) { + fwrite(pos, esc - pos, 1, out); + pos = esc; + } + while (pos != v.end()) { + fprintf(out, "\\%03o", *pos); + pos++; + } + } + } + void operator()(BitSet) const + { + throw std::logic_error("bitset not implemented"); + } + void + operator()(Blob v) const + { + fputs("\\\\x", out); + std::for_each(v.begin(), v.end(), [this](auto b) { + fprintf(out, "%02hhx", (uint8_t)b); + }); + } + + FILE * out; + }; + + void + UpdateDatabase::copyTableContent(Input::MySQLConn * conn, const char * table) + { + auto out = beginBulkUpload(schema.c_str(), table); + std::stringstream sf; + unsigned int ordinal {0}; + for (const auto & col : tables.at(table)->columns) { + scprintf<"%? %?">(sf, !ordinal++ ? "SELECT " : ", ", col->name); + } + sf << " FROM " << table; + auto stmt {conn->prepare(sf.str().c_str(), 0)}; + stmt->execute({}); + auto sourceCursor {stmt->cursor()}; + + const auto cols = sourceCursor->columns(); + while (sourceCursor->fetch()) { + WritePqCopyStream cs {out}; + for (auto ordinal {0U}; ordinal < cols; ordinal += 1) { + if (ordinal) { + cs.nextField(); + } + sourceCursor->at(ordinal).visit(cs); + } + } + + fclose(out); + } } diff --git a/lib/output/pq/updateDatabase.h b/lib/output/pq/updateDatabase.h index e5b06ef..20ea96a 100644 --- a/lib/output/pq/updateDatabase.h +++ b/lib/output/pq/updateDatabase.h @@ -42,6 +42,7 @@ namespace MyGrate::Output::Pq { const char * db, int sid, const char * sc); void addTable(Input::MySQLConn *, const char * tableName); + void copyTableContent(Input::MySQLConn *, const char * tableName); const uint64_t source; const std::string schema; diff --git a/test/sql/fillTestTable.sql b/test/sql/fillTestTable.sql new file mode 100644 index 0000000..188107f --- /dev/null +++ b/test/sql/fillTestTable.sql @@ -0,0 +1,2 @@ +INSERT INTO session(session_id, username, user_lvl, ip_addr, port, created, modified) + VALUES('hashyhash', 'testuser', 'groupadm', '10.10.0.1', 2433, now(), now()); diff --git a/test/test-e2e.cpp b/test/test-e2e.cpp index 9906a0b..1921a08 100644 --- a/test/test-e2e.cpp +++ b/test/test-e2e.cpp @@ -6,6 +6,7 @@ #include "testdb-postgresql.h" #include <output/pq/updateDatabase.h> #include <sql/createTestTable.h> +#include <sql/fillTestTable.h> #include <sql/selectTestTable.h> BOOST_AUTO_TEST_CASE(e2e) @@ -25,7 +26,9 @@ BOOST_AUTO_TEST_CASE(e2e) BOOST_REQUIRE(src); MyGrate::sql::createTestTable::execute(&mym); + MyGrate::sql::fillTestTable::execute(&mym); out.addTable(&mym, "session"); + out.copyTableContent(&mym, "session"); - BOOST_CHECK_EQUAL(MyGrate::sql::selectTestTable::execute(&pqm)->rows(), 0); + BOOST_CHECK_EQUAL(MyGrate::sql::selectTestTable::execute(&pqm)->rows(), 1); } |