From 9592817df3dd98814c5ae0845dd1617ac20d774d Mon Sep 17 00:00:00 2001
From: Dan Goodliffe <dan@randomdan.homeip.net>
Date: Sun, 25 Jul 2021 12:45:15 +0100
Subject: Add before/after event to update position in replication stream

---
 lib/output/pq/sql/updateSourcePosition.sql |  5 ++---
 lib/output/pq/updateDatabase.cpp           | 20 ++++++++++++++++++++
 lib/output/pq/updateDatabase.h             |  3 +++
 3 files changed, 25 insertions(+), 3 deletions(-)

(limited to 'lib/output')

diff --git a/lib/output/pq/sql/updateSourcePosition.sql b/lib/output/pq/sql/updateSourcePosition.sql
index 74e22fc..09554f4 100644
--- a/lib/output/pq/sql/updateSourcePosition.sql
+++ b/lib/output/pq/sql/updateSourcePosition.sql
@@ -1,4 +1,3 @@
 UPDATE mygrate.source SET
-	filename = $1,
-	position = $2
-WHERE source_id = $3
+	position = $1
+WHERE source_id = $2
diff --git a/lib/output/pq/updateDatabase.cpp b/lib/output/pq/updateDatabase.cpp
index b6809c2..c43d8e4 100644
--- a/lib/output/pq/updateDatabase.cpp
+++ b/lib/output/pq/updateDatabase.cpp
@@ -19,6 +19,7 @@
 #include <output/pq/sql/selectSource.h>
 #include <output/pq/sql/selectSourceSchema.h>
 #include <output/pq/sql/selectTables.h>
+#include <output/pq/sql/updateSourcePosition.h>
 #include <row.h>
 #include <stdexcept>
 #include <streamSupport.h>
@@ -247,6 +248,19 @@ namespace MyGrate::Output::Pq {
 		}
 	}
 
+	void
+	UpdateDatabase::beforeEvent(const MariaDB_Event_Ptr &)
+	{
+		beginTx();
+	}
+
+	void
+	UpdateDatabase::afterEvent(const MariaDB_Event_Ptr & e)
+	{
+		output::pq::sql::updateSourcePosition::execute(this, e->next_event_pos, source);
+		commitTx();
+	}
+
 	void
 	UpdateDatabase::verifyRow(const MariaDB_Event_Ptr & e, const TableDefPtr & out)
 	{
@@ -294,6 +308,7 @@ namespace MyGrate::Output::Pq {
 
 				out->update = prepare(ou.str().c_str(), kordinal);
 			}
+			beforeEvent(e);
 			std::vector<DbValue> updateValues;
 			updateValues.reserve(out->columns.size() + out->keys);
 			RowPair rp {e->event.rows, table_map->event.table_map};
@@ -301,6 +316,7 @@ namespace MyGrate::Output::Pq {
 			copyKeys(rp.first, out, std::back_inserter(updateValues));
 			out->update->execute(updateValues);
 			verify<ReplicationError>(out->update->rows() == 1, "Wrong number of rows updated.");
+			afterEvent(e);
 		}
 	}
 
@@ -324,12 +340,14 @@ namespace MyGrate::Output::Pq {
 
 				out->deleteFrom = prepare(ou.str().c_str(), kordinal);
 			}
+			beforeEvent(e);
 			std::vector<DbValue> updateValues;
 			updateValues.reserve(out->keys);
 			Row rp {e->event.rows, table_map->event.table_map};
 			copyKeys(rp, out, std::back_inserter(updateValues));
 			out->deleteFrom->execute(updateValues);
 			verify<ReplicationError>(out->deleteFrom->rows() == 1, "Wrong number of rows deleted.");
+			afterEvent(e);
 		}
 	}
 
@@ -356,12 +374,14 @@ namespace MyGrate::Output::Pq {
 
 				out->insertInto = prepare(ou.str().c_str(), out->columns.size());
 			}
+			beforeEvent(e);
 			std::vector<DbValue> updateValues;
 			updateValues.reserve(out->columns.size());
 			Row rp {e->event.rows, table_map->event.table_map};
 			copyAll(rp, std::back_inserter(updateValues));
 			out->insertInto->execute(updateValues);
 			verify<ReplicationError>(out->insertInto->rows() == 1, "Wrong number of rows updated.");
+			afterEvent(e);
 		}
 	}
 }
diff --git a/lib/output/pq/updateDatabase.h b/lib/output/pq/updateDatabase.h
index 970c6d5..9547b90 100644
--- a/lib/output/pq/updateDatabase.h
+++ b/lib/output/pq/updateDatabase.h
@@ -66,6 +66,9 @@ namespace MyGrate::Output::Pq {
 		UpdateDatabase(PqConn &&, uint64_t source);
 		UpdateDatabase(PqConn &&, uint64_t source, RecordSetPtr cfg);
 
+		void beforeEvent(const MariaDB_Event_Ptr & e);
+		void afterEvent(const MariaDB_Event_Ptr & e);
+
 		static void verifyRow(const MariaDB_Event_Ptr & e, const TableDefPtr &);
 		static void copyAll(const Row & r, std::back_insert_iterator<std::vector<DbValue>> &&);
 		static void copyKeys(const Row & r, const TableDefPtr &, std::back_insert_iterator<std::vector<DbValue>> &&);
-- 
cgit v1.2.3