diff options
-rw-r--r-- | libdbpp/connection.cpp | 17 | ||||
-rw-r--r-- | libdbpp/connection.h | 63 | ||||
-rw-r--r-- | libdbpp/tablepatch.cpp | 288 | ||||
-rw-r--r-- | libdbpp/tablepatch.h | 22 | ||||
-rw-r--r-- | libdbpp/unittests/Jamfile.jam | 17 | ||||
-rw-r--r-- | libdbpp/unittests/patch.sql | 19 | ||||
-rw-r--r-- | libdbpp/unittests/source.dat | 4 | ||||
-rw-r--r-- | libdbpp/unittests/target.dat | 4 | ||||
-rw-r--r-- | libdbpp/unittests/testPatch.cpp | 178 |
9 files changed, 609 insertions, 3 deletions
diff --git a/libdbpp/connection.cpp b/libdbpp/connection.cpp index d766dc4..24d19f9 100644 --- a/libdbpp/connection.cpp +++ b/libdbpp/connection.cpp @@ -53,6 +53,23 @@ DB::Connection::insertId() const throw std::runtime_error("insertId not implemented for this driver."); } +DB::TablePatch::TablePatch() : + insteadOfDelete(nullptr), + where(nullptr), + order(nullptr) +{ +} + +void +DB::SqlWriter::bindParams(DB::Command *, unsigned int &) +{ +} + +DB::TransactionRequired::TransactionRequired() : + std::logic_error("A transaction must be opened before performing this operation") +{ +} + INSTANTIATEFACTORY(DB::Connection, std::string); PLUGINRESOLVER(DB::ConnectionFactory, DB::Connection::resolvePlugin); diff --git a/libdbpp/connection.h b/libdbpp/connection.h index 8a8aa5c..a15a9ab 100644 --- a/libdbpp/connection.h +++ b/libdbpp/connection.h @@ -2,24 +2,74 @@ #define DB_CONNECTION_H #include <string> +#include <set> #include <factory.h> +#include <exception.h> #include <visibility.h> #include <boost/filesystem/path.hpp> #include <boost/shared_ptr.hpp> +namespace AdHoc { + class Buffer; +} + namespace DB { + class Command; class SelectCommand; class ModifyCommand; + enum BulkDeleteStyle { BulkDeleteUsingSubSelect, BulkDeleteUsingUsing, BulkDeleteUsingUsingAlias, }; + enum BulkUpdateStyle { - BulkUpdateByIteration, - BulkUpdateUsingFromSrc, - BulkUpdateUsingJoin, + BulkUpdateUsingFromSrc = 1, + BulkUpdateUsingJoin = 2, + }; + + typedef std::string TableName; + typedef std::string ColumnName; + typedef std::set<ColumnName> ColumnNames; + typedef ColumnNames PrimaryKey; + typedef PrimaryKey::const_iterator PKI; + + struct PatchResult { + unsigned int deletes; + unsigned int updates; + unsigned int inserts; + }; + + class DLL_PUBLIC SqlWriter { + public: + virtual void writeSql(AdHoc::Buffer &) = 0; + virtual void bindParams(Command *, unsigned int &); + }; + + class DLL_PUBLIC TablePatch { + public: + TablePatch(); + + TableName src; + TableName dest; + PrimaryKey pk; + ColumnNames cols; + SqlWriter * insteadOfDelete; + SqlWriter * where; + SqlWriter * order; + }; + + class DLL_PUBLIC PatchCheckFailure : public AdHoc::StdException { + public: + std::string message() const throw() override; }; + + class DLL_PUBLIC TransactionRequired : public std::logic_error { + public: + TransactionRequired(); + }; + /// Base class for connections to a database. class DLL_PUBLIC Connection { public: @@ -71,8 +121,15 @@ namespace DB { /// Return the Id used in the last insert virtual int64_t insertId() const; + /// Patch one table's contents into another. + PatchResult patchTable(TablePatch * tp); + /// AdHoc plugin resolver helper for database connectors. static boost::optional<std::string> resolvePlugin(const std::type_info &, const std::string &); + protected: + unsigned int patchDeletes(TablePatch * tp); + unsigned int patchUpdates(TablePatch * tp); + unsigned int patchInserts(TablePatch * tp); private: }; diff --git a/libdbpp/tablepatch.cpp b/libdbpp/tablepatch.cpp new file mode 100644 index 0000000..98ad1fa --- /dev/null +++ b/libdbpp/tablepatch.cpp @@ -0,0 +1,288 @@ +#include "connection.h" +#include "modifycommand.h" +#include "selectcommand.h" +#include <buffer.h> +#include <safeMapFind.h> +#include <scopeExit.h> +#include <boost/algorithm/string/join.hpp> + +DB::PatchResult +DB::Connection::patchTable(TablePatch * tp) +{ + if (tp->pk.empty()) { + throw PatchCheckFailure(); + } + if (!inTx()) { + throw TransactionRequired(); + } + auto savepointName = stringbf("TablePatch_%p_%d", tp, getpid()); + savepoint(savepointName); + AdHoc::ScopeExit _(AdHoc::ScopeExit::Event(), + [this, savepointName](){ releaseSavepoint(savepointName); }, + [this, savepointName](){ rollbackToSavepoint(savepointName); } + ); + return { + patchDeletes(tp), + patchUpdates(tp), + patchInserts(tp) + }; +} + +template<typename Container> +static inline void +push(const boost::format &, typename Container::const_iterator &) +{ +} + +template<typename Container, typename Value, typename ... Values> +static inline void +push(boost::format & f, typename Container::const_iterator & i, const Value & v, const Values & ... vs) +{ + f % v(i); + push<Container>(f, i, vs...); +} + +template<typename Separator, typename Container, typename ... Ps> +static inline unsigned int +appendIf(AdHoc::Buffer & buf, const Container & c, const boost::function<bool(const typename Container::const_iterator)> & sel, const Separator & sep, const std::string & fmts, const Ps & ... ps) +{ + auto fmt = AdHoc::Buffer::getFormat(fmts); + unsigned int x = 0; + for (typename Container::const_iterator i = c.begin(); i != c.end(); ++i) { + if (sel(i)) { + if (x > 0) { + buf.appendbf("%s", sep); + } + push<Container>(*fmt, i, ps...); + buf.append(fmt->str()); + x += 1; + } + } + return x; +} + +template<typename Separator, typename Container, typename ... Ps> +static inline unsigned int +append(AdHoc::Buffer & buf, const Container & c, const Separator & sep, const std::string & fmts, const Ps & ... ps) +{ + return appendIf(buf, c, [](auto){ return true; }, sep, fmts, ps...); +} + +template <typename Container> +static inline typename Container::key_type +self(const typename Container::const_iterator & i) +{ + return *i; +} +#define selfCols self<decltype(TablePatch::cols)> +#define isKey(tp) [tp](auto i){ return AdHoc::containerContains(tp->pk, *i); } +#define isNotKey(tp) [tp](auto i){ return !AdHoc::containerContains(tp->pk, *i); } + +unsigned int +DB::Connection::patchDeletes(TablePatch * tp) +{ + AdHoc::Buffer toDelSql; + switch (bulkDeleteStyle()) { + case BulkDeleteUsingSubSelect: + { + // ----------------------------------------------------------------- + // Build SQL to delete keys ---------------------------------------- + // ----------------------------------------------------------------- + if (tp->insteadOfDelete) { + toDelSql.appendbf("UPDATE %s SET ", + tp->dest); + tp->insteadOfDelete->writeSql(toDelSql); + toDelSql.append(" WHERE ("); + } + else { + toDelSql.appendbf("DELETE FROM %s WHERE (", + tp->dest); + } + append(toDelSql, tp->pk, ", ", "%s.%s", [tp](auto){ return tp->dest; }, selfCols); + // ----------------------------------------------------------------- + // Build SQL to select keys to delete ------------------------------ + // ----------------------------------------------------------------- + toDelSql.append(") IN (SELECT "); + append(toDelSql, tp->pk, ", ", "a.%s", selfCols); + toDelSql.appendbf(" FROM %s a LEFT OUTER JOIN %s b ON ", + tp->dest, tp->src); + append(toDelSql, tp->pk, " AND ", " a.%s = b.%s", selfCols, selfCols); + toDelSql.append(" WHERE "); + append(toDelSql, tp->pk, " AND ", " b.%s IS NULL", [](auto & pki){ return *pki; }); + if (tp->where) { + toDelSql.append(" AND "); + tp->where->writeSql(toDelSql); + } + if (tp->order) { + toDelSql.append(" ORDER BY "); + tp->order->writeSql(toDelSql); + } + toDelSql.append(")"); + break; + } + case BulkDeleteUsingUsingAlias: + case BulkDeleteUsingUsing: + { + if (tp->insteadOfDelete) { + toDelSql.appendbf("UPDATE %s a ", + tp->dest); + } + else { + toDelSql.appendbf("DELETE FROM %s USING %s a ", + (bulkDeleteStyle() == BulkDeleteUsingUsingAlias ? "a" : tp->dest), + tp->dest); + } + toDelSql.appendbf(" LEFT OUTER JOIN %s b ", + tp->src); + toDelSql.append(" ON "); + append(toDelSql, tp->pk, " AND ", " a.%s = b.%s ", selfCols, selfCols); + if (tp->insteadOfDelete) { + tp->insteadOfDelete->writeSql(toDelSql); + } + toDelSql.append(" WHERE "); + append(toDelSql, tp->pk, " AND ", " b.% IS NULL", selfCols); + if (tp->where) { + toDelSql.append(" AND "); + tp->where->writeSql(toDelSql); + } + if (tp->order) { + toDelSql.append(" ORDER BY "); + tp->order->writeSql(toDelSql); + } + break; + } + } + auto del = ModifyCommandPtr(newModifyCommand(toDelSql)); + unsigned int offset = 0; + if (tp->insteadOfDelete) { + tp->insteadOfDelete->bindParams(del.get(), offset); + } + if (tp->where) { + tp->where->bindParams(del.get(), offset); + } + if (tp->order) { + tp->order->bindParams(del.get(), offset); + } + return del->execute(); +} + +unsigned int +DB::Connection::patchUpdates(TablePatch * tp) +{ + if (tp->cols.size() == tp->pk.size()) { + // Can't "change" anything... it's all part of the key + return 0; + } + switch (bulkUpdateStyle()) { + case BulkUpdateUsingFromSrc: + { + // ----------------------------------------------------------------- + // Build SQL for list of updates to perform ------------------------ + // ----------------------------------------------------------------- + AdHoc::Buffer updSql; + updSql.appendbf("UPDATE %s a SET ", + tp->dest); + appendIf(updSql, tp->cols, isNotKey(tp), ", ", " %s = b.%s ", selfCols, selfCols); + updSql.appendbf(" FROM %s b ", + tp->src); + updSql.append(" WHERE "); + append(updSql, tp->pk, " AND " , " a.%s = b.%s ", selfCols, selfCols); + updSql.append(" AND ("); + appendIf(updSql, tp->cols, isNotKey(tp), " OR ", + " (((CASE WHEN (a.%s IS NULL AND b.%s IS NULL) THEN 1 ELSE 0 END) \ + + (CASE WHEN(a.%s = b.%s) THEN 1 ELSE 0 END)) = 0)", + selfCols, selfCols, selfCols, selfCols); + updSql.append(")"); + if (tp->where) { + updSql.append(" AND "); + tp->where->writeSql(updSql); + } + // ----------------------------------------------------------------- + // Execute the bulk update command --------------------------------- + // ----------------------------------------------------------------- + auto upd = ModifyCommandPtr(newModifyCommand(updSql)); + unsigned int offset = 0; + if (tp->where) { + tp->where->bindParams(upd.get(), offset); + } + return upd->execute(true); + } + break; + case BulkUpdateUsingJoin: + { + // ----------------------------------------------------------------- + // Build SQL for list of updates to perform ------------------------ + // ----------------------------------------------------------------- + AdHoc::Buffer updSql; + updSql.appendbf("UPDATE %s a, %s b SET ", + tp->dest, tp->src); + appendIf(updSql, tp->cols, isNotKey(tp), ", ", " a.%s = b.%s ", selfCols, selfCols); + updSql.append(" WHERE "); + append(updSql, tp->pk, " AND ", " a.%s = b.%s ", selfCols, selfCols); + updSql.append(" AND ("); + appendIf(updSql, tp->cols, isNotKey(tp), " OR ", + " (((CASE WHEN (a.%s IS NULL AND b.%s IS NULL) THEN 1 ELSE 0 END) \ + + (CASE WHEN(a.%s = b.%s) THEN 1 ELSE 0 END)) = 0)", + selfCols, selfCols, selfCols, selfCols); + updSql.append(")"); + if (tp->where) { + updSql.append(" AND "); + tp->where->writeSql(updSql); + } + if (tp->order) { + updSql.append(" ORDER BY "); + tp->order->writeSql(updSql); + } + // ----------------------------------------------------------------- + // Execute the bulk update command --------------------------------- + // ----------------------------------------------------------------- + auto upd = ModifyCommandPtr(newModifyCommand(updSql)); + unsigned int offset = 0; + if (tp->where) { + tp->where->bindParams(upd.get(), offset); + } + if (tp->order) { + tp->order->bindParams(upd.get(), offset); + } + return upd->execute(true); + } + default: + return 0; + } +} + +unsigned int +DB::Connection::patchInserts(TablePatch * tp) +{ + // ----------------------------------------------------------------- + // Build SQL for copying new records ------------------------------- + // ----------------------------------------------------------------- + AdHoc::Buffer toInsSql; + toInsSql.appendbf("INSERT INTO %s(", + tp->dest); + append(toInsSql, tp->cols, ", ", "%s", selfCols); + toInsSql.append(") SELECT "); + append(toInsSql, tp->cols, ", ", "b.%s", selfCols); + toInsSql.appendbf(" FROM %s b LEFT OUTER JOIN %s a ON ", + tp->src, tp->dest); + append(toInsSql, tp->pk, " AND ", " a.%s = b.%s", selfCols, selfCols); + toInsSql.append(" WHERE "); + append(toInsSql, tp->pk, " AND ", " a.%s IS NULL", selfCols); + if (tp->order) { + toInsSql.appendf(" ORDER BY "); + tp->order->writeSql(toInsSql); + } + auto ins = ModifyCommandPtr(newModifyCommand(toInsSql)); + if (tp->order) { + unsigned int offset = 0; + tp->order->bindParams(ins.get(), offset); + } + return ins->execute(); +} + +std::string +DB::PatchCheckFailure::message() const throw() +{ + return "Santiy checks failed: check table names and keys"; +} + diff --git a/libdbpp/tablepatch.h b/libdbpp/tablepatch.h new file mode 100644 index 0000000..9615b5c --- /dev/null +++ b/libdbpp/tablepatch.h @@ -0,0 +1,22 @@ +#ifndef TABLEPATCH_H +#define TABLEPATCH_H + +#include <string> +#include <set> +#include <map> +#include <connection.h> +#include <modifycommand.h> +#include <selectcommand.h> + +class TablePatch { + public: + + + private: + void doDeletes(DynamicSql::SqlWriterPtr insteadOfDelete, DynamicSql::SqlWriterPtr where, DynamicSql::SqlWriterPtr order); + void doUpdates(DynamicSql::SqlWriterPtr where, DynamicSql::SqlWriterPtr order); + void doInserts(DynamicSql::SqlWriterPtr order); +}; + +#endif + diff --git a/libdbpp/unittests/Jamfile.jam b/libdbpp/unittests/Jamfile.jam index aa6c195..08bd027 100644 --- a/libdbpp/unittests/Jamfile.jam +++ b/libdbpp/unittests/Jamfile.jam @@ -24,6 +24,23 @@ run ; run + testPatch.cpp + : : + patch.sql + source.dat + target.dat + : + <define>ROOT=\"$(me)\" + <define>BOOST_TEST_DYN_LINK + <library>..//dbppcore + <library>..//adhocutil + <library>../../libpqpp//dbpp-postgresql + <library>boost_utf + : + testPatch + ; + +run testConnectionPool.cpp : : : <define>ROOT=\"$(me)\" diff --git a/libdbpp/unittests/patch.sql b/libdbpp/unittests/patch.sql new file mode 100644 index 0000000..8cedebb --- /dev/null +++ b/libdbpp/unittests/patch.sql @@ -0,0 +1,19 @@ +CREATE TABLE source( + a integer, + b integer, + c text, + d text, + PRIMARY KEY(a, b)); + +CREATE TABLE target( + a integer, + b integer, + c text, + d text, + deleted boolean not null default(false), + PRIMARY KEY(a, b)); + +CREATE UNIQUE INDEX u ON target(a, b) WHERE NOT deleted; +COPY source(a, b, c, d) FROM '$SCRIPTDIR/source.dat'; +COPY target(a, b, c, d) FROM '$SCRIPTDIR/target.dat'; + diff --git a/libdbpp/unittests/source.dat b/libdbpp/unittests/source.dat new file mode 100644 index 0000000..03bbdc8 --- /dev/null +++ b/libdbpp/unittests/source.dat @@ -0,0 +1,4 @@ +1 1 one one +1 2 onev2 twov2 +3 1 three one +1 3 one three diff --git a/libdbpp/unittests/target.dat b/libdbpp/unittests/target.dat new file mode 100644 index 0000000..75621f1 --- /dev/null +++ b/libdbpp/unittests/target.dat @@ -0,0 +1,4 @@ +1 1 one one +1 2 one two +2 2 two two +2 1 two one diff --git a/libdbpp/unittests/testPatch.cpp b/libdbpp/unittests/testPatch.cpp new file mode 100644 index 0000000..3295072 --- /dev/null +++ b/libdbpp/unittests/testPatch.cpp @@ -0,0 +1,178 @@ +#define BOOST_TEST_MODULE DbTablePatch +#include <boost/test/unit_test.hpp> + +#include <connection.h> +#include <definedDirs.h> +#include <mock.h> +#include <command.h> +#include <buffer.h> + +class Mock : public PQ::Mock { + public: + Mock() : + PQ::Mock("user=postgres dbname=postgres", "pqmock", { rootDir / "patch.sql" }) + { + } +}; + +class OrderByA : public DB::SqlWriter { + public: + void writeSql(AdHoc::Buffer & b) + { + b.append("a"); + } +}; + +class WhereAequals1 : public DB::SqlWriter { + public: + void writeSql(AdHoc::Buffer & b) + { + b.append("a.a = ?"); + } + void bindParams(DB::Command * cmd, unsigned int & o) + { + cmd->bindParamI(o++, 1); + } +}; + +class MarkDeleted : public DB::SqlWriter { + public: + void writeSql(AdHoc::Buffer & b) + { + b.append("deleted = ?"); + } + void bindParams(DB::Command * cmd, unsigned int & o) + { + cmd->bindParamB(o++, true); + } +}; + +BOOST_FIXTURE_TEST_SUITE(mock, Mock); + +BOOST_AUTO_TEST_CASE( sanityFail ) +{ + auto db = DB::ConnectionPtr(DB::MockDatabase::openConnectionTo("pqmock")); + BOOST_REQUIRE(db); + DB::TablePatch tp; + tp.src = "source"; + tp.dest = "target"; + tp.cols = {"a", "b", "c", "d"}; + BOOST_REQUIRE_THROW(db->patchTable(&tp), DB::PatchCheckFailure); +} + +BOOST_AUTO_TEST_CASE( noTx ) +{ + auto db = DB::ConnectionPtr(DB::MockDatabase::openConnectionTo("pqmock")); + BOOST_REQUIRE(db); + DB::TablePatch tp; + tp.src = "source"; + tp.dest = "target"; + tp.cols = {"a", "b", "c", "d"}; + tp.pk = {"a", "b"}; + BOOST_REQUIRE_THROW(db->patchTable(&tp), DB::TransactionRequired); +} + +BOOST_AUTO_TEST_SUITE_END(); + +BOOST_AUTO_TEST_CASE( testBasic ) +{ + Mock mock; + auto db = DB::ConnectionPtr(DB::MockDatabase::openConnectionTo("pqmock")); + BOOST_REQUIRE(db); + DB::TablePatch tp; + tp.src = "source"; + tp.dest = "target"; + tp.cols = {"a", "b", "c", "d"}; + tp.pk = {"a", "b"}; + db->beginTx(); + auto r = db->patchTable(&tp); + db->commitTx(); + BOOST_REQUIRE_EQUAL(2, r.deletes); + BOOST_REQUIRE_EQUAL(2, r.inserts); + BOOST_REQUIRE_EQUAL(1, r.updates); + db->beginTx(); + auto r2 = db->patchTable(&tp); + db->commitTx(); + BOOST_REQUIRE_EQUAL(0, r2.deletes); + BOOST_REQUIRE_EQUAL(0, r2.inserts); + BOOST_REQUIRE_EQUAL(0, r2.updates); +} + +BOOST_AUTO_TEST_CASE( allKeys ) +{ + Mock mock; + auto db = DB::ConnectionPtr(DB::MockDatabase::openConnectionTo("pqmock")); + BOOST_REQUIRE(db); + DB::TablePatch tp; + tp.src = "source"; + tp.dest = "target"; + tp.cols = {"a", "b"}; + tp.pk = {"a", "b"}; + db->beginTx(); + auto r = db->patchTable(&tp); + db->commitTx(); + BOOST_REQUIRE_EQUAL(2, r.deletes); + BOOST_REQUIRE_EQUAL(2, r.inserts); + BOOST_REQUIRE_EQUAL(0, r.updates); +} + +BOOST_AUTO_TEST_CASE( testOrder ) +{ + Mock mock; + auto db = DB::ConnectionPtr(DB::MockDatabase::openConnectionTo("pqmock")); + BOOST_REQUIRE(db); + DB::TablePatch tp; + OrderByA order; + tp.src = "source"; + tp.dest = "target"; + tp.cols = {"a", "b", "c", "d"}; + tp.pk = {"a", "b"}; + tp.order = ℴ + db->beginTx(); + auto r = db->patchTable(&tp); + db->commitTx(); + BOOST_REQUIRE_EQUAL(2, r.deletes); + BOOST_REQUIRE_EQUAL(2, r.inserts); + BOOST_REQUIRE_EQUAL(1, r.updates); +} + +BOOST_AUTO_TEST_CASE( testWhere ) +{ + Mock mock; + auto db = DB::ConnectionPtr(DB::MockDatabase::openConnectionTo("pqmock")); + BOOST_REQUIRE(db); + DB::TablePatch tp; + WhereAequals1 where; + tp.src = "source"; + tp.dest = "target"; + tp.cols = {"a", "b", "c", "d"}; + tp.pk = {"a", "b"}; + tp.where = &where; + db->beginTx(); + auto r = db->patchTable(&tp); + db->commitTx(); + BOOST_REQUIRE_EQUAL(0, r.deletes); + BOOST_REQUIRE_EQUAL(2, r.inserts); + BOOST_REQUIRE_EQUAL(1, r.updates); +} + +BOOST_AUTO_TEST_CASE( testInstead ) +{ + Mock mock; + auto db = DB::ConnectionPtr(DB::MockDatabase::openConnectionTo("pqmock")); + BOOST_REQUIRE(db); + DB::TablePatch tp; + MarkDeleted mark; + tp.src = "source"; + tp.dest = "target"; + tp.cols = {"a", "b", "c", "d"}; + tp.pk = {"a", "b"}; + tp.insteadOfDelete = &mark; + db->beginTx(); + auto r = db->patchTable(&tp); + db->commitTx(); + BOOST_REQUIRE_EQUAL(2, r.deletes); + BOOST_REQUIRE_EQUAL(2, r.inserts); + BOOST_REQUIRE_EQUAL(1, r.updates); +} + |