From 88708a8aa6a33f265ff990102b44a9a51e3bf4bb Mon Sep 17 00:00:00 2001 From: Dan Goodliffe Date: Thu, 3 Jun 2021 00:19:50 +0100 Subject: Get upstream from DB Adds options from command line and a few supporting tweaks --- lib/input/replStream.cpp | 15 ++++++++++++--- lib/input/replStream.h | 10 +++++++++- 2 files changed, 21 insertions(+), 4 deletions(-) (limited to 'lib/input') diff --git a/lib/input/replStream.cpp b/lib/input/replStream.cpp index 2ec2a79..c3dea9a 100644 --- a/lib/input/replStream.cpp +++ b/lib/input/replStream.cpp @@ -1,5 +1,6 @@ #include "replStream.h" #include "mariadb_repl.h" +#include "mysqlConn.h" #include #include #include @@ -8,6 +9,13 @@ #include namespace MyGrate::Input { + ReplicationStream::ReplicationStream(const char * const host, const char * const user, const char * const pass, + unsigned short port, uint64_t sid, std::string fn, uint64_t pos) : + MySQLConn {host, user, pass, port}, + serverid {sid}, filename {std::move(fn)}, position {pos} + { + } + void ReplicationStream::readEvents(MyGrate::EventHandlerBase & eh) { @@ -17,14 +25,15 @@ namespace MyGrate::Input { query("SET @mariadb_slave_capability = 4"); query("SET @master_binlog_checksum = @@global.binlog_checksum"); - mariadb_rpl_optionsv(rpl.get(), MARIADB_RPL_SERVER_ID, 12); - mariadb_rpl_optionsv(rpl.get(), MARIADB_RPL_FILENAME, "mariadb-bin.000242"); - mariadb_rpl_optionsv(rpl.get(), MARIADB_RPL_START, 4); + mariadb_rpl_optionsv(rpl.get(), MARIADB_RPL_SERVER_ID, serverid); + mariadb_rpl_optionsv(rpl.get(), MARIADB_RPL_FILENAME, filename.c_str()); + mariadb_rpl_optionsv(rpl.get(), MARIADB_RPL_START, position); mariadb_rpl_optionsv(rpl.get(), MARIADB_RPL_FLAGS, MARIADB_RPL_BINLOG_SEND_ANNOTATE_ROWS); verify(!mariadb_rpl_open(rpl.get()), "Failed to mariadb_rpl_open"); while (MyGrate::MariaDB_Event_Ptr event {mariadb_rpl_fetch(rpl.get(), nullptr), &mariadb_free_rpl_event}) { + position = event->next_event_pos; if (const auto & h = eventHandlers.at(event->event_type); h.func) { (eh.*h.func)(std::move(event)); } diff --git a/lib/input/replStream.h b/lib/input/replStream.h index 4b1a7b6..01c9ef3 100644 --- a/lib/input/replStream.h +++ b/lib/input/replStream.h @@ -2,7 +2,9 @@ #define MYGRATE_INPUT_REPLSTREAM_H #include "mysqlConn.h" +#include #include +#include namespace MyGrate { class EventHandlerBase; @@ -11,9 +13,15 @@ namespace MyGrate { namespace MyGrate::Input { class ReplicationStream : public EventSourceBase, MySQLConn { public: - using MySQLConn::MySQLConn; + ReplicationStream(const char * const host, const char * const user, const char * const pass, + unsigned short port, uint64_t serverid, std::string filename, uint64_t position); void readEvents(EventHandlerBase &) override; + + private: + uint64_t serverid; + std::string filename; + uint64_t position; }; } -- cgit v1.2.3