diff options
Diffstat (limited to 'lib/input')
-rw-r--r-- | lib/input/replStream.cpp | 15 | ||||
-rw-r--r-- | lib/input/replStream.h | 10 |
2 files changed, 21 insertions, 4 deletions
diff --git a/lib/input/replStream.cpp b/lib/input/replStream.cpp index 2ec2a79..c3dea9a 100644 --- a/lib/input/replStream.cpp +++ b/lib/input/replStream.cpp @@ -1,5 +1,6 @@ #include "replStream.h" #include "mariadb_repl.h" +#include "mysqlConn.h" #include <eventHandlerBase.h> #include <eventHandlers.h> #include <helpers.h> @@ -8,6 +9,13 @@ #include <utility> namespace MyGrate::Input { + ReplicationStream::ReplicationStream(const char * const host, const char * const user, const char * const pass, + unsigned short port, uint64_t sid, std::string fn, uint64_t pos) : + MySQLConn {host, user, pass, port}, + serverid {sid}, filename {std::move(fn)}, position {pos} + { + } + void ReplicationStream::readEvents(MyGrate::EventHandlerBase & eh) { @@ -17,14 +25,15 @@ namespace MyGrate::Input { query("SET @mariadb_slave_capability = 4"); query("SET @master_binlog_checksum = @@global.binlog_checksum"); - mariadb_rpl_optionsv(rpl.get(), MARIADB_RPL_SERVER_ID, 12); - mariadb_rpl_optionsv(rpl.get(), MARIADB_RPL_FILENAME, "mariadb-bin.000242"); - mariadb_rpl_optionsv(rpl.get(), MARIADB_RPL_START, 4); + mariadb_rpl_optionsv(rpl.get(), MARIADB_RPL_SERVER_ID, serverid); + mariadb_rpl_optionsv(rpl.get(), MARIADB_RPL_FILENAME, filename.c_str()); + mariadb_rpl_optionsv(rpl.get(), MARIADB_RPL_START, position); mariadb_rpl_optionsv(rpl.get(), MARIADB_RPL_FLAGS, MARIADB_RPL_BINLOG_SEND_ANNOTATE_ROWS); verify<std::runtime_error>(!mariadb_rpl_open(rpl.get()), "Failed to mariadb_rpl_open"); while (MyGrate::MariaDB_Event_Ptr event {mariadb_rpl_fetch(rpl.get(), nullptr), &mariadb_free_rpl_event}) { + position = event->next_event_pos; if (const auto & h = eventHandlers.at(event->event_type); h.func) { (eh.*h.func)(std::move(event)); } diff --git a/lib/input/replStream.h b/lib/input/replStream.h index 4b1a7b6..01c9ef3 100644 --- a/lib/input/replStream.h +++ b/lib/input/replStream.h @@ -2,7 +2,9 @@ #define MYGRATE_INPUT_REPLSTREAM_H #include "mysqlConn.h" +#include <cstdint> #include <eventSourceBase.h> +#include <string> namespace MyGrate { class EventHandlerBase; @@ -11,9 +13,15 @@ namespace MyGrate { namespace MyGrate::Input { class ReplicationStream : public EventSourceBase, MySQLConn { public: - using MySQLConn::MySQLConn; + ReplicationStream(const char * const host, const char * const user, const char * const pass, + unsigned short port, uint64_t serverid, std::string filename, uint64_t position); void readEvents(EventHandlerBase &) override; + + private: + uint64_t serverid; + std::string filename; + uint64_t position; }; } |