summaryrefslogtreecommitdiff
path: root/project2/sql
diff options
context:
space:
mode:
Diffstat (limited to 'project2/sql')
-rw-r--r--project2/sql/Jamfile.jam38
-rw-r--r--project2/sql/connectionLoader.h19
-rw-r--r--project2/sql/rdbmsDataSource.cpp220
-rw-r--r--project2/sql/rdbmsDataSource.h78
-rw-r--r--project2/sql/sql-modODBC.cpp4
-rw-r--r--project2/sql/sql-modPQ.cpp4
-rw-r--r--project2/sql/sqlCache.cpp300
-rw-r--r--project2/sql/sqlCheck.cpp100
-rw-r--r--project2/sql/sqlCheck.h29
-rw-r--r--project2/sql/sqlHandleAsVariableType.cpp19
-rw-r--r--project2/sql/sqlHandleAsVariableType.h18
-rw-r--r--project2/sql/sqlMergeTask.cpp329
-rw-r--r--project2/sql/sqlMergeTask.h79
-rw-r--r--project2/sql/sqlRows.cpp69
-rw-r--r--project2/sql/sqlRows.h40
-rw-r--r--project2/sql/sqlTask.cpp70
-rw-r--r--project2/sql/sqlTask.h36
-rw-r--r--project2/sql/sqlVariableBinder.cpp77
-rw-r--r--project2/sql/sqlVariableBinder.h36
-rw-r--r--project2/sql/sqlWriter.cpp131
-rw-r--r--project2/sql/sqlWriter.h63
-rw-r--r--project2/sql/tablepatch.cpp438
-rw-r--r--project2/sql/tablepatch.h42
23 files changed, 2239 insertions, 0 deletions
diff --git a/project2/sql/Jamfile.jam b/project2/sql/Jamfile.jam
new file mode 100644
index 0000000..b05fd34
--- /dev/null
+++ b/project2/sql/Jamfile.jam
@@ -0,0 +1,38 @@
+alias libxmlpp : : : :
+ <cflags>"`pkg-config --cflags libxml++-2.6`"
+ <linkflags>"`pkg-config --libs libxml++-2.6`" ;
+
+explicit object sql-modODBC ;
+obj sql-modODBC :
+ sql-modODBC.cpp :
+ <library>../../libodbcpp//odbcpp
+ <library>libxmlpp
+ <include>../../libmisc
+ <include>../common
+ : :
+ <library>../../libodbcpp//odbcpp
+ ;
+
+explicit object sql-modPQ ;
+obj sql-modPQ :
+ sql-modPQ.cpp :
+ <library>../../libpqpp//pqpp
+ <library>libxmlpp
+ <include>../../libmisc
+ <include>../common
+ : :
+ <library>../../libpqpp//pqpp
+ ;
+
+lib p2sql :
+ sqlCheck.cpp sqlWriter.cpp sqlTask.cpp sqlMergeTask.cpp sqlRows.cpp sqlCache.cpp sqlVariableBinder.cpp tablepatch.cpp rdbmsDataSource.cpp
+ sqlHandleAsVariableType.cpp
+ ../../libdbpp//dbpp
+ :
+ <odbc>yes:<library>sql-modODBC
+ <pq>yes:<library>sql-modPQ
+ <library>libxmlpp
+ <library>../common//p2common
+ <include>../../libmisc
+ ;
+
diff --git a/project2/sql/connectionLoader.h b/project2/sql/connectionLoader.h
new file mode 100644
index 0000000..841b425
--- /dev/null
+++ b/project2/sql/connectionLoader.h
@@ -0,0 +1,19 @@
+#include "xmlObjectLoader.h"
+#include "../libdbpp/connection.h"
+
+/// Base class to implement DB connection type modules
+class ConnectionLoader : public ComponentLoader {
+ public:
+ virtual DB::Connection * connect(const std::string & dsn) const = 0;
+};
+
+/// Helper implemention for specific DB types
+template <class DBType>
+class ConnectionLoaderImpl : public ConnectionLoader {
+ public:
+ virtual DB::Connection * connect(const std::string & dsn) const
+ {
+ return new DBType(dsn);
+ }
+};
+
diff --git a/project2/sql/rdbmsDataSource.cpp b/project2/sql/rdbmsDataSource.cpp
new file mode 100644
index 0000000..dac6cb1
--- /dev/null
+++ b/project2/sql/rdbmsDataSource.cpp
@@ -0,0 +1,220 @@
+#include "rdbmsDataSource.h"
+#include "connectionLoader.h"
+#include <libxml++/nodes/textnode.h>
+#include <sys/utsname.h>
+#include "logger.h"
+#include <errno.h>
+#include <boost/foreach.hpp>
+
+SimpleMessageException(UnknownConnectionProvider);
+
+/// Specialized ElementLoader for instances of RdbmsDataSource; handles persistent DB connections
+class RdbmsDataSourceLoader : public ElementLoaderImpl<RdbmsDataSource> {
+ public:
+ void onIdle()
+ {
+ // Disconnect all cached database connections
+ RdbmsDataSource::dbhosts.clear();
+ }
+ static bool isConnectionExpired(const RdbmsDataSource::DBHosts::value_type & con)
+ {
+ return con.second->isExpired();
+ }
+ void onPeriodic()
+ {
+ // Disconnect expired database connections
+ RdbmsDataSource::DBHosts::iterator i;
+ while ((i = std::find_if(RdbmsDataSource::dbhosts.begin(), RdbmsDataSource::dbhosts.end(), isConnectionExpired)) != RdbmsDataSource::dbhosts.end()) {
+ RdbmsDataSource::dbhosts.erase(i);
+ }
+ }
+ void onIteration()
+ {
+ RdbmsDataSource::changedDSNs.clear();
+ }
+};
+DECLARE_CUSTOM_LOADER("rdbmsdatasource", RdbmsDataSourceLoader);
+
+RdbmsDataSource::DBHosts RdbmsDataSource::dbhosts;
+RdbmsDataSource::FailedHosts RdbmsDataSource::failedhosts;
+RdbmsDataSource::DSNSet RdbmsDataSource::changedDSNs;
+
+RdbmsDataSource::RdbmsDataSource(const xmlpp::Element * p) :
+ DataSource(p),
+ masterDsn(dynamic_cast<const xmlpp::Element *>(p->find("masterdsn").front())),
+ preferLocal(p->get_attribute_value("preferlocal") != "false")
+{
+ BOOST_FOREACH(const xmlpp::Node * node, p->find("readonly/dsn")) {
+ const xmlpp::Element * elem = dynamic_cast<const xmlpp::Element *>(node);
+ if (elem) {
+ roDSNs.insert(ReadonlyDSNs::value_type(elem->get_attribute_value("host"), elem));
+ }
+ }
+}
+
+RdbmsDataSource::~RdbmsDataSource()
+{
+}
+
+void
+RdbmsDataSource::loadComplete(const CommonObjects *)
+{
+}
+
+const DB::Connection &
+RdbmsDataSource::getWritable() const
+{
+ ConnectionPtr master = connectTo(masterDsn);
+ if (!master->txOpen) {
+ master->connection->beginTx();
+ master->txOpen = true;
+ }
+ changedDSNs.insert(name);
+ return *master->connection;
+}
+
+const DB::Connection &
+RdbmsDataSource::getReadonly() const
+{
+ if (changedDSNs.find(name) != changedDSNs.end()) {
+ return *connectTo(masterDsn)->connection;
+ }
+ if (localhost.length() == 0 && preferLocal) {
+ struct utsname name;
+ if (uname(&name)) {
+ Logger()->messagef(LOG_WARNING, "%s: Unable to determine local host name (%d:%s)",
+ __PRETTY_FUNCTION__, errno, strerror(errno));
+ localhost = "unknown";
+ }
+ else {
+ localhost = name.nodename;
+ }
+ }
+ if (preferLocal) {
+ ReadonlyDSNs::const_iterator ro = roDSNs.find(localhost);
+ try {
+ if (ro == roDSNs.end()) {
+ Logger()->messagef(LOG_INFO, "%s: No database host matches local host name (%s) Will use master DSN",
+ __PRETTY_FUNCTION__, localhost.c_str());
+ return *connectTo(masterDsn)->connection;
+ }
+ return *connectTo(ro->second)->connection;
+ }
+ catch (...) {
+ // Failed to connect to a preferred DB... carry on and try the others...
+ }
+ }
+ BOOST_FOREACH(ReadonlyDSNs::value_type db, roDSNs) {
+ try {
+ return *connectTo(db.second)->connection;
+ }
+ catch (...) {
+ }
+ }
+ return *connectTo(masterDsn)->connection;
+}
+
+void
+RdbmsDataSource::commit()
+{
+ DBHosts::const_iterator m = dbhosts.find(masterDsn);
+ if (m != dbhosts.end() && m->second->txOpen) {
+ m->second->connection->commitTx();
+ m->second->txOpen = false;
+ }
+}
+
+void
+RdbmsDataSource::rollback()
+{
+ DBHosts::const_iterator m = dbhosts.find(masterDsn);
+ if (m != dbhosts.end() && m->second->txOpen) {
+ m->second->connection->rollbackTx();
+ m->second->txOpen = false;
+ }
+ changedDSNs.erase(name);
+}
+
+RdbmsDataSource::ConnectionPtr
+RdbmsDataSource::connectTo(const ConnectionInfo & dsn)
+{
+ FailedHosts::iterator dbf = failedhosts.find(dsn);
+ if (dbf != failedhosts.end()) {
+ if (time(NULL) - 20 > dbf->second.FailureTime) {
+ failedhosts.erase(dbf);
+ }
+ else {
+ throw dbf->second;
+ }
+ }
+ DBHosts::const_iterator dbi = dbhosts.find(dsn);
+ if (dbi != dbhosts.end()) {
+ try {
+ dbi->second->connection->ping();
+ dbi->second->touch();
+ return dbi->second;
+ }
+ catch (...) {
+ // Connection in failed state
+ Logger()->messagef(LOG_DEBUG, "%s: Cached connection failed", __PRETTY_FUNCTION__);
+ }
+ }
+
+ try {
+ ConnectionPtr db = ConnectionPtr(new RdbmsConnection(dsn.connect(), 300));
+ dbhosts[dsn] = db;
+ db->touch();
+ return db;
+ }
+ catch (const DB::ConnectionError & e) {
+ failedhosts.insert(FailedHosts::value_type(dsn, e));
+ throw;
+ }
+}
+
+RdbmsDataSource::RdbmsConnection::RdbmsConnection(const DB::Connection * con, time_t kat) :
+ connection(con),
+ txOpen(false),
+ lastUsedTime(0),
+ keepAliveTime(kat)
+{
+}
+
+RdbmsDataSource::RdbmsConnection::~RdbmsConnection()
+{
+ connection->finish();
+ delete connection;
+}
+
+void
+RdbmsDataSource::RdbmsConnection::touch() const
+{
+ time(&lastUsedTime);
+}
+
+bool
+RdbmsDataSource::RdbmsConnection::isExpired() const
+{
+ return (time(NULL) > lastUsedTime + keepAliveTime);
+}
+
+RdbmsDataSource::ConnectionInfo::ConnectionInfo(const xmlpp::Element * n)
+{
+ BOOST_FOREACH(const xmlpp::Node * node, n->get_children()) {
+ typeId = LoaderBase::getLoader<ConnectionLoader, UnknownConnectionProvider>(node->get_name());
+ dsn = dynamic_cast<const xmlpp::Element *>(node)->get_child_text()->get_content();
+ }
+}
+
+DB::Connection *
+RdbmsDataSource::ConnectionInfo::connect() const
+{
+ return typeId->connect(dsn);
+}
+
+bool
+RdbmsDataSource::ConnectionInfo::operator<(const RdbmsDataSource::ConnectionInfo & other) const
+{
+ return ((typeId < other.typeId) || ((typeId == other.typeId) && (dsn < other.dsn)));
+}
+
diff --git a/project2/sql/rdbmsDataSource.h b/project2/sql/rdbmsDataSource.h
new file mode 100644
index 0000000..fdcfbe7
--- /dev/null
+++ b/project2/sql/rdbmsDataSource.h
@@ -0,0 +1,78 @@
+#ifndef RDBMSDATASOURCE_H
+#define RDBMSDATASOURCE_H
+
+#include <libxml/tree.h>
+#include <boost/shared_ptr.hpp>
+#include <map>
+#include <set>
+#include "dataSource.h"
+#include "../libdbpp/connection.h"
+#include "../libdbpp/error.h"
+#include "xmlObjectLoader.h"
+
+class ConnectionLoader;
+
+/// Project2 component to provide access to transactional RDBMS data sources
+class RdbmsDataSource : public DataSource {
+ public:
+ class RdbmsConnection {
+ public:
+ RdbmsConnection(const DB::Connection * connection, time_t kat);
+ ~RdbmsConnection();
+
+ void touch() const;
+ bool isExpired() const;
+ const DB::Connection * const connection;
+ bool txOpen;
+
+ private:
+ mutable time_t lastUsedTime;
+ const time_t keepAliveTime;
+ };
+
+ class ConnectionInfo {
+ public:
+ ConnectionInfo(const xmlpp::Element *);
+
+ DB::Connection * connect() const;
+
+ bool operator<(const ConnectionInfo & o) const;
+
+ private:
+ std::string dsn;
+ boost::shared_ptr<ConnectionLoader> typeId;
+ };
+
+ typedef boost::shared_ptr<RdbmsConnection> ConnectionPtr;
+ typedef std::map<std::string, ConnectionInfo> ReadonlyDSNs; // Map hostname to DSN string
+ typedef std::map<ConnectionInfo, ConnectionPtr> DBHosts; // Map DSN strings to connections
+ typedef std::map<ConnectionInfo, const DB::ConnectionError> FailedHosts; // Map DSN strings to failures
+
+ RdbmsDataSource(const xmlpp::Element * p);
+ ~RdbmsDataSource();
+
+ const DB::Connection & getReadonly() const;
+ const DB::Connection & getWritable() const;
+ virtual void loadComplete(const CommonObjects *);
+ virtual void commit();
+ virtual void rollback();
+
+ const ConnectionInfo masterDsn;
+ const bool preferLocal;
+
+ protected:
+ static ConnectionPtr connectTo(const ConnectionInfo & dsn);
+ ReadonlyDSNs roDSNs;
+
+ private:
+ mutable std::string localhost;
+ static DBHosts dbhosts;
+ static FailedHosts failedhosts;
+ typedef std::set<std::string> DSNSet;
+ static DSNSet changedDSNs;
+
+ friend class RdbmsDataSourceLoader;
+};
+
+#endif
+
diff --git a/project2/sql/sql-modODBC.cpp b/project2/sql/sql-modODBC.cpp
new file mode 100644
index 0000000..1703cb6
--- /dev/null
+++ b/project2/sql/sql-modODBC.cpp
@@ -0,0 +1,4 @@
+#include "connectionLoader.h"
+#include "../libodbcpp/connection.h"
+typedef ODBC::Connection ODBCConnection;
+DECLARE_COMPONENT_LOADER("odbc", ODBCConnection, ConnectionLoader)
diff --git a/project2/sql/sql-modPQ.cpp b/project2/sql/sql-modPQ.cpp
new file mode 100644
index 0000000..5991754
--- /dev/null
+++ b/project2/sql/sql-modPQ.cpp
@@ -0,0 +1,4 @@
+#include "connectionLoader.h"
+#include "../libpqpp/connection.h"
+typedef PQ::Connection PQConnection;
+DECLARE_COMPONENT_LOADER("postgresql", PQConnection, ConnectionLoader)
diff --git a/project2/sql/sqlCache.cpp b/project2/sql/sqlCache.cpp
new file mode 100644
index 0000000..13bc23d
--- /dev/null
+++ b/project2/sql/sqlCache.cpp
@@ -0,0 +1,300 @@
+#include "cache.h"
+#include "sqlVariableBinder.h"
+#include "sqlHandleAsVariableType.h"
+#include "buffer.h"
+#include "selectcommand.h"
+#include "modifycommand.h"
+#include "column.h"
+#include "commonObjects.h"
+#include "rdbmsDataSource.h"
+#include "logger.h"
+#include "xmlObjectLoader.h"
+#include "iHaveParameters.h"
+#include "rowSet.h"
+#include <boost/foreach.hpp>
+#include <boost/program_options.hpp>
+#include <boost/algorithm/string/predicate.hpp>
+
+typedef boost::shared_ptr<DB::SelectCommand> SelectPtr;
+typedef boost::shared_ptr<DB::ModifyCommand> ModifyPtr;
+
+class SqlCache : public Cache {
+ public:
+ SqlCache(const xmlpp::Element * p) :
+ Cache(p)
+ {
+ }
+
+ void loadComplete(const CommonObjects * co)
+ {
+ db = co->dataSource<RdbmsDataSource>(DataSource);
+ }
+
+ static void appendKeyCols(Buffer * sql, unsigned int * off, const Glib::ustring & col)
+ {
+ if ((*off)++) {
+ sql->append(", ");
+ }
+ sql->append(col.c_str());
+ }
+
+ static void appendKeyBinds(Buffer * sql, unsigned int * off)
+ {
+ if ((*off)++) {
+ sql->append(", ");
+ }
+ sql->append("?");
+ }
+
+ static void appendKeyAnds(Buffer * sql, const Glib::ustring & col)
+ {
+ sql->appendf(" AND h.%s = ?", col.c_str());
+ }
+
+ static void bindKeyValues(DB::Command * cmd, unsigned int * offset, const VariableType & v)
+ {
+ boost::apply_visitor<const SqlVariableBinder, const VariableType>(SqlVariableBinder(cmd, (*offset)++), v);
+ }
+
+ class SqlCacheRowSet : public RowSet {
+ public:
+ SqlCacheRowSet(SelectPtr r) :
+ RowSet(NULL),
+ s(r) {
+ }
+ void loadComplete(const CommonObjects *) {
+ }
+ class SqlCacheRowState : public RowState {
+ public:
+ SqlCacheRowState(const SqlCacheRowSet * s) :
+ sc(s) {
+ }
+ const Columns & getColumns() const {
+ return columns;
+ }
+ RowAttribute resolveAttr(const Glib::ustring & attrName) const {
+ return boost::bind(&SqlCacheRowState::getAttrCol, this, "p2attr_" + attrName);
+ }
+ private:
+ VariableType getAttrCol(const Glib::ustring & col) const {
+ HandleAsVariableType h;
+ (*sc->s)[col].apply(h);
+ return h.variable;
+ }
+ mutable Columns columns;
+ friend class SqlCacheRowSet;
+ const SqlCacheRowSet * sc;
+ };
+ void execute(const Glib::ustring&, const RowProcessor * rp) const {
+ SqlCacheRowState ss(this);
+ HandleAsVariableType h;
+ do {
+ if (!(*s)["p2_cacheid"].isNull()) {
+ if (colCols.empty()) {
+ (*s)["p2_cacheid"].apply(h);
+ cacheId = h.variable;
+ unsigned int colNo = 0;
+ for (unsigned int c = 0; c < s->columnCount(); c++) {
+ const DB::Column & col = (*s)[c];
+ if (!boost::algorithm::starts_with(col.name, "p2attr_") &&
+ !boost::algorithm::starts_with(col.name, "p2_")) {
+ ss.columns.insert(new Column(colNo++, col.name));
+ colCols.push_back(c);
+ }
+ }
+ ss.fields.resize(colCols.size());
+ }
+ else {
+ (*s)["p2_cacheid"].apply(h);
+ if (cacheId != (int64_t)h.variable) {
+ break;
+ }
+ }
+ BOOST_FOREACH(const unsigned int & c, colCols) {
+ const DB::Column & col = (*s)[c];
+ col.apply(h);
+ ss.fields[&c - &colCols.front()] = h.variable;
+ }
+ ss.process(rp);
+ }
+ } while (s->fetch());
+ }
+ private:
+ SelectPtr s;
+ mutable std::vector<unsigned int> colCols;
+ mutable int64_t cacheId;
+ };
+
+ RowSetCPtr getCachedRowSet(const Glib::ustring & n, const Glib::ustring & f, const IHaveParameters * ps) const
+ {
+ Buffer sql;
+ sql.appendf("SELECT r.* \
+ FROM %s p, %s_%s_%s h LEFT OUTER JOIN %s_%s_%s_rows r \
+ ON h.p2_cacheid = r.p2_cacheid \
+ WHERE p.p2_time > ? \
+ AND p.p2_cacheid = h.p2_cacheid",
+ HeaderTable.c_str(),
+ HeaderTable.c_str(), n.c_str(), f.c_str(),
+ HeaderTable.c_str(),n.c_str(), f.c_str());
+ applyKeys(boost::bind(appendKeyAnds, &sql, _1), ps);
+ sql.appendf(" ORDER BY r.p2_cacheid DESC, r.p2_row");
+ SelectPtr gh(db->getReadonly().newSelectCommand(sql));
+ unsigned int offset = 0;
+ gh->bindParamT(offset++, time(NULL) - CacheLife);
+ applyKeys(boost::bind(bindKeyValues, gh.get(), &offset, _2), ps);
+ if (gh->fetch()) {
+ return new SqlCacheRowSet(gh);
+ }
+ return NULL;
+ }
+
+ class SqlCachePresenter : public Presenter {
+ public:
+ SqlCachePresenter(const Glib::ustring & name, const Glib::ustring & filter, const RdbmsDataSource * d) :
+ depth(0),
+ row(1),
+ db(d),
+ n(name),
+ f(filter) {
+ }
+ void declareNamespace(const Glib::ustring &, const Glib::ustring &) const { }
+ void setNamespace(const Glib::ustring &, const Glib::ustring &) const { }
+ void pushSub(const Glib::ustring & name, const Glib::ustring &) const {
+ depth += 1;
+ if (depth == 2) {
+ col = name;
+ }
+ else if (depth == 1) {
+ }
+ }
+ void addAttr(const Glib::ustring & name, const Glib::ustring &, const VariableType & value) const {
+ attrs.insert(Values::value_type(name, value));
+ }
+ void addText(const VariableType & value) const {
+ cols.insert(Values::value_type(col, value));
+ }
+ void popSub() const {
+ if (depth == 2) {
+ col.clear();
+ }
+ else if (depth == 1) {
+ Buffer sql;
+ sql.appendf("INSERT INTO %s_%s_%s_rows(p2_row", HeaderTable.c_str(), n.c_str(), f.c_str());
+ BOOST_FOREACH(const Values::value_type & a, attrs) {
+ sql.appendf(", p2attr_%s", a.first.c_str());
+ }
+ BOOST_FOREACH(const Values::value_type & v, cols) {
+ sql.appendf(", %s", v.first.c_str());
+ }
+ sql.appendf(") VALUES(?");
+ for (size_t x = attrs.size(); x > 0; x -= 1) {
+ sql.append(", ?");
+ }
+ for (size_t x = cols.size(); x > 0; x -= 1) {
+ sql.append(", ?");
+ }
+ sql.appendf(")");
+ ModifyPtr m(db->getReadonly().newModifyCommand(sql));
+ unsigned int offset = 0;
+ m->bindParamI(offset++, row++);
+ BOOST_FOREACH(const Values::value_type & a, attrs) {
+ boost::apply_visitor<const SqlVariableBinder, const VariableType>(SqlVariableBinder(m.get(), offset++), a.second);
+ }
+ BOOST_FOREACH(const Values::value_type & v, cols) {
+ boost::apply_visitor<const SqlVariableBinder, const VariableType>(SqlVariableBinder(m.get(), offset++), v.second);
+ }
+ m->execute();
+ cols.clear();
+ attrs.clear();
+ }
+ depth -= 1;
+ }
+ private:
+ mutable unsigned int depth;
+ mutable unsigned int row;
+ const RdbmsDataSource * db;
+ mutable Glib::ustring col;
+ const Glib::ustring n, f;
+ typedef std::map<Glib::ustring, VariableType> Values;
+ mutable Values cols, attrs;
+ };
+
+ PresenterPtr openFor(const Glib::ustring & n, const Glib::ustring & f, const IHaveParameters * ps)
+ {
+ // Header
+ Buffer del;
+ del.appendf("INSERT INTO %s(p2_time) VALUES(?)", HeaderTable.c_str());
+ ModifyPtr h = ModifyPtr(db->getReadonly().newModifyCommand(del));
+ h->bindParamT(0, time(NULL));
+ h->execute();
+ // Record set header
+ Buffer sql;
+ sql.appendf("INSERT INTO %s_%s_%s(", HeaderTable.c_str(), n.c_str(), f.c_str());
+ unsigned int offset = 0;
+ applyKeys(boost::bind(appendKeyCols, &sql, &offset, _1), ps);
+ sql.appendf(") VALUES(");
+ offset = 0;
+ applyKeys(boost::bind(appendKeyBinds, &sql, &offset), ps);
+ sql.appendf(")");
+ ModifyPtr m(db->getReadonly().newModifyCommand(sql));
+ offset = 0;
+ applyKeys(boost::bind(bindKeyValues, m.get(), &offset, _2), ps);
+ m->execute();
+ return new SqlCachePresenter(n, f, db);
+ }
+
+ void close(const Glib::ustring & , const Glib::ustring & , const IHaveParameters * )
+ {
+ }
+
+ private:
+ friend class CustomSqlCacheLoader;
+ const RdbmsDataSource * db;
+ static std::string DataSource;
+ static std::string HeaderTable;
+ static time_t CacheLife;
+};
+
+std::string SqlCache::DataSource;
+std::string SqlCache::HeaderTable;
+time_t SqlCache::CacheLife;
+
+namespace po = boost::program_options;
+class CustomSqlCacheLoader : public ElementLoaderImpl<SqlCache> {
+ public:
+ CustomSqlCacheLoader() :
+ opts("SQL Cache options")
+ {
+ opts.add_options()
+ ("cache.sql.datasource", po::value(&SqlCache::DataSource),
+ "The default datasource to connect to")
+ ("cache.sql.headertable", po::value(&SqlCache::HeaderTable)->default_value("p2cache"),
+ "The filename to store the data in")
+ ("cache.sql.life", po::value(&SqlCache::CacheLife)->default_value(3600),
+ "The age of cache entries after which they are removed (seconds)")
+ ;
+ }
+
+ po::options_description * options()
+ {
+ return &opts;
+ }
+
+ void onIdle()
+ {
+ if (!SqlCache::DataSource.empty()) {
+ boost::intrusive_ptr<CommonObjects> co = new CommonObjects();
+ const RdbmsDataSource * db = co->dataSource<RdbmsDataSource>(SqlCache::DataSource);
+ Buffer del;
+ del.appendf("DELETE FROM %s WHERE p2_time < ?", SqlCache::HeaderTable.c_str());
+ ModifyPtr m(db->getReadonly().newModifyCommand(del));
+ m->bindParamT(0, time(NULL) - SqlCache::CacheLife);
+ m->execute();
+ }
+ }
+
+ private:
+ po::options_description opts;
+};
+DECLARE_CUSTOM_LOADER("sqlcache", CustomSqlCacheLoader);
+
diff --git a/project2/sql/sqlCheck.cpp b/project2/sql/sqlCheck.cpp
new file mode 100644
index 0000000..d86eb3b
--- /dev/null
+++ b/project2/sql/sqlCheck.cpp
@@ -0,0 +1,100 @@
+#include "sqlCheck.h"
+#include "xmlObjectLoader.h"
+#include "selectcommand.h"
+#include "column.h"
+#include "rdbmsDataSource.h"
+#include "commonObjects.h"
+#include "sqlVariableBinder.h"
+#include <boost/foreach.hpp>
+#include <boost/date_time/posix_time/posix_time.hpp>
+
+DECLARE_LOADER("sqlcheck", SqlCheck);
+
+class CantCompareNulls : public std::exception { };
+
+SqlCheck::SqlCheck(const xmlpp::Element * p) :
+ ParamChecker(p),
+ dataSource(p, "datasource"),
+ filter(p, "filter", false, ""),
+ testOp(p, "testOp", false, "=="),
+ testValue(p, "testValue"),
+ sqlCommand(dynamic_cast<xmlpp::Element *>(p->get_children("sql").front()))
+{
+}
+
+SqlCheck::~SqlCheck()
+{
+}
+
+void
+SqlCheck::loadComplete(const CommonObjects * co)
+{
+ db = co->dataSource<RdbmsDataSource>(dataSource());
+}
+
+class HandleDoCompare : public DB::HandleField {
+ public:
+ HandleDoCompare(const VariableType & tV, const std::string & tO) :
+ retVal(false),
+ testValue(tV),
+ testOp(tO) {
+ }
+ void null() {
+ throw CantCompareNulls();
+ }
+ void string(const char *c , size_t l) {
+ doTest(Glib::ustring(c, c + l));
+ }
+ void integer(int64_t val) {
+ doTest(val);
+ }
+ void floatingpoint(double val) {
+ doTest(val);
+ }
+ void timestamp(const struct tm & val) {
+ doTest(boost::posix_time::ptime_from_tm(val));
+ }
+ bool operator()() const {
+ return retVal;
+ }
+ private:
+ template <typename TV>
+ void doTest(const TV & val) {
+ TV tv = testValue;
+ if ((testOp == "==" || testOp == "=") && val == tv) {
+ retVal = true;
+ }
+ else if (testOp == "<" && val < tv) {
+ retVal = true;
+ }
+ else if (testOp == ">" && val > tv) {
+ retVal = true;
+ }
+ else if (testOp == "!=" && val != tv) {
+ retVal = true;
+ }
+ else if ((testOp == "<=" || testOp == "=<") && val <= tv) {
+ retVal = true;
+ }
+ else if ((testOp == ">=" || testOp == "=>") && val >= tv) {
+ retVal = true;
+ }
+ }
+ bool retVal;
+ const VariableType & testValue;
+ std::string testOp;
+};
+bool
+SqlCheck::performCheck() const
+{
+ boost::shared_ptr<DB::SelectCommand> query = boost::shared_ptr<DB::SelectCommand>(
+ db->getWritable().newSelectCommand(sqlCommand.getSqlFor(filter())));
+ unsigned int offset = 0;
+ sqlCommand.bindParams(query.get(), offset);
+ HandleDoCompare h(testValue, testOp());
+ while (query->fetch()) {
+ (*query)[0].apply(h);
+ }
+ return h();
+}
+
diff --git a/project2/sql/sqlCheck.h b/project2/sql/sqlCheck.h
new file mode 100644
index 0000000..c9933d7
--- /dev/null
+++ b/project2/sql/sqlCheck.h
@@ -0,0 +1,29 @@
+#ifndef SQLCHECK_H
+#define SQLCHECK_H
+
+#include "paramChecker.h"
+#include "sqlWriter.h"
+
+namespace DB { class SelectCommand; }
+class RdbmsDataSource;
+
+/// Project2 component to check the value of a variable against an RDBMS data source
+class SqlCheck : public ParamChecker {
+ public:
+ SqlCheck(const xmlpp::Element * p);
+ virtual ~SqlCheck();
+
+ virtual void loadComplete(const CommonObjects *);
+ bool performCheck() const;
+
+ const Variable dataSource;
+ const Variable filter;
+ const Variable testOp;
+ const Variable testValue;
+
+ private:
+ const DynamicSql::SqlCommand sqlCommand;
+ const RdbmsDataSource * db;
+};
+
+#endif
diff --git a/project2/sql/sqlHandleAsVariableType.cpp b/project2/sql/sqlHandleAsVariableType.cpp
new file mode 100644
index 0000000..f084a14
--- /dev/null
+++ b/project2/sql/sqlHandleAsVariableType.cpp
@@ -0,0 +1,19 @@
+#include "sqlHandleAsVariableType.h"
+#include <boost/date_time/posix_time/posix_time.hpp>
+
+void HandleAsVariableType::null() {
+ variable = Null();
+}
+void HandleAsVariableType::string(const char * c, size_t l) {
+ variable = Glib::ustring(c, c + l);
+}
+void HandleAsVariableType::integer(int64_t i) {
+ variable = i;
+}
+void HandleAsVariableType::floatingpoint(double d) {
+ variable = d;
+}
+void HandleAsVariableType::timestamp(const struct tm & t) {
+ variable = boost::posix_time::ptime(boost::posix_time::ptime_from_tm(t));
+}
+
diff --git a/project2/sql/sqlHandleAsVariableType.h b/project2/sql/sqlHandleAsVariableType.h
new file mode 100644
index 0000000..c874b7c
--- /dev/null
+++ b/project2/sql/sqlHandleAsVariableType.h
@@ -0,0 +1,18 @@
+#ifndef SQLHANDLEASVARIABLETYPE
+#define SQLHANDLEASVARIABLETYPE
+
+#include "column.h"
+#include "variables.h"
+
+class HandleAsVariableType : public DB::HandleField {
+ public:
+ void null();
+ void string(const char * c, size_t l);
+ void integer(int64_t i);
+ void floatingpoint(double d);
+ void timestamp(const struct tm & t);
+ VariableType variable;
+};
+
+#endif
+
diff --git a/project2/sql/sqlMergeTask.cpp b/project2/sql/sqlMergeTask.cpp
new file mode 100644
index 0000000..09657f8
--- /dev/null
+++ b/project2/sql/sqlMergeTask.cpp
@@ -0,0 +1,329 @@
+#include "sqlMergeTask.h"
+#include "columns.h"
+#include "commonObjects.h"
+#include "rdbmsDataSource.h"
+#include "exceptions.h"
+#include "sqlVariableBinder.h"
+#include "xmlObjectLoader.h"
+#include <misc.h>
+#include <stdio.h>
+#include <stdexcept>
+#include <boost/algorithm/string/join.hpp>
+#include <boost/foreach.hpp>
+#include <boost/bind.hpp>
+#include <libxml++/nodes/textnode.h>
+
+bool SqlMergeTask::defaultUseTempTable = true;
+static void attach(boost::intrusive_ptr<IHaveSubTasks> i, DB::ModifyCommand * insert);
+
+class SqlMergeInsert;
+typedef boost::intrusive_ptr<SqlMergeInsert> SqlMergeInsertPtr;
+/// Project2 component insert custom constructed records during an SQL Merge task
+class SqlMergeInsert : IHaveParameters, public Task {
+ public:
+ SqlMergeInsert(const xmlpp::Element * p) :
+ SourceObject(p),
+ IHaveParameters(p),
+ Task(p) {
+ }
+ void loadComplete(const CommonObjects*) {
+ }
+ void execute() const {
+ unsigned int col = 0;
+ BOOST_FOREACH(const Parameters::value_type & v, parameters) {
+ boost::apply_visitor<const SqlVariableBinder, const VariableType>(SqlVariableBinder(insert, col++), v.second);
+ }
+ insert->execute();
+ }
+ private:
+ friend void attach(SqlMergeInsertPtr i, DB::ModifyCommand * insert);
+ DB::ModifyCommand * insert;
+};
+
+DECLARE_LOADER("sqlmerge", SqlMergeTask);
+DECLARE_LOADER("sqlmergeinsert", SqlMergeInsert);
+
+// Conversion logic
+SqlMergeTask::SqlMergeTask(const xmlpp::Element * p) :
+ SourceObject(p),
+ Task(p),
+ updateWhere(p, "updatewhere", false),
+ patchOrder(p->get_attribute_value("patchorder")),
+ earlyKeys(p->get_attribute_value("earlykeys") == "yes"),
+ useView(p->get_attribute_value("useview") == "yes"),
+ tempTableCreated(false),
+ insCmd(NULL),
+ destdb(NULL),
+ dataSource(p, "datasource"),
+ dtable(p->get_attribute_value("targettable")),
+ dtablet(stringf("tmp_%s_%d", dtable.c_str(), getpid()))
+{
+ LoaderBase loader(true);
+ loader.supportedStorers.insert(Storer::into(&sources));
+ loader.collectAll(p, true);
+
+ if (!sources.empty() && useView) {
+ throw NotSupported("useview not supported with iterate fillers");
+ }
+
+ BOOST_FOREACH(xmlpp::Node * psi, p->find("columns/column")) {
+ xmlpp::Element * e = static_cast<xmlpp::Element *>(psi);
+ TargetColumnPtr tcp(new TargetColumn(e->get_child_text()->get_content()));
+ tcp->maptable = e->get_attribute_value("maptable");
+ if (!tcp->maptable.empty()) {
+ if (useView) {
+ throw NotSupported("useview not supported with mapped columns");
+ }
+ tcp->mapcolumn = e->get_attribute_value("mapcolumn");
+ }
+ cols.insert(tcp);
+ }
+ BOOST_FOREACH(xmlpp::Node * psi, p->find("columns/column[@key='true']")) {
+ keys.insert(static_cast<xmlpp::Element *>(psi)->get_child_text()->get_content());
+ }
+ BOOST_FOREACH(xmlpp::Node * psi, p->find("sql")) {
+ sqls.push_back(static_cast<xmlpp::Element *>(psi)->get_child_text()->get_content());
+ }
+}
+
+SqlMergeTask::~SqlMergeTask()
+{
+ delete insCmd;
+}
+
+void
+SqlMergeTask::loadComplete(const CommonObjects * co)
+{
+ destdb = &co->dataSource<RdbmsDataSource>(dataSource())->getWritable();
+ insCmd = insertCommand();
+ BOOST_FOREACH(const Sources::value_type & i, sources) {
+ attach(i, insCmd);
+ }
+}
+
+SqlMergeTask::TargetColumn::TargetColumn(const Column & c) :
+ column(c),
+ mapcolumn(c)
+{
+}
+
+bool
+SqlMergeTask::TargetColumn::Sort::operator()(const TargetColumnPtr & a, const TargetColumnPtr & b) const
+{
+ return a->column < b->column;
+}
+
+void
+SqlMergeTask::execute() const
+{
+ createTempTable();
+ if (earlyKeys) {
+ createTempKey();
+ copyToTempTable();
+ }
+ else {
+ copyToTempTable();
+ createTempKey();
+ }
+ std::set<std::string> colNames;
+ BOOST_FOREACH(const TargetColumnPtr & c, cols) {
+ colNames.insert(c->column);
+ }
+ TablePatch tp(*destdb, dtablet, dtable, colNames);
+ BOOST_FOREACH(const Keys::value_type & k, keys) {
+ tp.addKey(k);
+ }
+ tp.patch(updateWhere(), patchOrder.c_str());
+ dropTempTable();
+}
+
+void
+SqlMergeTask::createTempTable() const
+{
+ if (useView) {
+ DB::ModifyCommand * cv = destdb->newModifyCommand(stringf(
+ "CREATE VIEW %s AS %s",
+ dtablet.c_str(),
+ boost::algorithm::join(sqls, " UNION ").c_str()));
+ cv->execute();
+ delete cv;
+ }
+ else {
+ DB::ModifyCommand * ctt = destdb->newModifyCommand(stringf(
+ "CREATE TEMPORARY TABLE %s AS SELECT * FROM %s WHERE 0=1",
+ dtablet.c_str(),
+ dtable.c_str()));
+ ctt->execute();
+ delete ctt;
+ BOOST_FOREACH(Columns::value_type c, cols) {
+ if (!c->maptable.empty()) {
+ DB::ModifyCommand * at = destdb->newModifyCommand(stringf(
+ "ALTER TABLE %s ADD COLUMN %s VARCHAR(1000)",
+ dtablet.c_str(),
+ c->mapcolumn.c_str()));
+ at->execute();
+ delete at;
+ }
+ }
+ }
+ tempTableCreated = true;
+}
+void
+SqlMergeTask::dropTempTable() const
+{
+ if (tempTableCreated) {
+ DB::ModifyCommand * d;
+ if (useView) {
+ d = destdb->newModifyCommand("DROP VIEW " + dtablet);
+ }
+ else {
+ d = destdb->newModifyCommand("DROP TABLE " + dtablet);
+ }
+ d->execute();
+ delete d;
+ }
+}
+void
+SqlMergeTask::createTempKey() const
+{
+ if (useView) return;
+ /* Primary key */
+ Buffer idx;
+ idx.appendf("ALTER TABLE %s ADD CONSTRAINT pk_%s PRIMARY KEY(%s)",
+ dtablet.c_str(), dtablet.c_str(),
+ boost::algorithm::join(keys, ", ").c_str());
+ DB::ModifyCommand * at = destdb->newModifyCommand(idx);
+ at->execute();
+ delete at;
+ /* Indexes */
+ int n = 0;
+ BOOST_FOREACH(const Keys::value_type & i, indexes) {
+ DB::ModifyCommand * ci = destdb->newModifyCommand(stringf(
+ "CREATE INDEX idx_%s_%d ON %s(%s)",
+ dtablet.c_str(), n, dtablet.c_str(), i.c_str()));
+ ci->execute();
+ delete ci;
+ n += 1;
+ }
+}
+DB::ModifyCommand *
+SqlMergeTask::insertCommand() const
+{
+ Buffer ins;
+ ins.appendf("INSERT INTO %s(", dtablet.c_str());
+ foreach(Columns::const_iterator, cols, c) {
+ if (c != cols.begin()) {
+ ins.append(", ");
+ }
+ ins.append((*c)->mapcolumn);
+ }
+ ins.append(") VALUES (");
+ foreach(Columns::const_iterator, cols, c) {
+ if (c == cols.begin()) {
+ ins.append("?");
+ }
+ else {
+ ins.append(", ?");
+ }
+ }
+ ins.append(")");
+ return destdb->newModifyCommand(ins);
+}
+
+class Populate : public NoOutputExecute {
+ public:
+ Populate(DB::ModifyCommand * c) :
+ SourceObject(__FUNCTION__),
+ NoOutputExecute(__FUNCTION__),
+ cmd(c)
+ {
+ }
+ virtual void loadComplete(const CommonObjects *)
+ {
+ }
+ void execute() const
+ {
+ unsigned int idx = 0;
+ RowState::Stack().back()->foreachColumn(boost::bind(&Populate::bind, this, boost::ref(idx), _3));
+ cmd->execute();
+ }
+ private:
+ void bind(unsigned int & idx, const VariableType & value) const
+ {
+ boost::apply_visitor<const SqlVariableBinder, const VariableType>(SqlVariableBinder(cmd, idx++), value);
+ }
+ DB::ModifyCommand * cmd;
+};
+typedef boost::intrusive_ptr<Populate> PopulatePtr;
+
+void
+attach(SqlMergeInsertPtr i, DB::ModifyCommand * insert)
+{
+ if (i) {
+ i->insert = insert;
+ }
+}
+
+static void
+attach(boost::intrusive_ptr<IHaveSubTasks> i, DB::ModifyCommand * insert)
+{
+ if (!i) {
+ return;
+ }
+ if (i->normal.empty()) {
+ i->normal.push_back(new Populate(insert));
+ }
+ else {
+ BOOST_FOREACH(const IHaveSubTasks::Tasks::value_type & n, i->normal) {
+ attach(boost::dynamic_pointer_cast<IHaveSubTasks>(n), insert);
+ attach(boost::dynamic_pointer_cast<SqlMergeInsert>(n), insert);
+ }
+ }
+}
+
+void
+SqlMergeTask::copyToTempTable() const
+{
+ if (useView) return;
+ BOOST_FOREACH(const Sources::value_type & i, sources) {
+ i->execute();
+ }
+ BOOST_FOREACH(const std::string & sql, sqls) {
+ Buffer ins;
+ ins.appendf("INSERT INTO %s(", dtablet.c_str());
+ foreach(Columns::const_iterator, cols, c) {
+ if (c != cols.begin()) {
+ ins.append(", ");
+ }
+ ins.append((*c)->column);
+ }
+ ins.append(") SELECT ");
+ foreach(Columns::const_iterator, cols, c) {
+ if (c != cols.begin()) {
+ ins.append(", ");
+ }
+ ins.append((*c)->column);
+ }
+ ins.appendf(" FROM (%s) tmp_src", sql.c_str());
+ DB::ModifyCommand * cttt = destdb->newModifyCommand(ins);
+ cttt->execute();
+ delete cttt;
+ }
+ BOOST_FOREACH(Columns::value_type c, cols) {
+ if (!c->maptable.empty()) {
+ DB::ModifyCommand * utt = destdb->newModifyCommand(
+ stringf(
+ "UPDATE %s d SET %s = (SELECT m.%s FROM %s m WHERE m.%s = d.%s) WHERE %s IS NULL",
+ dtablet.c_str(),
+ c->column.c_str(),
+ c->column.c_str(),
+ c->maptable.c_str(),
+ c->mapcolumn.c_str(),
+ c->mapcolumn.c_str(),
+ c->column.c_str()));
+ utt->execute();
+ delete utt;
+ }
+ }
+}
+
diff --git a/project2/sql/sqlMergeTask.h b/project2/sql/sqlMergeTask.h
new file mode 100644
index 0000000..c9e206c
--- /dev/null
+++ b/project2/sql/sqlMergeTask.h
@@ -0,0 +1,79 @@
+#ifndef SQLMERGETASK_H
+#define SQLMERGETASK_H
+
+#include <connection.h>
+#include <modifycommand.h>
+#include <selectcommand.h>
+#include <buffer.h>
+#include "tablepatch.h"
+#include "task.h"
+#include "iterate.h"
+#include "variables.h"
+#include <string>
+#include <set>
+#include <map>
+#include <list>
+
+/// Project2 component merge arbitrary data into an RDBMS table
+class SqlMergeTask : public Task {
+ public:
+ typedef std::string Table;
+ typedef std::string Column;
+ class TargetColumn;
+ typedef boost::intrusive_ptr<TargetColumn> TargetColumnPtr;
+ class TargetColumn : public virtual IntrusivePtrBase {
+ public:
+ class Sort {
+ public:
+ bool operator()(const TargetColumnPtr & a, const TargetColumnPtr & b) const;
+ };
+ TargetColumn(const Column &);
+
+ Column column;
+ Column mapcolumn;
+ Table maptable;
+
+ };
+ typedef std::set<TargetColumnPtr, TargetColumn::Sort> Columns;
+
+ typedef std::set<Column> Keys;
+
+ SqlMergeTask(const xmlpp::Element * p);
+ virtual ~SqlMergeTask();
+
+ virtual void loadComplete(const CommonObjects *);
+ void execute() const;
+ Columns cols;
+ Keys keys;
+ Keys indexes;
+ const Variable updateWhere;
+ const std::string patchOrder;
+ const bool earlyKeys;
+ const bool useView;
+
+ private:
+ virtual void copyToTempTable() const;
+ void createTempTable() const;
+ void dropTempTable() const;
+ void createTempKey() const;
+
+ mutable bool tempTableCreated;
+ typedef ANONSTORAGEOF(Iterate) Sources;
+ Sources sources;
+ std::list<std::string> sqls;
+ protected:
+ DB::ModifyCommand * insertCommand() const;
+ DB::ModifyCommand * insCmd;
+
+ public:
+ const DB::Connection * destdb;
+ const Variable dataSource;
+ const Table dtable;
+ const Table dtablet;
+
+ static unsigned int defaultVerbosity;
+ static bool defaultUseTempTable;
+};
+
+#endif
+
diff --git a/project2/sql/sqlRows.cpp b/project2/sql/sqlRows.cpp
new file mode 100644
index 0000000..6e506d7
--- /dev/null
+++ b/project2/sql/sqlRows.cpp
@@ -0,0 +1,69 @@
+#include "sqlRows.h"
+#include "sqlHandleAsVariableType.h"
+#include "rowProcessor.h"
+#include "xml.h"
+#include "selectcommand.h"
+#include "rdbmsDataSource.h"
+#include "column.h"
+#include <string.h>
+#include "xmlObjectLoader.h"
+#include "commonObjects.h"
+#include <boost/date_time/gregorian/gregorian_types.hpp>
+#include <boost/foreach.hpp>
+
+DECLARE_LOADER("sqlrows", SqlRows);
+
+SqlRows::SqlRows(const xmlpp::Element * p) :
+ RowSet(p),
+ dataSource(p, "datasource"),
+ sqlCommand(dynamic_cast<xmlpp::Element *>(p->get_children("sql").front())),
+ db(NULL)
+{
+}
+
+SqlRows::~SqlRows()
+{
+}
+
+void
+SqlRows::loadComplete(const CommonObjects * co)
+{
+ db = co->dataSource<RdbmsDataSource>(dataSource());
+}
+
+SqlRows::SqlState::SqlState(SelectPtr s) :
+ query(s)
+{
+}
+
+const Columns &
+SqlRows::SqlState::getColumns() const
+{
+ if (columns.empty()) {
+ for (unsigned int c = 0; c < query->columnCount(); c++) {
+ columns.insert(new Column(c, (*query)[c].name));
+ }
+ }
+ return columns;
+}
+
+void
+SqlRows::execute(const Glib::ustring & filter, const RowProcessor * rp) const
+{
+ unsigned int offset = 0;
+ SqlState ss(SelectPtr(db->getReadonly().newSelectCommand(sqlCommand.getSqlFor(filter))));
+ sqlCommand.bindParams(ss.query.get(), offset);
+ while (ss.query->fetch()) {
+ HandleAsVariableType h;
+ if (ss.fields.empty()) {
+ ss.fields.resize(ss.query->columnCount());
+ }
+ for (unsigned int c = 0; c < ss.query->columnCount(); c++) {
+ const DB::Column & col = (*ss.query)[c];
+ col.apply(h);
+ ss.fields[c] = h.variable;
+ }
+ ss.process(rp);
+ }
+}
+
diff --git a/project2/sql/sqlRows.h b/project2/sql/sqlRows.h
new file mode 100644
index 0000000..5614fa1
--- /dev/null
+++ b/project2/sql/sqlRows.h
@@ -0,0 +1,40 @@
+#ifndef SQLROWS_H
+#define SQLROWS_H
+
+#include <libxml++/nodes/element.h>
+#include <boost/intrusive_ptr.hpp>
+#include <map>
+#include "selectcommand.h"
+#include "iHaveParameters.h"
+#include "rowSet.h"
+#include "sqlWriter.h"
+
+class RdbmsDataSource;
+
+/// Project2 component to create a row set based on an SQL SELECT statement issued against an RDBMS data source
+class SqlRows : public RowSet {
+ public:
+ SqlRows(const xmlpp::Element * p);
+ ~SqlRows();
+
+ void execute(const Glib::ustring &, const RowProcessor *) const;
+ virtual void loadComplete(const CommonObjects *);
+
+ const Variable dataSource;
+
+ private:
+ const DynamicSql::SqlCommand sqlCommand;
+ typedef boost::shared_ptr<DB::SelectCommand> SelectPtr;
+ class SqlState : public RowState {
+ public:
+ SqlState(SelectPtr query);
+ const Columns & getColumns() const;
+ SelectPtr query;
+ mutable Columns columns;
+ friend class SqlRows;
+ };
+ const RdbmsDataSource * db;
+};
+
+#endif
+
diff --git a/project2/sql/sqlTask.cpp b/project2/sql/sqlTask.cpp
new file mode 100644
index 0000000..3a1d334
--- /dev/null
+++ b/project2/sql/sqlTask.cpp
@@ -0,0 +1,70 @@
+#include "sqlTask.h"
+#include <boost/foreach.hpp>
+#include "xmlObjectLoader.h"
+#include "modifycommand.h"
+#include "rdbmsDataSource.h"
+#include "commonObjects.h"
+#include "sqlVariableBinder.h"
+
+DECLARE_LOADER("sqltask", SqlTask);
+StaticMessageException(RunOnNotSpecified, "runon attribute must be specified");
+
+class SqlIfChangesStorer : public StorerBase<NoOutputExecute, ANONORDEREDSTORAGEOF(NoOutputExecute)> {
+ public:
+ SqlIfChangesStorer(Map c, Map nc) :
+ changes(c),
+ noChanges(nc)
+ {
+ }
+ bool insert(const xmlpp::Element * p, NoOutputExecutePtr O) {
+ xmlpp::Attribute * runon = p->get_attribute("runon");
+ if (!runon) {
+ throw RunOnNotSpecified();
+ }
+ ((runon->get_value() == "changes") ? changes : noChanges)->push_back(O);
+ return true;
+ }
+ Map changes, noChanges;
+};
+
+SqlTask::SqlTask(const xmlpp::Element * p) :
+ SourceObject(p),
+ Task(p),
+ dataSource(p, "datasource"),
+ filter(p, "filter", false, ""),
+ sqlCommand(dynamic_cast<xmlpp::Element *>(p->get_children("sql").front()))
+{
+ LoaderBase loader(true);
+ loader.supportedStorers.insert(new SqlIfChangesStorer(&changesNOEs, &noChangesNOEs));
+ loader.collectAll(p, true, IgnoreUnsupported);
+}
+
+SqlTask::~SqlTask()
+{
+}
+
+void
+SqlTask::loadComplete(const CommonObjects * co)
+{
+ db = co->dataSource<RdbmsDataSource>(dataSource());
+}
+
+void
+SqlTask::execute() const
+{
+ boost::shared_ptr<DB::ModifyCommand> modify = boost::shared_ptr<DB::ModifyCommand>(
+ db->getWritable().newModifyCommand(sqlCommand.getSqlFor(filter())));
+ unsigned int offset = 0;
+ sqlCommand.bindParams(modify.get(), offset);
+ if (modify->execute() == 0) {
+ BOOST_FOREACH(const SubNOEs::value_type & sq, noChangesNOEs) {
+ sq->execute();
+ }
+ }
+ else {
+ BOOST_FOREACH(const SubNOEs::value_type & sq, changesNOEs) {
+ sq->execute();
+ }
+ }
+}
+
diff --git a/project2/sql/sqlTask.h b/project2/sql/sqlTask.h
new file mode 100644
index 0000000..384b000
--- /dev/null
+++ b/project2/sql/sqlTask.h
@@ -0,0 +1,36 @@
+#ifndef SQLTASK_H
+#define SQLTASK_H
+
+#include <libxml++/nodes/element.h>
+#include <boost/intrusive_ptr.hpp>
+#include <map>
+#include "task.h"
+#include "variables.h"
+#include "sqlWriter.h"
+
+namespace DB { class ModifyCommand; }
+class RdbmsDataSource;
+
+/// Project2 component to execute a modifying SQL statement against an RDBMS data source
+class SqlTask : public Task {
+ public:
+ SqlTask(const xmlpp::Element * p);
+ virtual ~SqlTask();
+ virtual void loadComplete(const CommonObjects *);
+ virtual void execute() const;
+
+ const Variable dataSource;
+ const Variable filter;
+
+ typedef ANONORDEREDSTORAGEOF(NoOutputExecute) SubNOEs;
+ SubNOEs changesNOEs;
+ SubNOEs noChangesNOEs;
+
+ protected:
+ const DynamicSql::SqlCommand sqlCommand;
+ const RdbmsDataSource * db;
+};
+
+#endif
+
+
diff --git a/project2/sql/sqlVariableBinder.cpp b/project2/sql/sqlVariableBinder.cpp
new file mode 100644
index 0000000..23c24f2
--- /dev/null
+++ b/project2/sql/sqlVariableBinder.cpp
@@ -0,0 +1,77 @@
+#include "sqlVariableBinder.h"
+#include "command.h"
+#include "variables.h"
+#include <boost/date_time/posix_time/conversion.hpp>
+
+SqlVariableBinder::SqlVariableBinder(DB::Command * c, unsigned int i) :
+ cmd(c),
+ idx(i)
+{
+}
+void
+SqlVariableBinder::operator()(const Null &) const
+{
+ cmd->bindNull(idx);
+}
+void
+SqlVariableBinder::operator()(const Glib::ustring & i) const
+{
+ cmd->bindParamS(idx, i);
+}
+void
+SqlVariableBinder::operator()(const long long unsigned int & i) const
+{
+ cmd->bindParamI(idx, i);
+}
+void
+SqlVariableBinder::operator()(const long unsigned int & i) const
+{
+ cmd->bindParamI(idx, i);
+}
+void
+SqlVariableBinder::operator()(const unsigned int & i) const
+{
+ cmd->bindParamI(idx, i);
+}
+void
+SqlVariableBinder::operator()(const short unsigned int & i) const
+{
+ cmd->bindParamI(idx, i);
+}
+void
+SqlVariableBinder::operator()(const long long int & i) const
+{
+ cmd->bindParamI(idx, i);
+}
+void
+SqlVariableBinder::operator()(const long int & i) const
+{
+ cmd->bindParamI(idx, i);
+}
+void
+SqlVariableBinder::operator()(const int & i) const
+{
+ cmd->bindParamI(idx, i);
+}
+void
+SqlVariableBinder::operator()(const short int & i) const
+{
+ cmd->bindParamI(idx, i);
+}
+void
+SqlVariableBinder::operator()(const double & i) const
+{
+ cmd->bindParamF(idx, i);
+}
+void
+SqlVariableBinder::operator()(const float & i) const
+{
+ cmd->bindParamF(idx, i);
+}
+void
+SqlVariableBinder::operator()(const boost::posix_time::ptime & i) const
+{
+ struct tm tm(boost::posix_time::to_tm(i));
+ cmd->bindParamT(idx, &tm);
+}
+
diff --git a/project2/sql/sqlVariableBinder.h b/project2/sql/sqlVariableBinder.h
new file mode 100644
index 0000000..df3879a
--- /dev/null
+++ b/project2/sql/sqlVariableBinder.h
@@ -0,0 +1,36 @@
+#ifndef SQLVARIABLEBINDER_H
+#define SQLVARIABLEBINDER_H
+
+#include <boost/variant.hpp>
+#include <boost/shared_ptr.hpp>
+#include <boost/date_time/posix_time/ptime.hpp>
+#include <glibmm/ustring.h>
+
+namespace DB {
+ class Command;
+}
+class Null;
+class SqlVariableBinder : public boost::static_visitor<> {
+ public:
+ SqlVariableBinder(DB::Command * c, unsigned int i);
+ void operator()(const Null & i) const;
+ void operator()(const Glib::ustring & i) const;
+ void operator()(const long long unsigned int & i) const;
+ void operator()(const long unsigned int & i) const;
+ void operator()(const unsigned int & i) const;
+ void operator()(const short unsigned int & i) const;
+ void operator()(const long long int & i) const;
+ void operator()(const long int & i) const;
+ void operator()(const int & i) const;
+ void operator()(const short int & i) const;
+ void operator()(const double & i) const;
+ void operator()(const float & i) const;
+ void operator()(const boost::posix_time::ptime & i) const;
+
+ private:
+ DB::Command * cmd;
+ unsigned int idx;
+};
+
+#endif
+
diff --git a/project2/sql/sqlWriter.cpp b/project2/sql/sqlWriter.cpp
new file mode 100644
index 0000000..88d9801
--- /dev/null
+++ b/project2/sql/sqlWriter.cpp
@@ -0,0 +1,131 @@
+#include "sqlWriter.h"
+#include <boost/foreach.hpp>
+#include "sqlVariableBinder.h"
+
+DynamicSql::SqlWriter::SqlWriter()
+{
+}
+
+DynamicSql::SqlWriter::~SqlWriter()
+{
+}
+
+DynamicSql::SqlCommand::SqlCommand(const xmlpp::Element * N)
+{
+ BOOST_FOREACH(xmlpp::Node * n, N->get_children()) {
+ const xmlpp::TextNode * t = dynamic_cast<const xmlpp::TextNode *>(n);
+ if (t) {
+ writers.push_back(new SqlText(t));
+ }
+ const xmlpp::Element * e = dynamic_cast<const xmlpp::Element *>(n);
+ if (e) {
+ if (e->get_name() == "filter") {
+ SqlFilterPtr f = new SqlFilter(e);
+ writers.push_back(f);
+ filters.insert(Filters::value_type(f->name, f));
+ }
+ else if (e->get_name() == "param") {
+ writers.push_back(new SqlParameter(e));
+ }
+ }
+ }
+}
+
+Glib::ustring
+DynamicSql::SqlCommand::getSqlFor(const Glib::ustring & f) const
+{
+ BOOST_FOREACH(const SqlCommand::Filters::value_type & filter, filters) {
+ filter.second->active = (filter.second->name == f);
+ }
+ Glib::ustring sql;
+ writeSql(sql);
+ return sql;
+}
+
+void
+DynamicSql::SqlCommand::writeSql(Glib::ustring & sql) const
+{
+ BOOST_FOREACH(const SqlWriterPtr & w, writers) {
+ w->writeSql(sql);
+ }
+}
+
+void
+DynamicSql::SqlCommand::bindParams(DB::Command * cmd, unsigned int & offset) const
+{
+ BOOST_FOREACH(const SqlWriterPtr & w, writers) {
+ w->bindParams(cmd, offset);
+ }
+}
+
+DynamicSql::SqlFilter::SqlFilter(const xmlpp::Element * N) :
+ name(N->get_attribute_value("name")),
+ active(false)
+{
+ BOOST_FOREACH(xmlpp::Node * n, N->get_children()) {
+ const xmlpp::TextNode * t = dynamic_cast<const xmlpp::TextNode *>(n);
+ if (t) {
+ writers.push_back(new SqlText(t));
+ }
+ const xmlpp::Element * e = dynamic_cast<const xmlpp::Element *>(n);
+ if (e) {
+ if (e->get_name() == "param") {
+ writers.push_back(new SqlParameter(e));
+ }
+ }
+ }
+}
+
+void
+DynamicSql::SqlFilter::writeSql(Glib::ustring & sql) const
+{
+ if (active) {
+ BOOST_FOREACH(const SqlWriterPtr & w, writers) {
+ w->writeSql(sql);
+ }
+ }
+}
+
+void
+DynamicSql::SqlFilter::bindParams(DB::Command * cmd, unsigned int & offset) const
+{
+ if (active) {
+ BOOST_FOREACH(const SqlWriterPtr & w, writers) {
+ w->bindParams(cmd, offset);
+ }
+ }
+}
+
+DynamicSql::SqlParameter::SqlParameter(const xmlpp::Element * n) :
+ Variable(n, boost::optional<Glib::ustring>("local"))
+{
+}
+
+void
+DynamicSql::SqlParameter::writeSql(Glib::ustring & sql) const
+{
+ sql.append("?");
+}
+
+void
+DynamicSql::SqlParameter::bindParams(DB::Command * cmd, unsigned int & offset) const
+{
+ boost::apply_visitor<const SqlVariableBinder, const VariableType>(SqlVariableBinder(cmd, offset++), (*this));
+}
+
+DynamicSql::SqlText::SqlText(const xmlpp::TextNode * n) :
+ text(n->get_content())
+{
+}
+
+void
+DynamicSql::SqlText::writeSql(Glib::ustring & sql) const
+{
+ sql.append(text);
+}
+
+void
+DynamicSql::SqlText::bindParams(DB::Command *, unsigned int &) const
+{
+}
+
diff --git a/project2/sql/sqlWriter.h b/project2/sql/sqlWriter.h
new file mode 100644
index 0000000..7693e19
--- /dev/null
+++ b/project2/sql/sqlWriter.h
@@ -0,0 +1,63 @@
+#ifndef SQLWRITER_H
+#define SQLWRITER_H
+
+#include <intrusivePtrBase.h>
+#include <libxml++/nodes/textnode.h>
+#include <command.h>
+#include <glibmm/ustring.h>
+#include <list>
+#include <map>
+#include "variables.h"
+
+namespace DynamicSql {
+ class SqlWriter;
+ typedef boost::intrusive_ptr<SqlWriter> SqlWriterPtr;
+ typedef std::list<SqlWriterPtr> Writers;
+ class SqlWriter : public IntrusivePtrBase {
+ public:
+ SqlWriter();
+ virtual ~SqlWriter();
+ virtual void writeSql(Glib::ustring & sql) const = 0;
+ virtual void bindParams(DB::Command *, unsigned int & offset) const = 0;
+ };
+ class SqlText : public SqlWriter {
+ public:
+ SqlText(const xmlpp::TextNode *);
+ virtual void writeSql(Glib::ustring & sql) const;
+ virtual void bindParams(DB::Command *, unsigned int & offset) const;
+
+ const Glib::ustring text;
+ };
+ class SqlParameter : public SqlWriter, Variable {
+ public:
+ SqlParameter(const xmlpp::Element *);
+ virtual void writeSql(Glib::ustring & sql) const;
+ virtual void bindParams(DB::Command *, unsigned int & offset) const;
+ };
+ class SqlFilter : public SqlWriter {
+ public:
+ SqlFilter(const xmlpp::Element *);
+ virtual void writeSql(Glib::ustring & sql) const;
+ virtual void bindParams(DB::Command *, unsigned int & offset) const;
+
+ const Glib::ustring name;
+ bool active;
+ private:
+ Writers writers;
+ };
+ typedef boost::intrusive_ptr<SqlFilter> SqlFilterPtr;
+ class SqlCommand : public SqlWriter {
+ public:
+ SqlCommand(const xmlpp::Element *);
+ virtual void writeSql(Glib::ustring & sql) const;
+ virtual void bindParams(DB::Command *, unsigned int & offset) const;
+ typedef std::multimap<Glib::ustring, SqlFilterPtr> Filters;
+ Glib::ustring getSqlFor(const Glib::ustring & f) const;
+ private:
+ Filters filters;
+ Writers writers;
+ };
+}
+
+#endif
+
diff --git a/project2/sql/tablepatch.cpp b/project2/sql/tablepatch.cpp
new file mode 100644
index 0000000..f87f60e
--- /dev/null
+++ b/project2/sql/tablepatch.cpp
@@ -0,0 +1,438 @@
+#include "tablepatch.h"
+#include <stdio.h>
+#include <misc.h>
+#include <selectcommand.h>
+#include <column.h>
+#include <buffer.h>
+#include <boost/algorithm/string/join.hpp>
+
+using namespace DB;
+
+TablePatch::TablePatch(const Connection & wdb, const TablePatch::Table & s, const TablePatch::Table & d,
+ const TablePatch::Columns & c) :
+ src(s),
+ dest(d),
+ cols(c),
+ db(wdb)
+{
+ if (!src.length()) {
+ throw PatchCheckFailure();
+ }
+ if (!dest.length()) {
+ throw PatchCheckFailure();
+ }
+ if (!db.inTx()) {
+ throw PatchCheckFailure();
+ }
+}
+
+void
+TablePatch::addKey(const TablePatch::Column & c)
+{
+ pk.insert(c);
+}
+
+void
+TablePatch::patch(const char * where, const char * order)
+{
+ if (pk.size() == 0) {
+ throw PatchCheckFailure();
+ }
+ doDeletes(where, order);
+ doUpdates(where, order);
+ doInserts(order);
+}
+
+void
+TablePatch::doDeletes(const char * where, const char * order)
+{
+ switch (db.bulkDeleteStyle()) {
+ case BulkDeleteUsingSubSelect:
+ {
+ // -----------------------------------------------------------------
+ // Build SQL to delete keys ----------------------------------------
+ // -----------------------------------------------------------------
+ Buffer toDelSql;
+ toDelSql.appendf("DELETE FROM %s WHERE (",
+ dest.c_str());
+ foreach (PKI, pk, pki) {
+ if (pki != pk.begin()) {
+ toDelSql.append(", ");
+ }
+ toDelSql.appendf("%s.%s",
+ dest.c_str(),
+ pki->c_str());
+ }
+ // -----------------------------------------------------------------
+ // Build SQL to select keys to delete ------------------------------
+ // -----------------------------------------------------------------
+ toDelSql.append(") IN (SELECT ");
+ foreach (PKI, pk, pki) {
+ if (pki != pk.begin()) {
+ toDelSql.append(", ");
+ }
+ toDelSql.appendf("a.%s",
+ pki->c_str());
+ }
+ toDelSql.appendf(" FROM %s a LEFT OUTER JOIN %s b ON ",
+ dest.c_str(), src.c_str());
+ foreach (PKI, pk, pki) {
+ if (pki != pk.begin()) {
+ toDelSql.append(" AND ");
+ }
+ toDelSql.appendf(" a.%s = b.%s",
+ pki->c_str(), pki->c_str());
+ }
+ foreach (PKI, pk, pki) {
+ if (pki == pk.begin()) {
+ toDelSql.append(" WHERE ");
+ }
+ else {
+ toDelSql.append(" AND ");
+ }
+ toDelSql.appendf(" b.%s IS NULL",
+ pki->c_str());
+ }
+ if (where && *where) {
+ toDelSql.appendf(" AND %s", where);
+ }
+ if (order && *order) {
+ toDelSql.appendf(" ORDER BY %s", order);
+ }
+ toDelSql.append(")");
+ ModifyCommand * del = db.newModifyCommand(toDelSql);
+ del->execute();
+ delete del;
+ break;
+ }
+ case BulkDeleteUsingUsingAlias:
+ case BulkDeleteUsingUsing:
+ {
+ Buffer toDelSql;
+ toDelSql.appendf("DELETE FROM %s USING %s a LEFT OUTER JOIN %s b ",
+ (db.bulkDeleteStyle() == BulkDeleteUsingUsingAlias ? "a" : dest.c_str()),
+ dest.c_str(), src.c_str());
+ foreach (PKI, pk, pki) {
+ if (pki != pk.begin()) {
+ toDelSql.append(" AND ");
+ }
+ else {
+ toDelSql.append(" ON ");
+ }
+ toDelSql.appendf(" a.%s = b.%s",
+ pki->c_str(), pki->c_str());
+ }
+ foreach (PKI, pk, pki) {
+ if (pki != pk.begin()) {
+ toDelSql.append(" AND ");
+ }
+ else {
+ toDelSql.append(" WHERE ");
+ }
+ toDelSql.appendf(" b.%s IS NULL",
+ pki->c_str());
+ }
+ if (where && *where) {
+ toDelSql.appendf(" AND %s", where);
+ }
+ if (order && *order) {
+ toDelSql.appendf(" ORDER BY %s", order);
+ }
+ ModifyCommand * del = db.newModifyCommand(toDelSql);
+ del->execute();
+ delete del;
+ break;
+ }
+ }
+}
+
+void
+TablePatch::doUpdates(const char * where, const char * order)
+{
+ if (cols.size() == pk.size()) {
+ // Can't "change" anything... it's all part of the key
+ return;
+ }
+ switch (db.bulkUpdateStyle()) {
+ case BulkUpdateByIteration:
+ {
+ // -----------------------------------------------------------------
+ // Build SQL for list of updates to perform ------------------------
+ // -----------------------------------------------------------------
+ Buffer toUpdSel;
+ toUpdSel.append("SELECT ");
+ foreach (Columns::const_iterator, cols, col) {
+ if (pk.find(*col) == pk.end()) {
+ toUpdSel.appendf("b.%s, ",
+ col->c_str());
+ }
+ }
+ foreach (Columns::const_iterator, cols, col) {
+ if (pk.find(*col) != pk.end()) {
+ toUpdSel.appendf("b.%s, ",
+ col->c_str());
+ }
+ }
+ toUpdSel.appendf("0 FROM %s a, %s b",
+ dest.c_str(), src.c_str());
+ foreach (PKI, pk, pki) {
+ if (pki == pk.begin()) {
+ toUpdSel.append(" WHERE ");
+ }
+ else {
+ toUpdSel.append(" AND ");
+ }
+ toUpdSel.appendf(" a.%s = b.%s",
+ pki->c_str(), pki->c_str());
+ }
+ if (where && *where) {
+ toUpdSel.appendf(" AND %s", where);
+ }
+ toUpdSel.append(" AND (");
+ bool first = true;
+ foreach (Columns::const_iterator, cols, col) {
+ if (pk.find(*col) == pk.end()) {
+ if (!first) {
+ toUpdSel.append(" OR ");
+ }
+ first = false;
+ toUpdSel.appendf(
+ " (((CASE WHEN (a.%s IS NULL AND b.%s IS NULL) THEN 1 ELSE 0 END) \
+ + (CASE WHEN(a.%s = b.%s) THEN 1 ELSE 0 END)) = 0)",
+ col->c_str(), col->c_str(), col->c_str(), col->c_str());
+ }
+ }
+ toUpdSel.append(")");
+ if (order && *order) {
+ toUpdSel.appendf(" ORDER BY %s", order);
+ }
+ // -----------------------------------------------------------------
+ // Build SQL to perform updates ------------------------------------
+ // -----------------------------------------------------------------
+ Buffer updSql;
+ updSql.appendf("UPDATE %s SET ",
+ dest.c_str());
+ first = true;
+ foreach (Columns::const_iterator, cols, col) {
+ if (pk.find(*col) == pk.end()) {
+ if (!first) {
+ updSql.append(", ");
+ }
+ first = false;
+ updSql.appendf(" %s = ?",
+ col->c_str());
+ }
+ }
+ foreach (PKI, pk, pki) {
+ if (pki == pk.begin()) {
+ updSql.append(" WHERE ");
+ }
+ else {
+ updSql.append(" AND ");
+ }
+ updSql.appendf(" %s = ?",
+ pki->c_str());
+ }
+ // -----------------------------------------------------------------
+ // Iterator over update list make changes --------------------------
+ // -----------------------------------------------------------------
+ SelectCommand * toUpd = db.newSelectCommand(toUpdSel);
+ ModifyCommand * upd = db.newModifyCommand(updSql);
+ int cs = cols.size();
+ toUpd->execute();
+ for (int c = 0; c < cs; c += 1) {
+ (*toUpd)[c].rebind(upd, c);
+ }
+ while (toUpd->fetch()) {
+ upd->execute(false);
+ }
+ delete toUpd;
+ delete upd;
+ }
+ break;
+ case BulkUpdateUsingFromSrc:
+ {
+ // -----------------------------------------------------------------
+ // Build SQL for list of updates to perform ------------------------
+ // -----------------------------------------------------------------
+ Buffer updSql;
+ updSql.appendf("UPDATE %s a SET ",
+ dest.c_str());
+ bool first = true;
+ foreach (Columns::const_iterator, cols, col) {
+ if (pk.find(*col) == pk.end()) {
+ if (!first) {
+ updSql.append(", ");
+ }
+ first = false;
+ updSql.appendf(" %s = b.%s ",
+ col->c_str(), col->c_str());
+ }
+ }
+ updSql.appendf(" FROM %s b ",
+ src.c_str());
+ foreach (PKI, pk, pki) {
+ if (pki == pk.begin()) {
+ updSql.append(" WHERE ");
+ }
+ else {
+ updSql.append(" AND ");
+ }
+ updSql.appendf(" a.%s = b.%s ",
+ pki->c_str(), pki->c_str());
+ }
+ updSql.append(" AND (");
+ first = true;
+ foreach (Columns::const_iterator, cols, col) {
+ if (pk.find(*col) == pk.end()) {
+ if (!first) {
+ updSql.append(" OR ");
+ }
+ first = false;
+ updSql.appendf(
+ " (((CASE WHEN (a.%s IS NULL AND b.%s IS NULL) THEN 1 ELSE 0 END) \
+ + (CASE WHEN(a.%s = b.%s) THEN 1 ELSE 0 END)) = 0)",
+ col->c_str(), col->c_str(),
+ col->c_str(), col->c_str());
+ }
+ }
+ updSql.append(")");
+ if (where && *where) {
+ updSql.appendf(" AND %s ", where);
+ }
+ if (order && *order) {
+ updSql.appendf(" ORDER BY %s", order);
+ }
+ // -----------------------------------------------------------------
+ // Execute the bulk update command ---------------------------------
+ // -----------------------------------------------------------------
+ ModifyCommand * upd = db.newModifyCommand(updSql);
+ upd->execute(true);
+ delete upd;
+ break;
+ }
+ case BulkUpdateUsingJoin:
+ {
+ // -----------------------------------------------------------------
+ // Build SQL for list of updates to perform ------------------------
+ // -----------------------------------------------------------------
+ Buffer updSql;
+ updSql.appendf("UPDATE %s a, %s b SET ",
+ dest.c_str(), src.c_str());
+ bool first = true;
+ foreach (Columns::const_iterator, cols, col) {
+ if (pk.find(*col) == pk.end()) {
+ if (!first) {
+ updSql.append(", ");
+ }
+ first = false;
+ updSql.appendf(" a.%s = b.%s ",
+ col->c_str(), col->c_str());
+ }
+ }
+ foreach (PKI, pk, pki) {
+ if (pki == pk.begin()) {
+ updSql.append(" WHERE ");
+ }
+ else {
+ updSql.append(" AND ");
+ }
+ updSql.appendf(" a.%s = b.%s ",
+ pki->c_str(), pki->c_str());
+ }
+ updSql.append(" AND (");
+ first = true;
+ foreach (Columns::const_iterator, cols, col) {
+ if (pk.find(*col) == pk.end()) {
+ if (!first) {
+ updSql.append(" OR ");
+ }
+ first = false;
+ updSql.appendf(
+ " (((CASE WHEN (a.%s IS NULL AND b.%s IS NULL) THEN 1 ELSE 0 END) \
+ + (CASE WHEN(a.%s = b.%s) THEN 1 ELSE 0 END)) = 0)",
+ col->c_str(), col->c_str(),
+ col->c_str(), col->c_str());
+ }
+ }
+ updSql.append(")");
+ if (where && *where) {
+ updSql.appendf(" AND %s ", where);
+ }
+ if (order && *order) {
+ updSql.appendf(" ORDER BY %s", order);
+ }
+ // -----------------------------------------------------------------
+ // Execute the bulk update command ---------------------------------
+ // -----------------------------------------------------------------
+ ModifyCommand * upd = db.newModifyCommand(updSql);
+ upd->execute(true);
+ delete upd;
+ break;
+ }
+ }
+}
+
+void
+TablePatch::doInserts(const char * order)
+{
+ // -----------------------------------------------------------------
+ // Build SQL for copying new records -------------------------------
+ // -----------------------------------------------------------------
+ Buffer toInsSql;
+ toInsSql.appendf("INSERT INTO %s",
+ dest.c_str());
+ foreach (Columns::const_iterator, cols, col) {
+ if (col == cols.begin()) {
+ toInsSql.append("(");
+ }
+ else {
+ toInsSql.append(", ");
+ }
+ toInsSql.appendf("%s",
+ col->c_str());
+ }
+ toInsSql.append(") SELECT ");
+ foreach (Columns::const_iterator, cols, col) {
+ if (col != cols.begin()) {
+ toInsSql.append(", ");
+ }
+ toInsSql.appendf("b.%s",
+ col->c_str());
+ }
+ toInsSql.appendf(" FROM %s b LEFT OUTER JOIN %s a",
+ src.c_str(), dest.c_str());
+ foreach (PKI, pk, pki) {
+ if (pki == pk.begin()) {
+ toInsSql.append(" ON ");
+ }
+ else {
+ toInsSql.append(" AND ");
+ }
+ toInsSql.appendf(" a.%s = b.%s",
+ pki->c_str(), pki->c_str());
+ }
+ foreach (PKI, pk, pki) {
+ if (pki == pk.begin()) {
+ toInsSql.append(" WHERE ");
+ }
+ else {
+ toInsSql.append(" AND ");
+ }
+ toInsSql.appendf(" a.%s IS NULL",
+ pki->c_str());
+ }
+ if (order && *order) {
+ toInsSql.appendf(" ORDER BY %s", order);
+ }
+ ModifyCommand * ins = db.newModifyCommand(toInsSql);
+ ins->execute();
+ delete ins;
+}
+
+const char *
+TablePatch::PatchCheckFailure::what() const throw()
+{
+ return "Santiy checks failed: check table names and keys";
+}
+
diff --git a/project2/sql/tablepatch.h b/project2/sql/tablepatch.h
new file mode 100644
index 0000000..0174294
--- /dev/null
+++ b/project2/sql/tablepatch.h
@@ -0,0 +1,42 @@
+#ifndef TABLEPATCH_H
+#define TABLEPATCH_H
+
+#include <string>
+#include <set>
+#include <map>
+#include <connection.h>
+#include <modifycommand.h>
+#include <selectcommand.h>
+
+class TablePatch {
+ public:
+ typedef std::string Table;
+ typedef std::string Column;
+ typedef std::set<Column> Columns;
+ typedef Columns PrimaryKey;
+ typedef PrimaryKey::const_iterator PKI;
+
+ class PatchCheckFailure : public std::exception {
+ public:
+ const char * what() const throw();
+ };
+
+ TablePatch(const DB::Connection & db, const Table & src, const Table & dest, const Columns & cols);
+
+ void addKey(const Column & col);
+ void patch(const char * where, const char * order);
+
+ private:
+ void doDeletes(const char * where, const char * order);
+ void doUpdates(const char * where, const char * order);
+ void doInserts(const char * order);
+
+ Table src;
+ Table dest;
+ PrimaryKey pk;
+ Columns cols;
+ const DB::Connection &db;
+};
+
+#endif
+