diff options
Diffstat (limited to 'lib/output/pq')
| -rw-r--r-- | lib/output/pq/updateDatabase.cpp | 45 | ||||
| -rw-r--r-- | lib/output/pq/updateDatabase.h | 5 | 
2 files changed, 34 insertions, 16 deletions
diff --git a/lib/output/pq/updateDatabase.cpp b/lib/output/pq/updateDatabase.cpp index 4830fd5..b6809c2 100644 --- a/lib/output/pq/updateDatabase.cpp +++ b/lib/output/pq/updateDatabase.cpp @@ -248,12 +248,33 @@ namespace MyGrate::Output::Pq {  	}  	void +	UpdateDatabase::verifyRow(const MariaDB_Event_Ptr & e, const TableDefPtr & out) +	{ +		verify<std::runtime_error>( +				e->event.rows.column_count == out->columns.size(), "Incorrect number of columns in row data"); +	} + +	void +	UpdateDatabase::copyAll(const Row & r, std::back_insert_iterator<std::vector<DbValue>> && out) +	{ +		std::copy(r.begin(), r.end(), out); +	} + +	void +	UpdateDatabase::copyKeys( +			const Row & r, const TableDefPtr & td, std::back_insert_iterator<std::vector<DbValue>> && out) +	{ +		std::copy_if(r.begin(), r.end(), out, [c = td->columns.begin()](auto &&) mutable { +			return (c++)->get()->is_pk; +		}); +	} + +	void  	UpdateDatabase::updateRow(MariaDB_Event_Ptr e)  	{  		if (selected != tables.end()) {  			auto & out = selected->second; -			verify<std::runtime_error>( -					e->event.rows.column_count == out->columns.size(), "Incorrect number of columns in row data"); +			verifyRow(e, out);  			if (!out->update) {  				std::stringstream ou;  				std::size_t ordinal {0}, kordinal {out->columns.size()}; @@ -276,11 +297,8 @@ namespace MyGrate::Output::Pq {  			std::vector<DbValue> updateValues;  			updateValues.reserve(out->columns.size() + out->keys);  			RowPair rp {e->event.rows, table_map->event.table_map}; -			std::copy(rp.second.begin(), rp.second.end(), std::back_inserter(updateValues)); -			std::copy_if(rp.first.begin(), rp.first.end(), std::back_inserter(updateValues), -					[c = out->columns.begin()](auto &&) mutable { -						return (c++)->get()->is_pk; -					}); +			copyAll(rp.second, std::back_inserter(updateValues)); +			copyKeys(rp.first, out, std::back_inserter(updateValues));  			out->update->execute(updateValues);  			verify<ReplicationError>(out->update->rows() == 1, "Wrong number of rows updated.");  		} @@ -291,8 +309,7 @@ namespace MyGrate::Output::Pq {  	{  		if (selected != tables.end()) {  			auto & out = selected->second; -			verify<std::runtime_error>( -					e->event.rows.column_count == out->columns.size(), "Incorrect number of columns in row data"); +			verifyRow(e, out);  			if (!out->deleteFrom) {  				std::stringstream ou;  				std::size_t kordinal {0}; @@ -310,10 +327,7 @@ namespace MyGrate::Output::Pq {  			std::vector<DbValue> updateValues;  			updateValues.reserve(out->keys);  			Row rp {e->event.rows, table_map->event.table_map}; -			std::copy_if(rp.begin(), rp.end(), std::back_inserter(updateValues), -					[c = out->columns.begin()](auto &&) mutable { -						return (c++)->get()->is_pk; -					}); +			copyKeys(rp, out, std::back_inserter(updateValues));  			out->deleteFrom->execute(updateValues);  			verify<ReplicationError>(out->deleteFrom->rows() == 1, "Wrong number of rows deleted.");  		} @@ -324,8 +338,7 @@ namespace MyGrate::Output::Pq {  	{  		if (selected != tables.end()) {  			auto & out = selected->second; -			verify<std::runtime_error>( -					e->event.rows.column_count == out->columns.size(), "Incorrect number of columns in row data"); +			verifyRow(e, out);  			if (!out->insertInto) {  				std::stringstream ou;  				std::size_t ordinal {0}, vordinal {0}; @@ -346,7 +359,7 @@ namespace MyGrate::Output::Pq {  			std::vector<DbValue> updateValues;  			updateValues.reserve(out->columns.size());  			Row rp {e->event.rows, table_map->event.table_map}; -			std::copy(rp.begin(), rp.end(), std::back_inserter(updateValues)); +			copyAll(rp, std::back_inserter(updateValues));  			out->insertInto->execute(updateValues);  			verify<ReplicationError>(out->insertInto->rows() == 1, "Wrong number of rows updated.");  		} diff --git a/lib/output/pq/updateDatabase.h b/lib/output/pq/updateDatabase.h index 89d9916..970c6d5 100644 --- a/lib/output/pq/updateDatabase.h +++ b/lib/output/pq/updateDatabase.h @@ -6,6 +6,7 @@  #include <cstdint>  #include <eventHandlerBase.h>  #include <eventSourceBase.h> +#include <row.h>  namespace MyGrate::Input {  	class MySQLConn; @@ -65,6 +66,10 @@ namespace MyGrate::Output::Pq {  		UpdateDatabase(PqConn &&, uint64_t source);  		UpdateDatabase(PqConn &&, uint64_t source, RecordSetPtr cfg); +		static void verifyRow(const MariaDB_Event_Ptr & e, const TableDefPtr &); +		static void copyAll(const Row & r, std::back_insert_iterator<std::vector<DbValue>> &&); +		static void copyKeys(const Row & r, const TableDefPtr &, std::back_insert_iterator<std::vector<DbValue>> &&); +  		using Tables = std::map<std::string, TableDefPtr, std::less<>>;  		Tables tables;  		Tables::const_iterator selected;  | 
