From 88708a8aa6a33f265ff990102b44a9a51e3bf4bb Mon Sep 17 00:00:00 2001 From: Dan Goodliffe Date: Thu, 3 Jun 2021 00:19:50 +0100 Subject: Get upstream from DB Adds options from command line and a few supporting tweaks --- Jamroot.jam | 1 + iwyu.json | 16 +++++++++++++++ lib/input/replStream.cpp | 15 +++++++++++--- lib/input/replStream.h | 10 +++++++++- lib/output/pq/sql/selectSource.sql | 2 +- lib/output/pq/updateDatabase.cpp | 19 +++++++++++++++--- lib/output/pq/updateDatabase.h | 12 ++++++++---- main/Jamfile.jam | 2 +- main/main.cpp | 40 ++++++++++++++++++++++++++++++++------ test/test-mysql.cpp | 1 - 10 files changed, 98 insertions(+), 20 deletions(-) diff --git a/Jamroot.jam b/Jamroot.jam index 883aaf0..c85ea56 100644 --- a/Jamroot.jam +++ b/Jamroot.jam @@ -10,6 +10,7 @@ import regex : replace ; pkg-config.import libmariadb ; pkg-config.import libpq ; +lib po : : boost_program_options ; variant coverage : debug ; diff --git a/iwyu.json b/iwyu.json index b471e04..351ae85 100644 --- a/iwyu.json +++ b/iwyu.json @@ -31,6 +31,22 @@ "public" ] }, + { + "include": [ + "", + "private", + "", + "public" + ] + }, + { + "include": [ + "@", + "private", + "", + "public" + ] + }, { "include": [ "", diff --git a/lib/input/replStream.cpp b/lib/input/replStream.cpp index 2ec2a79..c3dea9a 100644 --- a/lib/input/replStream.cpp +++ b/lib/input/replStream.cpp @@ -1,5 +1,6 @@ #include "replStream.h" #include "mariadb_repl.h" +#include "mysqlConn.h" #include #include #include @@ -8,6 +9,13 @@ #include namespace MyGrate::Input { + ReplicationStream::ReplicationStream(const char * const host, const char * const user, const char * const pass, + unsigned short port, uint64_t sid, std::string fn, uint64_t pos) : + MySQLConn {host, user, pass, port}, + serverid {sid}, filename {std::move(fn)}, position {pos} + { + } + void ReplicationStream::readEvents(MyGrate::EventHandlerBase & eh) { @@ -17,14 +25,15 @@ namespace MyGrate::Input { query("SET @mariadb_slave_capability = 4"); query("SET @master_binlog_checksum = @@global.binlog_checksum"); - mariadb_rpl_optionsv(rpl.get(), MARIADB_RPL_SERVER_ID, 12); - mariadb_rpl_optionsv(rpl.get(), MARIADB_RPL_FILENAME, "mariadb-bin.000242"); - mariadb_rpl_optionsv(rpl.get(), MARIADB_RPL_START, 4); + mariadb_rpl_optionsv(rpl.get(), MARIADB_RPL_SERVER_ID, serverid); + mariadb_rpl_optionsv(rpl.get(), MARIADB_RPL_FILENAME, filename.c_str()); + mariadb_rpl_optionsv(rpl.get(), MARIADB_RPL_START, position); mariadb_rpl_optionsv(rpl.get(), MARIADB_RPL_FLAGS, MARIADB_RPL_BINLOG_SEND_ANNOTATE_ROWS); verify(!mariadb_rpl_open(rpl.get()), "Failed to mariadb_rpl_open"); while (MyGrate::MariaDB_Event_Ptr event {mariadb_rpl_fetch(rpl.get(), nullptr), &mariadb_free_rpl_event}) { + position = event->next_event_pos; if (const auto & h = eventHandlers.at(event->event_type); h.func) { (eh.*h.func)(std::move(event)); } diff --git a/lib/input/replStream.h b/lib/input/replStream.h index 4b1a7b6..01c9ef3 100644 --- a/lib/input/replStream.h +++ b/lib/input/replStream.h @@ -2,7 +2,9 @@ #define MYGRATE_INPUT_REPLSTREAM_H #include "mysqlConn.h" +#include #include +#include namespace MyGrate { class EventHandlerBase; @@ -11,9 +13,15 @@ namespace MyGrate { namespace MyGrate::Input { class ReplicationStream : public EventSourceBase, MySQLConn { public: - using MySQLConn::MySQLConn; + ReplicationStream(const char * const host, const char * const user, const char * const pass, + unsigned short port, uint64_t serverid, std::string filename, uint64_t position); void readEvents(EventHandlerBase &) override; + + private: + uint64_t serverid; + std::string filename; + uint64_t position; }; } diff --git a/lib/output/pq/sql/selectSource.sql b/lib/output/pq/sql/selectSource.sql index 3048410..db76b4a 100644 --- a/lib/output/pq/sql/selectSource.sql +++ b/lib/output/pq/sql/selectSource.sql @@ -1,3 +1,3 @@ -SELECT host, username, password, port, filename, position, serverid, table_schema +SELECT host, username, password, port, filename, position, serverid FROM mygrate.source s WHERE s.id = $1 diff --git a/lib/output/pq/updateDatabase.cpp b/lib/output/pq/updateDatabase.cpp index 77a9ce5..ebcabb3 100644 --- a/lib/output/pq/updateDatabase.cpp +++ b/lib/output/pq/updateDatabase.cpp @@ -1,8 +1,21 @@ #include "updateDatabase.h" #include "pqConn.h" -#include -#include +#include +#include +#include +#include +#include +#include +#include namespace MyGrate::Output::Pq { - UpdateDatabase::UpdateDatabase(const char * const str, std::string p) : PqConn {str}, prefix {std::move(p)} { } + UpdateDatabase::UpdateDatabase(const char * const str, uint64_t s) : PqConn {str}, source {s} { } + + EventSourceBasePtr + UpdateDatabase::getSource() + { + auto srcrec = output::pq::sql::selectSource::execute(this, source); + verify(srcrec->rows() == 1, "Wrong number of source config rows"); + return {}; + } } diff --git a/lib/output/pq/updateDatabase.h b/lib/output/pq/updateDatabase.h index bc54282..4e01d9c 100644 --- a/lib/output/pq/updateDatabase.h +++ b/lib/output/pq/updateDatabase.h @@ -2,15 +2,19 @@ #define MYGRATE_OUTPUT_PQ_UPDATEDATABASE_H #include "pqConn.h" -#include +#include +#include +#include namespace MyGrate::Output::Pq { - class UpdateDatabase : PqConn { + class UpdateDatabase : public PqConn, public EventHandlerBase { public: - UpdateDatabase(const char * const str, std::string prefix); + UpdateDatabase(const char * const str, uint64_t source); + + EventSourceBasePtr getSource(); private: - std::string prefix; + uint64_t source; }; } diff --git a/main/Jamfile.jam b/main/Jamfile.jam index d03d005..7adaddb 100644 --- a/main/Jamfile.jam +++ b/main/Jamfile.jam @@ -1,2 +1,2 @@ -exe mygrate : main.cpp : ../lib//mygrate ; +exe mygrate : main.cpp : ../lib//mygrate ..//po ; #run main.cpp : : : ../lib//mygrate ; diff --git a/main/main.cpp b/main/main.cpp index bdcda75..815ca67 100644 --- a/main/main.cpp +++ b/main/main.cpp @@ -1,10 +1,38 @@ -#include -#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace po = boost::program_options; int -main(int, char **) +main(int argc, char ** argv) { - MyGrate::Input::ReplicationStream rs {"192.168.1.38", "repl", "r3pl", 3306}; - MyGrate::Output::DumpToConsole dtc; - rs.readEvents(dtc); + std::string pgconn; + uint64_t sourceid {}; + bool help {}; + po::options_description opts("MyGrate"); + opts.add_options()("postgresql,p", po::value(&pgconn)->required(), "Target PostgreSQL connection string")( + "sourceid,s", po::value(&sourceid)->default_value(1), "Source identifier")("help,h", + po::value(&help)->zero_tokens(), "Help"); + + po::variables_map vm; + po::store(po::command_line_parser(argc, argv).options(opts).run(), vm); + + if (vm.count("help")) { + std::cout << opts; + return EXIT_SUCCESS; + } + po::notify(vm); + + MyGrate::Output::Pq::UpdateDatabase ud {pgconn.c_str(), sourceid}; + auto src {ud.getSource()}; + src->readEvents(ud); + + return EXIT_SUCCESS; } diff --git a/test/test-mysql.cpp b/test/test-mysql.cpp index 493ec02..de9dd83 100644 --- a/test/test-mysql.cpp +++ b/test/test-mysql.cpp @@ -1,7 +1,6 @@ #define BOOST_TEST_MODULE MySQL #include -#include #include #include #include -- cgit v1.2.3