summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDan Goodliffe <dan@randomdan.homeip.net>2021-08-22 13:58:49 +0100
committerDan Goodliffe <dan@randomdan.homeip.net>2021-08-22 13:58:49 +0100
commit172567f6ee168846b41d11ed7b8bd9c1ccbb659b (patch)
tree31cb78c4dedc32e9629815b855ecc7bedfcaf0b0
parentAdd EventCounter class (diff)
downloadmygrate-172567f6ee168846b41d11ed7b8bd9c1ccbb659b.tar.bz2
mygrate-172567f6ee168846b41d11ed7b8bd9c1ccbb659b.tar.xz
mygrate-172567f6ee168846b41d11ed7b8bd9c1ccbb659b.zip
Wait on specific events being processed
-rw-r--r--lib/input/replStream.cpp2
-rw-r--r--lib/input/replStream.h5
-rw-r--r--lib/output/pq/updateDatabase.cpp7
-rw-r--r--lib/output/pq/updateDatabase.h4
-rw-r--r--lib/streamSupport.h1
-rw-r--r--test/semaphore.cpp37
-rw-r--r--test/semaphore.h27
-rw-r--r--test/test-e2e.cpp37
8 files changed, 45 insertions, 75 deletions
diff --git a/lib/input/replStream.cpp b/lib/input/replStream.cpp
index d04e16b..3bfe877 100644
--- a/lib/input/replStream.cpp
+++ b/lib/input/replStream.cpp
@@ -39,8 +39,10 @@ namespace MyGrate::Input {
verify<MySQLErr>(!mariadb_rpl_open(rpl.get()), "Failed to mariadb_rpl_open", this);
while (MyGrate::MariaDB_Event_Ptr event {mariadb_rpl_fetch(rpl.get(), nullptr), &mariadb_free_rpl_event}) {
+ received.tick(event->event_type);
auto np = event->next_event_pos;
if (const auto & h = eventHandlers.at(event->event_type); h.func) {
+ handled.tick(event->event_type);
(eh.*h.func)(std::move(event));
}
position = np;
diff --git a/lib/input/replStream.h b/lib/input/replStream.h
index 40ec390..4958459 100644
--- a/lib/input/replStream.h
+++ b/lib/input/replStream.h
@@ -3,6 +3,7 @@
#include "mysqlConn.h"
#include <cstdint>
+#include <eventCounter.h>
#include <eventSourceBase.h>
#include <string>
@@ -19,10 +20,14 @@ namespace MyGrate::Input {
void readEvents(EventHandlerBase &) override;
void stopEvents() override;
+ const EventCounter & getReceivedCounts() const;
+ const EventCounter & getHandledCounts() const;
+
private:
uint64_t serverid;
std::string filename;
uint64_t position;
+ EventCounter received, handled;
};
}
diff --git a/lib/output/pq/updateDatabase.cpp b/lib/output/pq/updateDatabase.cpp
index 485c914..330a0af 100644
--- a/lib/output/pq/updateDatabase.cpp
+++ b/lib/output/pq/updateDatabase.cpp
@@ -183,6 +183,12 @@ namespace MyGrate::Output::Pq {
}
}
+ const EventCounter &
+ UpdateDatabase::getProcessedCounts() const
+ {
+ return processed;
+ }
+
void
UpdateDatabase::tableMap(MariaDB_Event_Ptr e)
{
@@ -207,6 +213,7 @@ namespace MyGrate::Output::Pq {
void
UpdateDatabase::afterEvent(const MariaDB_Event_Ptr & e)
{
+ processed.tick(e->event_type);
if (!intx) {
output::pq::sql::updateSourcePosition::execute(this, e->next_event_pos, source);
commitTx();
diff --git a/lib/output/pq/updateDatabase.h b/lib/output/pq/updateDatabase.h
index 63a96b9..85369fd 100644
--- a/lib/output/pq/updateDatabase.h
+++ b/lib/output/pq/updateDatabase.h
@@ -4,6 +4,7 @@
#include "pqConn.h"
#include "pqStmt.h"
#include <cstdint>
+#include <eventCounter.h>
#include <eventHandlerBase.h>
#include <eventSourceBase.h>
#include <row.h>
@@ -62,6 +63,8 @@ namespace MyGrate::Output::Pq {
void gtid(MariaDB_Event_Ptr) override;
void xid(MariaDB_Event_Ptr) override;
+ const EventCounter & getProcessedCounts() const;
+
const uint64_t source;
const std::string schema;
const std::string database;
@@ -85,6 +88,7 @@ namespace MyGrate::Output::Pq {
Tables::const_iterator selected;
MariaDB_Event_Ptr table_map;
bool intx {false};
+ EventCounter processed;
};
}
diff --git a/lib/streamSupport.h b/lib/streamSupport.h
index cab62e6..e9af010 100644
--- a/lib/streamSupport.h
+++ b/lib/streamSupport.h
@@ -1,6 +1,7 @@
#ifndef MYGRATE_STREAM_SUPPORT_H
#define MYGRATE_STREAM_SUPPORT_H
+#include "eventCounter.h"
#include "mariadb_repl.h"
#include <array>
#include <cstddef>
diff --git a/test/semaphore.cpp b/test/semaphore.cpp
deleted file mode 100644
index b7e089f..0000000
--- a/test/semaphore.cpp
+++ /dev/null
@@ -1,37 +0,0 @@
-#include "semaphore.h"
-
-#ifndef __cpp_lib_semaphore
-# include <thread>
-
-semaphore::semaphore(unsigned int v_) : v {v_} { }
-
-void
-semaphore::release(unsigned int n)
-{
- std::lock_guard lk {m};
- v += n;
-}
-
-void
-semaphore::acquire()
-{
- while (!try_dec()) { }
-}
-
-bool
-semaphore::try_dec()
-{
- m.lock();
- if (v) {
- v--;
- m.unlock();
- return true;
- }
- else {
- m.unlock();
- std::this_thread::sleep_for(std::chrono::milliseconds(100));
- return false;
- }
-}
-
-#endif
diff --git a/test/semaphore.h b/test/semaphore.h
deleted file mode 100644
index 6a5963f..0000000
--- a/test/semaphore.h
+++ /dev/null
@@ -1,27 +0,0 @@
-#ifndef MYGRATE_TEST_SEMAPHORE_H
-#define MYGRATE_TEST_SEMAPHORE_H
-
-#if __has_include(<semaphore>)
-# include <semaphore>
-#endif
-
-#ifdef __cpp_lib_semaphore
-using semaphore = std::binary_semaphore;
-#else
-# include <mutex>
-class semaphore {
-public:
- explicit semaphore(unsigned int v_);
-
- void release(unsigned int n = 1);
-
- void acquire();
-
-private:
- bool try_dec();
- unsigned int v;
- std::mutex m;
-};
-#endif
-
-#endif
diff --git a/test/test-e2e.cpp b/test/test-e2e.cpp
index 4dd40d7..45a8d49 100644
--- a/test/test-e2e.cpp
+++ b/test/test-e2e.cpp
@@ -2,10 +2,11 @@
#include <boost/test/data/test_case.hpp>
#include <boost/test/unit_test.hpp>
-#include "semaphore.h"
+#include "helpers.h"
#include "testdb-mysql.h"
#include "testdb-postgresql.h"
#include <compileTimeFormatter.h>
+#include <condition_variable>
#include <dbConn.h>
#include <input/replStream.h>
#include <output/pq/updateDatabase.h>
@@ -26,20 +27,25 @@ public:
void
afterEvent(const MyGrate::MariaDB_Event_Ptr & e) override
{
- UpdateDatabase::afterEvent(std::move(e));
- ops.release();
+ {
+ std::lock_guard<std::mutex> lk(m);
+ UpdateDatabase::afterEvent(std::move(e));
+ }
+ cv.notify_all();
}
void
- waitFor(unsigned int n)
+ waitFor(EventCounterTarget ect)
{
- while (n--) {
- ops.acquire();
- }
+ std::unique_lock<std::mutex> lk(m);
+ cv.wait(lk, [&] {
+ return getProcessedCounts() >= ect;
+ });
}
private:
- semaphore ops {0};
+ std::condition_variable cv;
+ std::mutex m;
};
using namespace MyGrate::Testing;
@@ -80,7 +86,7 @@ public:
}
void
- stopAfter(unsigned int events)
+ stopAfter(const EventCounterTarget & events)
{
BOOST_REQUIRE(out);
BOOST_REQUIRE(src);
@@ -126,7 +132,11 @@ BOOST_AUTO_TEST_CASE(e2e, *boost::unit_test::timeout(15))
ins->execute({"hashyhash", "testuser", "groupadm", "10.10.0.1", 2433});
mym.query("flush logs");
- stopAfter(4);
+ stopAfter(EventCounterTarget {}
+ .add(UPDATE_ROWS_EVENT_V1, 1)
+ .add(DELETE_ROWS_EVENT_V1, 1)
+ .add(WRITE_ROWS_EVENT_V1, 1)
+ .add(ROTATE_EVENT, 1));
}
BOOST_AUTO_TEST_CASE(txns, *boost::unit_test::timeout(15))
@@ -145,7 +155,12 @@ BOOST_AUTO_TEST_CASE(txns, *boost::unit_test::timeout(15))
MyGrate::sql::simpleUpdateAll::execute(&mym, "Same");
MyGrate::sql::simpleDeleteSome::execute(&mym, 5);
});
- stopAfter(14);
+ stopAfter(EventCounterTarget {}
+ .add(GTID_EVENT, 1)
+ .add(WRITE_ROWS_EVENT_V1, 10)
+ .add(UPDATE_ROWS_EVENT_V1, 1)
+ .add(DELETE_ROWS_EVENT_V1, 1)
+ .add(XID_EVENT, 1));
auto recs = MyGrate::sql::simpleSelectAll::execute(&pqm);
BOOST_REQUIRE_EQUAL(recs->rows(), 6);