summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--project2/sql/sqlMergeTask.cpp39
-rw-r--r--project2/sql/tablepatch.cpp505
-rw-r--r--project2/sql/tablepatch.h50
3 files changed, 26 insertions, 568 deletions
diff --git a/project2/sql/sqlMergeTask.cpp b/project2/sql/sqlMergeTask.cpp
index 6e2e002..6b1bb75 100644
--- a/project2/sql/sqlMergeTask.cpp
+++ b/project2/sql/sqlMergeTask.cpp
@@ -17,6 +17,8 @@
bool SqlMergeTask::defaultUseTempTable = true;
static void attach(boost::intrusive_ptr<IHaveSubTasks> i, DB::ModifyCommand * insert);
+#define foreach(_type, _con, _it) for (_type _it = ((_con).begin()); _it != ((_con).end()); _it++)
+
class SqlMergeInsert;
typedef boost::intrusive_ptr<SqlMergeInsert> SqlMergeInsertPtr;
/// Project2 component insert custom constructed records during an SQL Merge task
@@ -161,14 +163,23 @@ SqlMergeTask::execute(ExecContext * ec) const
for (const TargetColumnPtr & c : cols) {
colNames.insert(c->column);
}
- TablePatch tp(*destdb, dtablet, dtable, colNames);
- tp.doDelete = doDelete(NULL);
- tp.doUpdate = doUpdate(NULL);
- tp.doInsert = doInsert(NULL);
+ DB::TablePatch tp;
+ tp.src = dtablet;
+ tp.dest = dtable;
+ tp.cols = colNames;
+ tp.doDeletes = doDelete(ec);
+ tp.doUpdates = doUpdate(ec);
+ tp.doInserts = doInsert(ec);
for (const Keys::value_type & k : keys) {
- tp.addKey(k);
+ tp.pk.insert(k);
}
- tp.patch(ec, insteadOfDelete, updateWhere, patchOrder);
+ DynamicSql::SqlWriterWrapper iod(ec, insteadOfDelete.get());
+ tp.insteadOfDelete = insteadOfDelete ? &iod : nullptr;
+ DynamicSql::SqlWriterWrapper uw(ec, updateWhere.get());
+ tp.where = updateWhere ? &uw : nullptr;
+ DynamicSql::SqlWriterWrapper po(ec, patchOrder.get());
+ tp.order = patchOrder ? &po : nullptr;
+ destdb->patchTable(&tp);
dropTempTable();
}
@@ -176,12 +187,11 @@ void
SqlMergeTask::createTempTable() const
{
if (useView(NULL)) {
- DB::ModifyCommand * cv = destdb->newModifyCommand(stringf(
- "CREATE VIEW %s AS %s",
- dtablet.c_str(),
- sqlCommand->getSqlFor("").c_str()));
- cv->execute();
- delete cv;
+ AdHoc::Buffer b;
+ b.appendf( "CREATE VIEW %s AS ", dtablet.c_str());
+ sqlCommand->setFilter(Glib::ustring());
+ sqlCommand->writeSql(b);
+ destdb->modify(b)->execute();
}
else {
DB::ModifyCommand * ctt = destdb->newModifyCommand(stringf(
@@ -338,7 +348,10 @@ SqlMergeTask::copyToTempTable(ExecContext * ec) const
}
ins.append((*c)->column);
}
- ins.appendf(" FROM (%s) tmp_src", sqlCommand->getSqlFor("").c_str());
+ ins.appendf(" FROM (");
+ sqlCommand->setFilter(Glib::ustring());
+ sqlCommand->writeSql(ins);
+ ins.appendf(") tmp_src");
DB::ModifyCommand * cttt = destdb->newModifyCommand(ins);
unsigned int off = 0;
sqlCommand->bindParams(ec, cttt, off);
diff --git a/project2/sql/tablepatch.cpp b/project2/sql/tablepatch.cpp
deleted file mode 100644
index 2c634d6..0000000
--- a/project2/sql/tablepatch.cpp
+++ /dev/null
@@ -1,505 +0,0 @@
-#include <pch.hpp>
-#include "tablepatch.h"
-#include <stdio.h>
-#include <buffer.h>
-#include <selectcommand.h>
-#include <column.h>
-#include <boost/algorithm/string/join.hpp>
-
-using namespace DB;
-
-TablePatch::TablePatch(const Connection & wdb, const TablePatch::Table & s, const TablePatch::Table & d,
- const TablePatch::Columns & c) :
- doDelete(true),
- doUpdate(true),
- doInsert(true),
- src(s),
- dest(d),
- cols(c),
- db(wdb)
-{
- if (!src.length()) {
- throw PatchCheckFailure();
- }
- if (!dest.length()) {
- throw PatchCheckFailure();
- }
- if (!db.inTx()) {
- throw PatchCheckFailure();
- }
-}
-
-void
-TablePatch::addKey(const TablePatch::Column & c)
-{
- pk.insert(c);
-}
-
-void
-TablePatch::patch(ExecContext * ec, DynamicSql::SqlWriterPtr insteadOfDelete, DynamicSql::SqlWriterPtr where, DynamicSql::SqlWriterPtr order)
-{
- if (pk.empty()) {
- throw PatchCheckFailure();
- }
- if (doDelete) {
- doDeletes(ec, insteadOfDelete, where, order);
- }
- if (doUpdate) {
- doUpdates(ec, where, order);
- }
- if (doInsert) {
- doInserts(ec, order);
- }
-}
-
-void
-TablePatch::doDeletes(ExecContext * ec, DynamicSql::SqlWriterPtr insteadOfDelete, DynamicSql::SqlWriterPtr where, DynamicSql::SqlWriterPtr order)
-{
- AdHoc::Buffer toDelSql;
- switch (db.bulkDeleteStyle()) {
- case BulkDeleteUsingSubSelect:
- {
- // -----------------------------------------------------------------
- // Build SQL to delete keys ----------------------------------------
- // -----------------------------------------------------------------
- if (insteadOfDelete) {
- toDelSql.appendf("UPDATE %s ",
- dest.c_str());
- insteadOfDelete->writeSql(toDelSql);
- toDelSql.append(" WHERE (");
- }
- else {
- toDelSql.appendf("DELETE FROM %s WHERE (",
- dest.c_str());
- }
- foreach (PKI, pk, pki) {
- if (pki != pk.begin()) {
- toDelSql.append(", ");
- }
- toDelSql.appendf("%s.%s",
- dest.c_str(),
- pki->c_str());
- }
- // -----------------------------------------------------------------
- // Build SQL to select keys to delete ------------------------------
- // -----------------------------------------------------------------
- toDelSql.append(") IN (SELECT ");
- foreach (PKI, pk, pki) {
- if (pki != pk.begin()) {
- toDelSql.append(", ");
- }
- toDelSql.appendf("a.%s",
- pki->c_str());
- }
- toDelSql.appendf(" FROM %s a LEFT OUTER JOIN %s b ON ",
- dest.c_str(), src.c_str());
- foreach (PKI, pk, pki) {
- if (pki != pk.begin()) {
- toDelSql.append(" AND ");
- }
- toDelSql.appendf(" a.%s = b.%s",
- pki->c_str(), pki->c_str());
- }
- foreach (PKI, pk, pki) {
- if (pki == pk.begin()) {
- toDelSql.append(" WHERE ");
- }
- else {
- toDelSql.append(" AND ");
- }
- toDelSql.appendf(" b.%s IS NULL",
- pki->c_str());
- }
- if (where) {
- toDelSql.append(" AND ");
- where->writeSql(toDelSql);
- }
- if (order) {
- toDelSql.append(" ORDER BY ");
- order->writeSql(toDelSql);
- }
- toDelSql.append(")");
- break;
- }
- case BulkDeleteUsingUsingAlias:
- case BulkDeleteUsingUsing:
- {
- if (insteadOfDelete) {
- toDelSql.appendf("UPDATE %s a ",
- dest.c_str());
- }
- else {
- toDelSql.appendf("DELETE FROM %s USING %s a ",
- (db.bulkDeleteStyle() == BulkDeleteUsingUsingAlias ? "a" : dest.c_str()),
- dest.c_str());
- }
- toDelSql.appendf(" LEFT OUTER JOIN %s b ",
- src.c_str());
- foreach (PKI, pk, pki) {
- if (pki != pk.begin()) {
- toDelSql.append(" AND ");
- }
- else {
- toDelSql.append(" ON ");
- }
- toDelSql.appendf(" a.%s = b.%s ",
- pki->c_str(), pki->c_str());
- }
- if (insteadOfDelete) {
- insteadOfDelete->writeSql(toDelSql);
- }
- foreach (PKI, pk, pki) {
- if (pki != pk.begin()) {
- toDelSql.append(" AND ");
- }
- else {
- toDelSql.append(" WHERE ");
- }
- toDelSql.appendf(" b.%s IS NULL",
- pki->c_str());
- }
- if (where) {
- toDelSql.append(" AND ");
- where->writeSql(toDelSql);
- }
- if (order) {
- toDelSql.append(" ORDER BY ");
- order->writeSql(toDelSql);
- }
- break;
- }
- }
- ModifyCommand * del = db.newModifyCommand(toDelSql);
- unsigned int offset = 0;
- if (where) {
- where->bindParams(ec, del, offset);
- }
- if (order) {
- order->bindParams(ec, del, offset);
- }
- del->execute();
- delete del;
-}
-
-void
-TablePatch::doUpdates(ExecContext * ec, DynamicSql::SqlWriterPtr where, DynamicSql::SqlWriterPtr order)
-{
- if (cols.size() == pk.size()) {
- // Can't "change" anything... it's all part of the key
- return;
- }
- switch (db.bulkUpdateStyle()) {
- case BulkUpdateByIteration:
- {
- // -----------------------------------------------------------------
- // Build SQL for list of updates to perform ------------------------
- // -----------------------------------------------------------------
- AdHoc::Buffer toUpdSel;
- toUpdSel.append("SELECT ");
- foreach (Columns::const_iterator, cols, col) {
- if (pk.find(*col) == pk.end()) {
- toUpdSel.appendf("b.%s, ",
- col->c_str());
- }
- }
- foreach (Columns::const_iterator, cols, col) {
- if (pk.find(*col) != pk.end()) {
- toUpdSel.appendf("b.%s, ",
- col->c_str());
- }
- }
- toUpdSel.appendf("0 FROM %s a, %s b",
- dest.c_str(), src.c_str());
- foreach (PKI, pk, pki) {
- if (pki == pk.begin()) {
- toUpdSel.append(" WHERE ");
- }
- else {
- toUpdSel.append(" AND ");
- }
- toUpdSel.appendf(" a.%s = b.%s",
- pki->c_str(), pki->c_str());
- }
- if (where) {
- toUpdSel.append(" AND ");
- where->writeSql(toUpdSel);
- }
- toUpdSel.append(" AND (");
- bool first = true;
- foreach (Columns::const_iterator, cols, col) {
- if (pk.find(*col) == pk.end()) {
- if (!first) {
- toUpdSel.append(" OR ");
- }
- first = false;
- toUpdSel.appendf(
- " (((CASE WHEN (a.%s IS NULL AND b.%s IS NULL) THEN 1 ELSE 0 END) \
- + (CASE WHEN(a.%s = b.%s) THEN 1 ELSE 0 END)) = 0)",
- col->c_str(), col->c_str(), col->c_str(), col->c_str());
- }
- }
- toUpdSel.append(")");
- if (order) {
- toUpdSel.append(" ORDER BY ");
- order->writeSql(toUpdSel);
- }
- // -----------------------------------------------------------------
- // Build SQL to perform updates ------------------------------------
- // -----------------------------------------------------------------
- AdHoc::Buffer updSql;
- updSql.appendf("UPDATE %s SET ",
- dest.c_str());
- first = true;
- foreach (Columns::const_iterator, cols, col) {
- if (pk.find(*col) == pk.end()) {
- if (!first) {
- updSql.append(", ");
- }
- first = false;
- updSql.appendf(" %s = ?",
- col->c_str());
- }
- }
- foreach (PKI, pk, pki) {
- if (pki == pk.begin()) {
- updSql.append(" WHERE ");
- }
- else {
- updSql.append(" AND ");
- }
- updSql.appendf(" %s = ?",
- pki->c_str());
- }
- // -----------------------------------------------------------------
- // Iterator over update list make changes --------------------------
- // -----------------------------------------------------------------
- SelectCommand * toUpd = db.newSelectCommand(toUpdSel);
- unsigned int offset = 0;
- if (where) {
- where->bindParams(ec, toUpd, offset);
- }
- if (order) {
- order->bindParams(ec, toUpd, offset);
- }
- ModifyCommand * upd = db.newModifyCommand(updSql);
- int cs = cols.size();
- toUpd->execute();
- for (int c = 0; c < cs; c += 1) {
- (*toUpd)[c].rebind(upd, c);
- }
- while (toUpd->fetch()) {
- upd->execute(false);
- }
- delete toUpd;
- delete upd;
- }
- break;
- case BulkUpdateUsingFromSrc:
- {
- // -----------------------------------------------------------------
- // Build SQL for list of updates to perform ------------------------
- // -----------------------------------------------------------------
- AdHoc::Buffer updSql;
- updSql.appendf("UPDATE %s a SET ",
- dest.c_str());
- bool first = true;
- foreach (Columns::const_iterator, cols, col) {
- if (pk.find(*col) == pk.end()) {
- if (!first) {
- updSql.append(", ");
- }
- first = false;
- updSql.appendf(" %s = b.%s ",
- col->c_str(), col->c_str());
- }
- }
- updSql.appendf(" FROM %s b ",
- src.c_str());
- foreach (PKI, pk, pki) {
- if (pki == pk.begin()) {
- updSql.append(" WHERE ");
- }
- else {
- updSql.append(" AND ");
- }
- updSql.appendf(" a.%s = b.%s ",
- pki->c_str(), pki->c_str());
- }
- updSql.append(" AND (");
- first = true;
- foreach (Columns::const_iterator, cols, col) {
- if (pk.find(*col) == pk.end()) {
- if (!first) {
- updSql.append(" OR ");
- }
- first = false;
- updSql.appendf(
- " (((CASE WHEN (a.%s IS NULL AND b.%s IS NULL) THEN 1 ELSE 0 END) \
- + (CASE WHEN(a.%s = b.%s) THEN 1 ELSE 0 END)) = 0)",
- col->c_str(), col->c_str(),
- col->c_str(), col->c_str());
- }
- }
- updSql.append(")");
- if (where) {
- updSql.append(" AND ");
- where->writeSql(updSql);
- }
- if (order) {
- updSql.append(" ORDER BY ");
- order->writeSql(updSql);
- }
- // -----------------------------------------------------------------
- // Execute the bulk update command ---------------------------------
- // -----------------------------------------------------------------
- ModifyCommand * upd = db.newModifyCommand(updSql);
- unsigned int offset = 0;
- if (where) {
- where->bindParams(ec, upd, offset);
- }
- if (order) {
- order->bindParams(ec, upd, offset);
- }
- upd->execute(true);
- delete upd;
- break;
- }
- case BulkUpdateUsingJoin:
- {
- // -----------------------------------------------------------------
- // Build SQL for list of updates to perform ------------------------
- // -----------------------------------------------------------------
- AdHoc::Buffer updSql;
- updSql.appendf("UPDATE %s a, %s b SET ",
- dest.c_str(), src.c_str());
- bool first = true;
- foreach (Columns::const_iterator, cols, col) {
- if (pk.find(*col) == pk.end()) {
- if (!first) {
- updSql.append(", ");
- }
- first = false;
- updSql.appendf(" a.%s = b.%s ",
- col->c_str(), col->c_str());
- }
- }
- foreach (PKI, pk, pki) {
- if (pki == pk.begin()) {
- updSql.append(" WHERE ");
- }
- else {
- updSql.append(" AND ");
- }
- updSql.appendf(" a.%s = b.%s ",
- pki->c_str(), pki->c_str());
- }
- updSql.append(" AND (");
- first = true;
- foreach (Columns::const_iterator, cols, col) {
- if (pk.find(*col) == pk.end()) {
- if (!first) {
- updSql.append(" OR ");
- }
- first = false;
- updSql.appendf(
- " (((CASE WHEN (a.%s IS NULL AND b.%s IS NULL) THEN 1 ELSE 0 END) \
- + (CASE WHEN(a.%s = b.%s) THEN 1 ELSE 0 END)) = 0)",
- col->c_str(), col->c_str(),
- col->c_str(), col->c_str());
- }
- }
- updSql.append(")");
- if (where) {
- updSql.append(" AND ");
- where->writeSql(updSql);
- }
- if (order) {
- updSql.append(" ORDER BY ");
- order->writeSql(updSql);
- }
- // -----------------------------------------------------------------
- // Execute the bulk update command ---------------------------------
- // -----------------------------------------------------------------
- ModifyCommand * upd = db.newModifyCommand(updSql);
- unsigned int offset = 0;
- if (where) {
- where->bindParams(ec, upd, offset);
- }
- if (order) {
- order->bindParams(ec, upd, offset);
- }
- upd->execute(true);
- delete upd;
- break;
- }
- }
-}
-
-void
-TablePatch::doInserts(ExecContext * ec, DynamicSql::SqlWriterPtr order)
-{
- // -----------------------------------------------------------------
- // Build SQL for copying new records -------------------------------
- // -----------------------------------------------------------------
- AdHoc::Buffer toInsSql;
- toInsSql.appendf("INSERT INTO %s",
- dest.c_str());
- foreach (Columns::const_iterator, cols, col) {
- if (col == cols.begin()) {
- toInsSql.append("(");
- }
- else {
- toInsSql.append(", ");
- }
- toInsSql.appendf("%s",
- col->c_str());
- }
- toInsSql.append(") SELECT ");
- foreach (Columns::const_iterator, cols, col) {
- if (col != cols.begin()) {
- toInsSql.append(", ");
- }
- toInsSql.appendf("b.%s",
- col->c_str());
- }
- toInsSql.appendf(" FROM %s b LEFT OUTER JOIN %s a",
- src.c_str(), dest.c_str());
- foreach (PKI, pk, pki) {
- if (pki == pk.begin()) {
- toInsSql.append(" ON ");
- }
- else {
- toInsSql.append(" AND ");
- }
- toInsSql.appendf(" a.%s = b.%s",
- pki->c_str(), pki->c_str());
- }
- foreach (PKI, pk, pki) {
- if (pki == pk.begin()) {
- toInsSql.append(" WHERE ");
- }
- else {
- toInsSql.append(" AND ");
- }
- toInsSql.appendf(" a.%s IS NULL",
- pki->c_str());
- }
- if (order) {
- toInsSql.appendf(" ORDER BY ");
- order->writeSql(toInsSql);
- }
- ModifyCommand * ins = db.newModifyCommand(toInsSql);
- unsigned int offset = 0;
- if (order) {
- order->bindParams(ec, ins, offset);
- }
- ins->execute();
- delete ins;
-}
-
-const char *
-TablePatch::PatchCheckFailure::what() const throw()
-{
- return "Santiy checks failed: check table names and keys";
-}
-
diff --git a/project2/sql/tablepatch.h b/project2/sql/tablepatch.h
deleted file mode 100644
index cef90a7..0000000
--- a/project2/sql/tablepatch.h
+++ /dev/null
@@ -1,50 +0,0 @@
-#ifndef TABLEPATCH_H
-#define TABLEPATCH_H
-
-#include <string>
-#include <set>
-#include <map>
-#include <connection.h>
-#include <modifycommand.h>
-#include <selectcommand.h>
-#include <buffer.h>
-#include "sqlWriters.h"
-
-#define foreach(_type, _con, _it) for (_type _it = ((_con).begin()); _it != ((_con).end()); _it++)
-
-class TablePatch {
- public:
- typedef std::string Table;
- typedef std::string Column;
- typedef std::set<Column> Columns;
- typedef Columns PrimaryKey;
- typedef PrimaryKey::const_iterator PKI;
-
- class PatchCheckFailure : public std::exception {
- public:
- const char * what() const throw();
- };
-
- TablePatch(const DB::Connection & db, const Table & src, const Table & dest, const Columns & cols);
-
- void addKey(const Column & col);
- void patch(ExecContext *, DynamicSql::SqlWriterPtr insteadOfDelete, DynamicSql::SqlWriterPtr where, DynamicSql::SqlWriterPtr order);
-
- bool doDelete;
- bool doUpdate;
- bool doInsert;
-
- private:
- void doDeletes(ExecContext *, DynamicSql::SqlWriterPtr insteadOfDelete, DynamicSql::SqlWriterPtr where, DynamicSql::SqlWriterPtr order);
- void doUpdates(ExecContext *, DynamicSql::SqlWriterPtr where, DynamicSql::SqlWriterPtr order);
- void doInserts(ExecContext *, DynamicSql::SqlWriterPtr order);
-
- Table src;
- Table dest;
- PrimaryKey pk;
- Columns cols;
- const DB::Connection &db;
-};
-
-#endif
-