diff options
-rw-r--r-- | libdbpp/tablepatch.cpp | 162 | ||||
-rw-r--r-- | libdbpp/tablepatch.h | 8 | ||||
-rw-r--r-- | libdbpp/unittests/testPatch.cpp | 16 |
3 files changed, 131 insertions, 55 deletions
diff --git a/libdbpp/tablepatch.cpp b/libdbpp/tablepatch.cpp index dc42a42..1f22fcf 100644 --- a/libdbpp/tablepatch.cpp +++ b/libdbpp/tablepatch.cpp @@ -80,13 +80,47 @@ self(const typename Container::const_iterator & i) { return *i; } -#define selfCols self<decltype(TablePatch::cols)> +#define selfCols self<decltype(DB::TablePatch::cols)> #define isKey(tp) [tp](auto i){ return AdHoc::containerContains(tp->pk, *i); } #define isNotKey(tp) [tp](auto i){ return !AdHoc::containerContains(tp->pk, *i); } +static +void +patchDeletesSelect(AdHoc::Buffer & toDelSql, DB::TablePatch * tp) +{ + toDelSql.append(" WHERE "); + append(toDelSql, tp->pk, " AND ", " b.%s IS NULL", selfCols); + if (tp->where) { + toDelSql.append(" AND "); + tp->where->writeSql(toDelSql); + } + if (tp->order) { + toDelSql.append(" ORDER BY "); + tp->order->writeSql(toDelSql); + } +} + unsigned int DB::Connection::patchDeletes(TablePatch * tp) { + if (tp->beforeDelete) { + AdHoc::Buffer toDelSql; + toDelSql.append("SELECT "); + append(toDelSql, tp->cols, ", ", "a.%s", selfCols); + toDelSql.appendbf(" FROM %s a LEFT OUTER JOIN %s b ON ", + tp->dest, tp->src); + append(toDelSql, tp->pk, " AND ", " a.%s = b.%s", selfCols, selfCols); + patchDeletesSelect(toDelSql, tp); + auto del = select(toDelSql); + unsigned int offset = 0; + if (tp->where) { + tp->where->bindParams(del.get(), offset); + } + if (tp->order) { + tp->order->bindParams(del.get(), offset); + } + tp->beforeDelete(del); + } AdHoc::Buffer toDelSql; switch (bulkDeleteStyle()) { case BulkDeleteUsingSubSelect: @@ -113,16 +147,7 @@ DB::Connection::patchDeletes(TablePatch * tp) toDelSql.appendbf(" FROM %s a LEFT OUTER JOIN %s b ON ", tp->dest, tp->src); append(toDelSql, tp->pk, " AND ", " a.%s = b.%s", selfCols, selfCols); - toDelSql.append(" WHERE "); - append(toDelSql, tp->pk, " AND ", " b.%s IS NULL", [](auto & pki){ return *pki; }); - if (tp->where) { - toDelSql.append(" AND "); - tp->where->writeSql(toDelSql); - } - if (tp->order) { - toDelSql.append(" ORDER BY "); - tp->order->writeSql(toDelSql); - } + patchDeletesSelect(toDelSql, tp); toDelSql.append(")"); break; } @@ -145,16 +170,7 @@ DB::Connection::patchDeletes(TablePatch * tp) if (tp->insteadOfDelete) { tp->insteadOfDelete->writeSql(toDelSql); } - toDelSql.append(" WHERE "); - append(toDelSql, tp->pk, " AND ", " b.% IS NULL", selfCols); - if (tp->where) { - toDelSql.append(" AND "); - tp->where->writeSql(toDelSql); - } - if (tp->order) { - toDelSql.append(" ORDER BY "); - tp->order->writeSql(toDelSql); - } + patchDeletesSelect(toDelSql, tp); break; } } @@ -172,6 +188,24 @@ DB::Connection::patchDeletes(TablePatch * tp) return del->execute(); } +static +void +patchUpdatesSelect(AdHoc::Buffer & updSql, DB::TablePatch * tp) +{ + updSql.append(" WHERE "); + append(updSql, tp->pk, " AND ", " a.%s = b.%s ", selfCols, selfCols); + updSql.append(" AND ("); + appendIf(updSql, tp->cols, isNotKey(tp), " OR ", + " (((CASE WHEN (a.%s IS NULL AND b.%s IS NULL) THEN 1 ELSE 0 END) \ + + (CASE WHEN(a.%s = b.%s) THEN 1 ELSE 0 END)) = 0)", + selfCols, selfCols, selfCols, selfCols); + updSql.append(")"); + if (tp->where) { + updSql.append(" AND "); + tp->where->writeSql(updSql); + } +} + unsigned int DB::Connection::patchUpdates(TablePatch * tp) { @@ -179,6 +213,28 @@ DB::Connection::patchUpdates(TablePatch * tp) // Can't "change" anything... it's all part of the key return 0; } + if (tp->beforeUpdate) { + AdHoc::Buffer updSql; + updSql.append("SELECT "); + append(updSql, tp->pk, ", ", "a.%s", selfCols); + appendIf(updSql, tp->cols, isNotKey(tp), "", ", a.%1% old_%1%", selfCols); + appendIf(updSql, tp->cols, isNotKey(tp), "", ", b.%1% new_%1%", selfCols); + updSql.appendbf(" FROM %s a, %s b ", tp->dest, tp->src); + patchUpdatesSelect(updSql, tp); + if (tp->order) { + updSql.append(" ORDER BY "); + tp->order->writeSql(updSql); + } + auto upd = select(updSql); + unsigned int offset = 0; + if (tp->where) { + tp->where->bindParams(upd.get(), offset); + } + if (tp->order) { + tp->order->bindParams(upd.get(), offset); + } + tp->beforeUpdate(upd); + } switch (bulkUpdateStyle()) { case BulkUpdateUsingFromSrc: { @@ -191,18 +247,7 @@ DB::Connection::patchUpdates(TablePatch * tp) appendIf(updSql, tp->cols, isNotKey(tp), ", ", " %s = b.%s ", selfCols, selfCols); updSql.appendbf(" FROM %s b ", tp->src); - updSql.append(" WHERE "); - append(updSql, tp->pk, " AND " , " a.%s = b.%s ", selfCols, selfCols); - updSql.append(" AND ("); - appendIf(updSql, tp->cols, isNotKey(tp), " OR ", - " (((CASE WHEN (a.%s IS NULL AND b.%s IS NULL) THEN 1 ELSE 0 END) \ - + (CASE WHEN(a.%s = b.%s) THEN 1 ELSE 0 END)) = 0)", - selfCols, selfCols, selfCols, selfCols); - updSql.append(")"); - if (tp->where) { - updSql.append(" AND "); - tp->where->writeSql(updSql); - } + patchUpdatesSelect(updSql, tp); // ----------------------------------------------------------------- // Execute the bulk update command --------------------------------- // ----------------------------------------------------------------- @@ -223,18 +268,7 @@ DB::Connection::patchUpdates(TablePatch * tp) updSql.appendbf("UPDATE %s a, %s b SET ", tp->dest, tp->src); appendIf(updSql, tp->cols, isNotKey(tp), ", ", " a.%s = b.%s ", selfCols, selfCols); - updSql.append(" WHERE "); - append(updSql, tp->pk, " AND ", " a.%s = b.%s ", selfCols, selfCols); - updSql.append(" AND ("); - appendIf(updSql, tp->cols, isNotKey(tp), " OR ", - " (((CASE WHEN (a.%s IS NULL AND b.%s IS NULL) THEN 1 ELSE 0 END) \ - + (CASE WHEN(a.%s = b.%s) THEN 1 ELSE 0 END)) = 0)", - selfCols, selfCols, selfCols, selfCols); - updSql.append(")"); - if (tp->where) { - updSql.append(" AND "); - tp->where->writeSql(updSql); - } + patchUpdatesSelect(updSql, tp); if (tp->order) { updSql.append(" ORDER BY "); tp->order->writeSql(updSql); @@ -257,17 +291,11 @@ DB::Connection::patchUpdates(TablePatch * tp) } } -unsigned int -DB::Connection::patchInserts(TablePatch * tp) +static +void +patchInsertsSelect(AdHoc::Buffer & toInsSql, DB::TablePatch * tp) { - // ----------------------------------------------------------------- - // Build SQL for copying new records ------------------------------- - // ----------------------------------------------------------------- - AdHoc::Buffer toInsSql; - toInsSql.appendbf("INSERT INTO %s(", - tp->dest); - append(toInsSql, tp->cols, ", ", "%s", selfCols); - toInsSql.append(") SELECT "); + toInsSql.append("SELECT "); append(toInsSql, tp->cols, ", ", "b.%s", selfCols); toInsSql.appendbf(" FROM %s b LEFT OUTER JOIN %s a ON ", tp->src, tp->dest); @@ -278,6 +306,30 @@ DB::Connection::patchInserts(TablePatch * tp) toInsSql.appendf(" ORDER BY "); tp->order->writeSql(toInsSql); } +} + +unsigned int +DB::Connection::patchInserts(TablePatch * tp) +{ + if (tp->beforeInsert) { + AdHoc::Buffer toInsSql; + patchInsertsSelect(toInsSql, tp); + auto ins = select(toInsSql); + if (tp->order) { + unsigned int offset = 0; + tp->order->bindParams(ins.get(), offset); + } + tp->beforeInsert(ins); + } + // ----------------------------------------------------------------- + // Build SQL for copying new records ------------------------------- + // ----------------------------------------------------------------- + AdHoc::Buffer toInsSql; + toInsSql.appendbf("INSERT INTO %s(", + tp->dest); + append(toInsSql, tp->cols, ", ", "%s", selfCols); + toInsSql.append(")\n"); + patchInsertsSelect(toInsSql, tp); auto ins = ModifyCommandPtr(newModifyCommand(toInsSql)); if (tp->order) { unsigned int offset = 0; diff --git a/libdbpp/tablepatch.h b/libdbpp/tablepatch.h index a8f5e59..c951cb5 100644 --- a/libdbpp/tablepatch.h +++ b/libdbpp/tablepatch.h @@ -7,6 +7,7 @@ #include <connection.h> #include <modifycommand.h> #include <selectcommand.h> +#include <boost/function.hpp> namespace DB { class SqlWriter; @@ -19,6 +20,7 @@ namespace DB { typedef std::set<ColumnName> ColumnNames; typedef ColumnNames PrimaryKey; typedef PrimaryKey::const_iterator PKI; + typedef boost::function<void(DB::SelectCommandPtr)> AuditFunction; public: /// Default constructor @@ -44,6 +46,12 @@ namespace DB { bool doUpdates; /// Enable insertion bool doInserts; + /// Before delete audit + AuditFunction beforeDelete; + /// Before update audit + AuditFunction beforeUpdate; + /// Before insert audit + AuditFunction beforeInsert; }; } diff --git a/libdbpp/unittests/testPatch.cpp b/libdbpp/unittests/testPatch.cpp index 6cf2d5d..cca8592 100644 --- a/libdbpp/unittests/testPatch.cpp +++ b/libdbpp/unittests/testPatch.cpp @@ -8,6 +8,7 @@ #include <tablepatch.h> #include <sqlWriter.h> #include <buffer.h> +#include <selectcommandUtil.impl.h> class Mock : public PQ::Mock { public: @@ -130,6 +131,21 @@ BOOST_AUTO_TEST_CASE( testOrder ) tp.cols = {"a", "b", "c", "d"}; tp.pk = {"a", "b"}; tp.order = ℴ + tp.beforeDelete = [](DB::SelectCommandPtr i) { + i->forEachRow<int64_t, int64_t, std::string, std::string>([](auto a, auto b, auto c, auto d) { + fprintf(stderr, "<< %ld %ld %s %s\n", a, b, c.c_str(), d.c_str()); + }); + }; + tp.beforeUpdate = [](DB::SelectCommandPtr i) { + i->forEachRow<int64_t, int64_t, std::string, std::string, std::string, std::string>([](auto a, auto b, auto c1, auto d1, auto c2, auto d2) { + fprintf(stderr, "== %ld %ld %s->%s %s->%s\n", a, b, c1.c_str(), c2.c_str(), d1.c_str(), d2.c_str()); + }); + }; + tp.beforeInsert = [](DB::SelectCommandPtr i) { + i->forEachRow<int64_t, int64_t, std::string, std::string>([](auto a, auto b, auto c, auto d) { + fprintf(stderr, ">> %ld %ld %s %s\n", a, b, c.c_str(), d.c_str()); + }); + }; db->beginTx(); auto r = db->patchTable(&tp); db->commitTx(); |