diff options
Diffstat (limited to 'lib/input')
-rw-r--r-- | lib/input/mysqlConn.cpp | 19 | ||||
-rw-r--r-- | lib/input/mysqlConn.h | 14 | ||||
-rw-r--r-- | lib/input/replStream.cpp | 32 | ||||
-rw-r--r-- | lib/input/replStream.h | 16 |
4 files changed, 81 insertions, 0 deletions
diff --git a/lib/input/mysqlConn.cpp b/lib/input/mysqlConn.cpp new file mode 100644 index 0000000..38feb35 --- /dev/null +++ b/lib/input/mysqlConn.cpp @@ -0,0 +1,19 @@ +#include "mysqlConn.h" +#include <stdexcept> + +namespace MyGrate::Input { + MySQLConn::MySQLConn(const char * const host, const char * const user, const char * const pass, unsigned short port) + { + mysql_init(this); + if (!mysql_real_connect(this, host, user, pass, "", port, nullptr, 0)) { + mysql_close(this); + throw std::runtime_error("ConnectionError"); + } + mysql_query(this, "SET NAMES utf8"); + } + + MySQLConn::~MySQLConn() + { + mysql_close(this); + } +} diff --git a/lib/input/mysqlConn.h b/lib/input/mysqlConn.h new file mode 100644 index 0000000..fa3712c --- /dev/null +++ b/lib/input/mysqlConn.h @@ -0,0 +1,14 @@ +#ifndef MYGRATE_INPUT_MYSQLCONN_H +#define MYGRATE_INPUT_MYSQLCONN_H + +#include <mysql.h> + +namespace MyGrate::Input { + class MySQLConn : public MYSQL { + public: + MySQLConn(const char * const host, const char * const user, const char * const pass, unsigned short port); + ~MySQLConn(); + }; +} + +#endif diff --git a/lib/input/replStream.cpp b/lib/input/replStream.cpp new file mode 100644 index 0000000..88c3caf --- /dev/null +++ b/lib/input/replStream.cpp @@ -0,0 +1,32 @@ +#include "replStream.h" +#include "../eventHandlers.h" +#include "../mariadb_repl.h" + +namespace MyGrate::Input { + void + ReplicationStream::readEvents(MyGrate::EventHandlerBase & eh) + { + using MariaDB_Rpl_Ptr = std::unique_ptr<MARIADB_RPL, decltype(&mariadb_rpl_close)>; + auto rpl = MariaDB_Rpl_Ptr {mariadb_rpl_init(this), &mariadb_rpl_close}; + + mysql_query(this, "SET @mariadb_slave_capability = 4"); + mysql_query(this, "SET @master_binlog_checksum = @@global.binlog_checksum"); + + mariadb_rpl_optionsv(rpl.get(), MARIADB_RPL_SERVER_ID, 12); + mariadb_rpl_optionsv(rpl.get(), MARIADB_RPL_FILENAME, "mariadb-bin.000242"); + mariadb_rpl_optionsv(rpl.get(), MARIADB_RPL_START, 4); + mariadb_rpl_optionsv(rpl.get(), MARIADB_RPL_FLAGS, MARIADB_RPL_BINLOG_SEND_ANNOTATE_ROWS); + + if (mariadb_rpl_open(rpl.get())) { + throw std::runtime_error("Failed to mariadb_rpl_open"); + } + + while (MyGrate::MariaDB_Event_Ptr event {mariadb_rpl_fetch(rpl.get(), nullptr), &mariadb_free_rpl_event}) { + const auto & h = eventHandlers.at(event->event_type); + if (h.func) { + (eh.*h.func)(std::move(event)); + } + } + } + +} diff --git a/lib/input/replStream.h b/lib/input/replStream.h new file mode 100644 index 0000000..983d9d3 --- /dev/null +++ b/lib/input/replStream.h @@ -0,0 +1,16 @@ +#ifndef MYGRATE_INPUT_REPLSTREAM_H +#define MYGRATE_INPUT_REPLSTREAM_H + +#include "../eventSourceBase.h" +#include "mysqlConn.h" + +namespace MyGrate::Input { + class ReplicationStream : public EventSourceBase, MySQLConn { + public: + using MySQLConn::MySQLConn; + + void readEvents(EventHandlerBase &) override; + }; +} + +#endif |