diff options
-rw-r--r-- | Jamroot.jam | 1 | ||||
-rw-r--r-- | iwyu.json | 16 | ||||
-rw-r--r-- | lib/input/replStream.cpp | 15 | ||||
-rw-r--r-- | lib/input/replStream.h | 10 | ||||
-rw-r--r-- | lib/output/pq/sql/selectSource.sql | 2 | ||||
-rw-r--r-- | lib/output/pq/updateDatabase.cpp | 19 | ||||
-rw-r--r-- | lib/output/pq/updateDatabase.h | 12 | ||||
-rw-r--r-- | main/Jamfile.jam | 2 | ||||
-rw-r--r-- | main/main.cpp | 40 | ||||
-rw-r--r-- | test/test-mysql.cpp | 1 |
10 files changed, 98 insertions, 20 deletions
diff --git a/Jamroot.jam b/Jamroot.jam index 883aaf0..c85ea56 100644 --- a/Jamroot.jam +++ b/Jamroot.jam @@ -10,6 +10,7 @@ import regex : replace ; pkg-config.import libmariadb ; pkg-config.import libpq ; +lib po : : <name>boost_program_options ; variant coverage : debug ; @@ -33,6 +33,22 @@ }, { "include": [ + "<boost/cstdint.hpp>", + "private", + "<cstdint>", + "public" + ] + }, + { + "include": [ + "@<boost/program_options/.*>", + "private", + "<boost/program_options.hpp>", + "public" + ] + }, + { + "include": [ "<boost/mpl/aux_/preprocessed/gcc/list.hpp>", "private", "<boost/mpl/list.hpp>", diff --git a/lib/input/replStream.cpp b/lib/input/replStream.cpp index 2ec2a79..c3dea9a 100644 --- a/lib/input/replStream.cpp +++ b/lib/input/replStream.cpp @@ -1,5 +1,6 @@ #include "replStream.h" #include "mariadb_repl.h" +#include "mysqlConn.h" #include <eventHandlerBase.h> #include <eventHandlers.h> #include <helpers.h> @@ -8,6 +9,13 @@ #include <utility> namespace MyGrate::Input { + ReplicationStream::ReplicationStream(const char * const host, const char * const user, const char * const pass, + unsigned short port, uint64_t sid, std::string fn, uint64_t pos) : + MySQLConn {host, user, pass, port}, + serverid {sid}, filename {std::move(fn)}, position {pos} + { + } + void ReplicationStream::readEvents(MyGrate::EventHandlerBase & eh) { @@ -17,14 +25,15 @@ namespace MyGrate::Input { query("SET @mariadb_slave_capability = 4"); query("SET @master_binlog_checksum = @@global.binlog_checksum"); - mariadb_rpl_optionsv(rpl.get(), MARIADB_RPL_SERVER_ID, 12); - mariadb_rpl_optionsv(rpl.get(), MARIADB_RPL_FILENAME, "mariadb-bin.000242"); - mariadb_rpl_optionsv(rpl.get(), MARIADB_RPL_START, 4); + mariadb_rpl_optionsv(rpl.get(), MARIADB_RPL_SERVER_ID, serverid); + mariadb_rpl_optionsv(rpl.get(), MARIADB_RPL_FILENAME, filename.c_str()); + mariadb_rpl_optionsv(rpl.get(), MARIADB_RPL_START, position); mariadb_rpl_optionsv(rpl.get(), MARIADB_RPL_FLAGS, MARIADB_RPL_BINLOG_SEND_ANNOTATE_ROWS); verify<std::runtime_error>(!mariadb_rpl_open(rpl.get()), "Failed to mariadb_rpl_open"); while (MyGrate::MariaDB_Event_Ptr event {mariadb_rpl_fetch(rpl.get(), nullptr), &mariadb_free_rpl_event}) { + position = event->next_event_pos; if (const auto & h = eventHandlers.at(event->event_type); h.func) { (eh.*h.func)(std::move(event)); } diff --git a/lib/input/replStream.h b/lib/input/replStream.h index 4b1a7b6..01c9ef3 100644 --- a/lib/input/replStream.h +++ b/lib/input/replStream.h @@ -2,7 +2,9 @@ #define MYGRATE_INPUT_REPLSTREAM_H #include "mysqlConn.h" +#include <cstdint> #include <eventSourceBase.h> +#include <string> namespace MyGrate { class EventHandlerBase; @@ -11,9 +13,15 @@ namespace MyGrate { namespace MyGrate::Input { class ReplicationStream : public EventSourceBase, MySQLConn { public: - using MySQLConn::MySQLConn; + ReplicationStream(const char * const host, const char * const user, const char * const pass, + unsigned short port, uint64_t serverid, std::string filename, uint64_t position); void readEvents(EventHandlerBase &) override; + + private: + uint64_t serverid; + std::string filename; + uint64_t position; }; } diff --git a/lib/output/pq/sql/selectSource.sql b/lib/output/pq/sql/selectSource.sql index 3048410..db76b4a 100644 --- a/lib/output/pq/sql/selectSource.sql +++ b/lib/output/pq/sql/selectSource.sql @@ -1,3 +1,3 @@ -SELECT host, username, password, port, filename, position, serverid, table_schema +SELECT host, username, password, port, filename, position, serverid FROM mygrate.source s WHERE s.id = $1 diff --git a/lib/output/pq/updateDatabase.cpp b/lib/output/pq/updateDatabase.cpp index 77a9ce5..ebcabb3 100644 --- a/lib/output/pq/updateDatabase.cpp +++ b/lib/output/pq/updateDatabase.cpp @@ -1,8 +1,21 @@ #include "updateDatabase.h" #include "pqConn.h" -#include <string> -#include <utility> +#include <cstdint> +#include <dbRecordSet.h> +#include <eventSourceBase.h> +#include <helpers.h> +#include <memory> +#include <output/pq/sql/selectSource.h> +#include <stdexcept> namespace MyGrate::Output::Pq { - UpdateDatabase::UpdateDatabase(const char * const str, std::string p) : PqConn {str}, prefix {std::move(p)} { } + UpdateDatabase::UpdateDatabase(const char * const str, uint64_t s) : PqConn {str}, source {s} { } + + EventSourceBasePtr + UpdateDatabase::getSource() + { + auto srcrec = output::pq::sql::selectSource::execute(this, source); + verify<std::runtime_error>(srcrec->rows() == 1, "Wrong number of source config rows"); + return {}; + } } diff --git a/lib/output/pq/updateDatabase.h b/lib/output/pq/updateDatabase.h index bc54282..4e01d9c 100644 --- a/lib/output/pq/updateDatabase.h +++ b/lib/output/pq/updateDatabase.h @@ -2,15 +2,19 @@ #define MYGRATE_OUTPUT_PQ_UPDATEDATABASE_H #include "pqConn.h" -#include <string> +#include <cstdint> +#include <eventHandlerBase.h> +#include <eventSourceBase.h> namespace MyGrate::Output::Pq { - class UpdateDatabase : PqConn { + class UpdateDatabase : public PqConn, public EventHandlerBase { public: - UpdateDatabase(const char * const str, std::string prefix); + UpdateDatabase(const char * const str, uint64_t source); + + EventSourceBasePtr getSource(); private: - std::string prefix; + uint64_t source; }; } diff --git a/main/Jamfile.jam b/main/Jamfile.jam index d03d005..7adaddb 100644 --- a/main/Jamfile.jam +++ b/main/Jamfile.jam @@ -1,2 +1,2 @@ -exe mygrate : main.cpp : <library>../lib//mygrate ; +exe mygrate : main.cpp : <library>../lib//mygrate <library>..//po ; #run main.cpp : : : <library>../lib//mygrate ; diff --git a/main/main.cpp b/main/main.cpp index bdcda75..815ca67 100644 --- a/main/main.cpp +++ b/main/main.cpp @@ -1,10 +1,38 @@ -#include <input/replStream.h> -#include <output/dumpToConsole.h> +#include <boost/lexical_cast/bad_lexical_cast.hpp> +#include <boost/program_options.hpp> +#include <cstdint> +#include <cstdlib> +#include <eventSourceBase.h> +#include <iostream> +#include <memory> +#include <output/pq/updateDatabase.h> +#include <string> + +namespace po = boost::program_options; int -main(int, char **) +main(int argc, char ** argv) { - MyGrate::Input::ReplicationStream rs {"192.168.1.38", "repl", "r3pl", 3306}; - MyGrate::Output::DumpToConsole dtc; - rs.readEvents(dtc); + std::string pgconn; + uint64_t sourceid {}; + bool help {}; + po::options_description opts("MyGrate"); + opts.add_options()("postgresql,p", po::value(&pgconn)->required(), "Target PostgreSQL connection string")( + "sourceid,s", po::value(&sourceid)->default_value(1), "Source identifier")("help,h", + po::value(&help)->zero_tokens(), "Help"); + + po::variables_map vm; + po::store(po::command_line_parser(argc, argv).options(opts).run(), vm); + + if (vm.count("help")) { + std::cout << opts; + return EXIT_SUCCESS; + } + po::notify(vm); + + MyGrate::Output::Pq::UpdateDatabase ud {pgconn.c_str(), sourceid}; + auto src {ud.getSource()}; + src->readEvents(ud); + + return EXIT_SUCCESS; } diff --git a/test/test-mysql.cpp b/test/test-mysql.cpp index 493ec02..de9dd83 100644 --- a/test/test-mysql.cpp +++ b/test/test-mysql.cpp @@ -1,7 +1,6 @@ #define BOOST_TEST_MODULE MySQL #include <boost/test/unit_test.hpp> -#include <cstddef> #include <cstdint> #include <cstdlib> #include <dbConn.h> |