1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
|
#define BOOST_TEST_MODULE EndToEnd
#include <boost/test/data/test_case.hpp>
#include <boost/test/unit_test.hpp>
#include "semaphore.h"
#include "testdb-mysql.h"
#include "testdb-postgresql.h"
#include <input/replStream.h>
#include <output/pq/updateDatabase.h>
#include <sql/createTestTable.h>
#include <sql/fillTestTable.h>
#include <sql/selectTestTable.h>
#include <thread>
class TestUpdateDatabase : public MyGrate::Output::Pq::UpdateDatabase {
public:
using MyGrate::Output::Pq::UpdateDatabase::UpdateDatabase;
void
afterEvent(const MyGrate::MariaDB_Event_Ptr & e) override
{
UpdateDatabase::afterEvent(std::move(e));
ops.release();
}
void
waitFor(unsigned int n)
{
while (n--) {
ops.acquire();
}
}
private:
semaphore ops {0};
};
BOOST_AUTO_TEST_CASE(e2e, *boost::unit_test::timeout(15))
{
const char * const target_schema {"testout"};
using namespace MyGrate::Testing;
MySQLDB my;
PqConnDB pq {ROOT "/db/schema.sql"};
auto pqm = pq.mock();
auto mym = my.mock();
MyGrate::sql::createTestTable::execute(&mym);
MyGrate::sql::fillTestTable::execute(&mym);
TestUpdateDatabase out {pqm.connstr.c_str(),
MyGrate::Output::Pq::UpdateDatabase::createNew(&pqm, MySQLDB::SERVER, MySQLDB::USER, MySQLDB::PASSWORD,
MySQLDB::PORT, my.mockname.c_str(), 100, target_schema)
.source};
BOOST_CHECK_EQUAL(out.source, 1);
auto src = out.getSource();
BOOST_REQUIRE(src);
out.addTable(&mym, "session");
BOOST_CHECK_EQUAL(MyGrate::sql::selectTestTable::execute(&pqm)->rows(), 1);
std::thread repl {&MyGrate::EventSourceBase::readEvents, src.get(), std::ref(out)};
auto upd = mym.prepare("UPDATE session SET session_id = ? WHERE id = ?", 2);
upd->execute({"food", 1});
auto del = mym.prepare("DELETE FROM session WHERE id = ?", 2);
del->execute({1});
auto ins = mym.prepare("INSERT INTO session(session_id, username, user_lvl, ip_addr, port, created, modified) \
VALUES(?, ?, ?, ?, ?, now(), now())",
5);
ins->execute({"hashyhash", "testuser", "groupadm", "10.10.0.1", 2433});
mym.query("flush logs");
out.waitFor(4);
src->stopEvents();
repl.join();
}
|