summaryrefslogtreecommitdiff
path: root/test/test-e2e.cpp
blob: 37fa924f3b948c740df6f0130e79a964bd0fb6d5 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
#define BOOST_TEST_MODULE EndToEnd
#include <boost/test/data/test_case.hpp>
#include <boost/test/unit_test.hpp>

#include "semaphore.h"
#include "testdb-mysql.h"
#include "testdb-postgresql.h"
#include <input/replStream.h>
#include <output/pq/updateDatabase.h>
#include <sql/createTestTable.h>
#include <sql/fillTestTable.h>
#include <sql/selectTestTable.h>
#include <thread>

class TestUpdateDatabase : public MyGrate::Output::Pq::UpdateDatabase {
public:
	using MyGrate::Output::Pq::UpdateDatabase::UpdateDatabase;
	void
	afterEvent(const MyGrate::MariaDB_Event_Ptr & e) override
	{
		UpdateDatabase::afterEvent(std::move(e));
		ops.release();
	}

	void
	waitFor(unsigned int n)
	{
		while (n--) {
			ops.acquire();
		}
	}

private:
	semaphore ops {0};
};

BOOST_AUTO_TEST_CASE(e2e, *boost::unit_test::timeout(15))
{
	const char * const target_schema {"testout"};
	using namespace MyGrate::Testing;
	MySQLDB my;
	PqConnDB pq {ROOT "/db/schema.sql"};

	auto pqm = pq.mock();
	auto mym = my.mock();

	MyGrate::sql::createTestTable::execute(&mym);
	MyGrate::sql::fillTestTable::execute(&mym);

	TestUpdateDatabase out {pqm.connstr.c_str(),
			MyGrate::Output::Pq::UpdateDatabase::createNew(&pqm, MySQLDB::SERVER, MySQLDB::USER, MySQLDB::PASSWORD,
					MySQLDB::PORT, my.mockname.c_str(), 100, target_schema)
					.source};
	BOOST_CHECK_EQUAL(out.source, 1);
	auto src = out.getSource();
	BOOST_REQUIRE(src);

	out.addTable(&mym, "session");

	BOOST_CHECK_EQUAL(MyGrate::sql::selectTestTable::execute(&pqm)->rows(), 1);

	std::thread repl {&MyGrate::EventSourceBase::readEvents, src.get(), std::ref(out)};

	auto upd = mym.prepare("UPDATE session SET session_id = ? WHERE id = ?", 2);
	upd->execute({"food", 1});
	auto del = mym.prepare("DELETE FROM session WHERE id = ?", 2);
	del->execute({1});
	auto ins = mym.prepare("INSERT INTO session(session_id, username, user_lvl, ip_addr, port, created, modified) \
			VALUES(?, ?, ?, ?, ?, now(), now())",
			5);
	ins->execute({"hashyhash", "testuser", "groupadm", "10.10.0.1", 2433});
	mym.query("flush logs");

	out.waitFor(4);

	src->stopEvents();
	repl.join();
}