summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--libdbpp/tablepatch.cpp162
-rw-r--r--libdbpp/tablepatch.h8
-rw-r--r--libdbpp/unittests/testPatch.cpp16
3 files changed, 131 insertions, 55 deletions
diff --git a/libdbpp/tablepatch.cpp b/libdbpp/tablepatch.cpp
index dc42a42..1f22fcf 100644
--- a/libdbpp/tablepatch.cpp
+++ b/libdbpp/tablepatch.cpp
@@ -80,13 +80,47 @@ self(const typename Container::const_iterator & i)
{
return *i;
}
-#define selfCols self<decltype(TablePatch::cols)>
+#define selfCols self<decltype(DB::TablePatch::cols)>
#define isKey(tp) [tp](auto i){ return AdHoc::containerContains(tp->pk, *i); }
#define isNotKey(tp) [tp](auto i){ return !AdHoc::containerContains(tp->pk, *i); }
+static
+void
+patchDeletesSelect(AdHoc::Buffer & toDelSql, DB::TablePatch * tp)
+{
+ toDelSql.append(" WHERE ");
+ append(toDelSql, tp->pk, " AND ", " b.%s IS NULL", selfCols);
+ if (tp->where) {
+ toDelSql.append(" AND ");
+ tp->where->writeSql(toDelSql);
+ }
+ if (tp->order) {
+ toDelSql.append(" ORDER BY ");
+ tp->order->writeSql(toDelSql);
+ }
+}
+
unsigned int
DB::Connection::patchDeletes(TablePatch * tp)
{
+ if (tp->beforeDelete) {
+ AdHoc::Buffer toDelSql;
+ toDelSql.append("SELECT ");
+ append(toDelSql, tp->cols, ", ", "a.%s", selfCols);
+ toDelSql.appendbf(" FROM %s a LEFT OUTER JOIN %s b ON ",
+ tp->dest, tp->src);
+ append(toDelSql, tp->pk, " AND ", " a.%s = b.%s", selfCols, selfCols);
+ patchDeletesSelect(toDelSql, tp);
+ auto del = select(toDelSql);
+ unsigned int offset = 0;
+ if (tp->where) {
+ tp->where->bindParams(del.get(), offset);
+ }
+ if (tp->order) {
+ tp->order->bindParams(del.get(), offset);
+ }
+ tp->beforeDelete(del);
+ }
AdHoc::Buffer toDelSql;
switch (bulkDeleteStyle()) {
case BulkDeleteUsingSubSelect:
@@ -113,16 +147,7 @@ DB::Connection::patchDeletes(TablePatch * tp)
toDelSql.appendbf(" FROM %s a LEFT OUTER JOIN %s b ON ",
tp->dest, tp->src);
append(toDelSql, tp->pk, " AND ", " a.%s = b.%s", selfCols, selfCols);
- toDelSql.append(" WHERE ");
- append(toDelSql, tp->pk, " AND ", " b.%s IS NULL", [](auto & pki){ return *pki; });
- if (tp->where) {
- toDelSql.append(" AND ");
- tp->where->writeSql(toDelSql);
- }
- if (tp->order) {
- toDelSql.append(" ORDER BY ");
- tp->order->writeSql(toDelSql);
- }
+ patchDeletesSelect(toDelSql, tp);
toDelSql.append(")");
break;
}
@@ -145,16 +170,7 @@ DB::Connection::patchDeletes(TablePatch * tp)
if (tp->insteadOfDelete) {
tp->insteadOfDelete->writeSql(toDelSql);
}
- toDelSql.append(" WHERE ");
- append(toDelSql, tp->pk, " AND ", " b.% IS NULL", selfCols);
- if (tp->where) {
- toDelSql.append(" AND ");
- tp->where->writeSql(toDelSql);
- }
- if (tp->order) {
- toDelSql.append(" ORDER BY ");
- tp->order->writeSql(toDelSql);
- }
+ patchDeletesSelect(toDelSql, tp);
break;
}
}
@@ -172,6 +188,24 @@ DB::Connection::patchDeletes(TablePatch * tp)
return del->execute();
}
+static
+void
+patchUpdatesSelect(AdHoc::Buffer & updSql, DB::TablePatch * tp)
+{
+ updSql.append(" WHERE ");
+ append(updSql, tp->pk, " AND ", " a.%s = b.%s ", selfCols, selfCols);
+ updSql.append(" AND (");
+ appendIf(updSql, tp->cols, isNotKey(tp), " OR ",
+ " (((CASE WHEN (a.%s IS NULL AND b.%s IS NULL) THEN 1 ELSE 0 END) \
+ + (CASE WHEN(a.%s = b.%s) THEN 1 ELSE 0 END)) = 0)",
+ selfCols, selfCols, selfCols, selfCols);
+ updSql.append(")");
+ if (tp->where) {
+ updSql.append(" AND ");
+ tp->where->writeSql(updSql);
+ }
+}
+
unsigned int
DB::Connection::patchUpdates(TablePatch * tp)
{
@@ -179,6 +213,28 @@ DB::Connection::patchUpdates(TablePatch * tp)
// Can't "change" anything... it's all part of the key
return 0;
}
+ if (tp->beforeUpdate) {
+ AdHoc::Buffer updSql;
+ updSql.append("SELECT ");
+ append(updSql, tp->pk, ", ", "a.%s", selfCols);
+ appendIf(updSql, tp->cols, isNotKey(tp), "", ", a.%1% old_%1%", selfCols);
+ appendIf(updSql, tp->cols, isNotKey(tp), "", ", b.%1% new_%1%", selfCols);
+ updSql.appendbf(" FROM %s a, %s b ", tp->dest, tp->src);
+ patchUpdatesSelect(updSql, tp);
+ if (tp->order) {
+ updSql.append(" ORDER BY ");
+ tp->order->writeSql(updSql);
+ }
+ auto upd = select(updSql);
+ unsigned int offset = 0;
+ if (tp->where) {
+ tp->where->bindParams(upd.get(), offset);
+ }
+ if (tp->order) {
+ tp->order->bindParams(upd.get(), offset);
+ }
+ tp->beforeUpdate(upd);
+ }
switch (bulkUpdateStyle()) {
case BulkUpdateUsingFromSrc:
{
@@ -191,18 +247,7 @@ DB::Connection::patchUpdates(TablePatch * tp)
appendIf(updSql, tp->cols, isNotKey(tp), ", ", " %s = b.%s ", selfCols, selfCols);
updSql.appendbf(" FROM %s b ",
tp->src);
- updSql.append(" WHERE ");
- append(updSql, tp->pk, " AND " , " a.%s = b.%s ", selfCols, selfCols);
- updSql.append(" AND (");
- appendIf(updSql, tp->cols, isNotKey(tp), " OR ",
- " (((CASE WHEN (a.%s IS NULL AND b.%s IS NULL) THEN 1 ELSE 0 END) \
- + (CASE WHEN(a.%s = b.%s) THEN 1 ELSE 0 END)) = 0)",
- selfCols, selfCols, selfCols, selfCols);
- updSql.append(")");
- if (tp->where) {
- updSql.append(" AND ");
- tp->where->writeSql(updSql);
- }
+ patchUpdatesSelect(updSql, tp);
// -----------------------------------------------------------------
// Execute the bulk update command ---------------------------------
// -----------------------------------------------------------------
@@ -223,18 +268,7 @@ DB::Connection::patchUpdates(TablePatch * tp)
updSql.appendbf("UPDATE %s a, %s b SET ",
tp->dest, tp->src);
appendIf(updSql, tp->cols, isNotKey(tp), ", ", " a.%s = b.%s ", selfCols, selfCols);
- updSql.append(" WHERE ");
- append(updSql, tp->pk, " AND ", " a.%s = b.%s ", selfCols, selfCols);
- updSql.append(" AND (");
- appendIf(updSql, tp->cols, isNotKey(tp), " OR ",
- " (((CASE WHEN (a.%s IS NULL AND b.%s IS NULL) THEN 1 ELSE 0 END) \
- + (CASE WHEN(a.%s = b.%s) THEN 1 ELSE 0 END)) = 0)",
- selfCols, selfCols, selfCols, selfCols);
- updSql.append(")");
- if (tp->where) {
- updSql.append(" AND ");
- tp->where->writeSql(updSql);
- }
+ patchUpdatesSelect(updSql, tp);
if (tp->order) {
updSql.append(" ORDER BY ");
tp->order->writeSql(updSql);
@@ -257,17 +291,11 @@ DB::Connection::patchUpdates(TablePatch * tp)
}
}
-unsigned int
-DB::Connection::patchInserts(TablePatch * tp)
+static
+void
+patchInsertsSelect(AdHoc::Buffer & toInsSql, DB::TablePatch * tp)
{
- // -----------------------------------------------------------------
- // Build SQL for copying new records -------------------------------
- // -----------------------------------------------------------------
- AdHoc::Buffer toInsSql;
- toInsSql.appendbf("INSERT INTO %s(",
- tp->dest);
- append(toInsSql, tp->cols, ", ", "%s", selfCols);
- toInsSql.append(") SELECT ");
+ toInsSql.append("SELECT ");
append(toInsSql, tp->cols, ", ", "b.%s", selfCols);
toInsSql.appendbf(" FROM %s b LEFT OUTER JOIN %s a ON ",
tp->src, tp->dest);
@@ -278,6 +306,30 @@ DB::Connection::patchInserts(TablePatch * tp)
toInsSql.appendf(" ORDER BY ");
tp->order->writeSql(toInsSql);
}
+}
+
+unsigned int
+DB::Connection::patchInserts(TablePatch * tp)
+{
+ if (tp->beforeInsert) {
+ AdHoc::Buffer toInsSql;
+ patchInsertsSelect(toInsSql, tp);
+ auto ins = select(toInsSql);
+ if (tp->order) {
+ unsigned int offset = 0;
+ tp->order->bindParams(ins.get(), offset);
+ }
+ tp->beforeInsert(ins);
+ }
+ // -----------------------------------------------------------------
+ // Build SQL for copying new records -------------------------------
+ // -----------------------------------------------------------------
+ AdHoc::Buffer toInsSql;
+ toInsSql.appendbf("INSERT INTO %s(",
+ tp->dest);
+ append(toInsSql, tp->cols, ", ", "%s", selfCols);
+ toInsSql.append(")\n");
+ patchInsertsSelect(toInsSql, tp);
auto ins = ModifyCommandPtr(newModifyCommand(toInsSql));
if (tp->order) {
unsigned int offset = 0;
diff --git a/libdbpp/tablepatch.h b/libdbpp/tablepatch.h
index a8f5e59..c951cb5 100644
--- a/libdbpp/tablepatch.h
+++ b/libdbpp/tablepatch.h
@@ -7,6 +7,7 @@
#include <connection.h>
#include <modifycommand.h>
#include <selectcommand.h>
+#include <boost/function.hpp>
namespace DB {
class SqlWriter;
@@ -19,6 +20,7 @@ namespace DB {
typedef std::set<ColumnName> ColumnNames;
typedef ColumnNames PrimaryKey;
typedef PrimaryKey::const_iterator PKI;
+ typedef boost::function<void(DB::SelectCommandPtr)> AuditFunction;
public:
/// Default constructor
@@ -44,6 +46,12 @@ namespace DB {
bool doUpdates;
/// Enable insertion
bool doInserts;
+ /// Before delete audit
+ AuditFunction beforeDelete;
+ /// Before update audit
+ AuditFunction beforeUpdate;
+ /// Before insert audit
+ AuditFunction beforeInsert;
};
}
diff --git a/libdbpp/unittests/testPatch.cpp b/libdbpp/unittests/testPatch.cpp
index 6cf2d5d..cca8592 100644
--- a/libdbpp/unittests/testPatch.cpp
+++ b/libdbpp/unittests/testPatch.cpp
@@ -8,6 +8,7 @@
#include <tablepatch.h>
#include <sqlWriter.h>
#include <buffer.h>
+#include <selectcommandUtil.impl.h>
class Mock : public PQ::Mock {
public:
@@ -130,6 +131,21 @@ BOOST_AUTO_TEST_CASE( testOrder )
tp.cols = {"a", "b", "c", "d"};
tp.pk = {"a", "b"};
tp.order = &order;
+ tp.beforeDelete = [](DB::SelectCommandPtr i) {
+ i->forEachRow<int64_t, int64_t, std::string, std::string>([](auto a, auto b, auto c, auto d) {
+ fprintf(stderr, "<< %ld %ld %s %s\n", a, b, c.c_str(), d.c_str());
+ });
+ };
+ tp.beforeUpdate = [](DB::SelectCommandPtr i) {
+ i->forEachRow<int64_t, int64_t, std::string, std::string, std::string, std::string>([](auto a, auto b, auto c1, auto d1, auto c2, auto d2) {
+ fprintf(stderr, "== %ld %ld %s->%s %s->%s\n", a, b, c1.c_str(), c2.c_str(), d1.c_str(), d2.c_str());
+ });
+ };
+ tp.beforeInsert = [](DB::SelectCommandPtr i) {
+ i->forEachRow<int64_t, int64_t, std::string, std::string>([](auto a, auto b, auto c, auto d) {
+ fprintf(stderr, ">> %ld %ld %s %s\n", a, b, c.c_str(), d.c_str());
+ });
+ };
db->beginTx();
auto r = db->patchTable(&tp);
db->commitTx();