diff options
Diffstat (limited to 'project2/sql')
-rw-r--r-- | project2/sql/Jamfile.jam | 38 | ||||
-rw-r--r-- | project2/sql/connectionLoader.h | 19 | ||||
-rw-r--r-- | project2/sql/rdbmsDataSource.cpp | 220 | ||||
-rw-r--r-- | project2/sql/rdbmsDataSource.h | 78 | ||||
-rw-r--r-- | project2/sql/sql-modODBC.cpp | 4 | ||||
-rw-r--r-- | project2/sql/sql-modPQ.cpp | 4 | ||||
-rw-r--r-- | project2/sql/sqlCache.cpp | 300 | ||||
-rw-r--r-- | project2/sql/sqlCheck.cpp | 100 | ||||
-rw-r--r-- | project2/sql/sqlCheck.h | 29 | ||||
-rw-r--r-- | project2/sql/sqlHandleAsVariableType.cpp | 19 | ||||
-rw-r--r-- | project2/sql/sqlHandleAsVariableType.h | 18 | ||||
-rw-r--r-- | project2/sql/sqlMergeTask.cpp | 329 | ||||
-rw-r--r-- | project2/sql/sqlMergeTask.h | 79 | ||||
-rw-r--r-- | project2/sql/sqlRows.cpp | 69 | ||||
-rw-r--r-- | project2/sql/sqlRows.h | 40 | ||||
-rw-r--r-- | project2/sql/sqlTask.cpp | 70 | ||||
-rw-r--r-- | project2/sql/sqlTask.h | 36 | ||||
-rw-r--r-- | project2/sql/sqlVariableBinder.cpp | 77 | ||||
-rw-r--r-- | project2/sql/sqlVariableBinder.h | 36 | ||||
-rw-r--r-- | project2/sql/sqlWriter.cpp | 131 | ||||
-rw-r--r-- | project2/sql/sqlWriter.h | 63 | ||||
-rw-r--r-- | project2/sql/tablepatch.cpp | 438 | ||||
-rw-r--r-- | project2/sql/tablepatch.h | 42 |
23 files changed, 2239 insertions, 0 deletions
diff --git a/project2/sql/Jamfile.jam b/project2/sql/Jamfile.jam new file mode 100644 index 0000000..b05fd34 --- /dev/null +++ b/project2/sql/Jamfile.jam @@ -0,0 +1,38 @@ +alias libxmlpp : : : : + <cflags>"`pkg-config --cflags libxml++-2.6`" + <linkflags>"`pkg-config --libs libxml++-2.6`" ; + +explicit object sql-modODBC ; +obj sql-modODBC : + sql-modODBC.cpp : + <library>../../libodbcpp//odbcpp + <library>libxmlpp + <include>../../libmisc + <include>../common + : : + <library>../../libodbcpp//odbcpp + ; + +explicit object sql-modPQ ; +obj sql-modPQ : + sql-modPQ.cpp : + <library>../../libpqpp//pqpp + <library>libxmlpp + <include>../../libmisc + <include>../common + : : + <library>../../libpqpp//pqpp + ; + +lib p2sql : + sqlCheck.cpp sqlWriter.cpp sqlTask.cpp sqlMergeTask.cpp sqlRows.cpp sqlCache.cpp sqlVariableBinder.cpp tablepatch.cpp rdbmsDataSource.cpp + sqlHandleAsVariableType.cpp + ../../libdbpp//dbpp + : + <odbc>yes:<library>sql-modODBC + <pq>yes:<library>sql-modPQ + <library>libxmlpp + <library>../common//p2common + <include>../../libmisc + ; + diff --git a/project2/sql/connectionLoader.h b/project2/sql/connectionLoader.h new file mode 100644 index 0000000..841b425 --- /dev/null +++ b/project2/sql/connectionLoader.h @@ -0,0 +1,19 @@ +#include "xmlObjectLoader.h" +#include "../libdbpp/connection.h" + +/// Base class to implement DB connection type modules +class ConnectionLoader : public ComponentLoader { + public: + virtual DB::Connection * connect(const std::string & dsn) const = 0; +}; + +/// Helper implemention for specific DB types +template <class DBType> +class ConnectionLoaderImpl : public ConnectionLoader { + public: + virtual DB::Connection * connect(const std::string & dsn) const + { + return new DBType(dsn); + } +}; + diff --git a/project2/sql/rdbmsDataSource.cpp b/project2/sql/rdbmsDataSource.cpp new file mode 100644 index 0000000..dac6cb1 --- /dev/null +++ b/project2/sql/rdbmsDataSource.cpp @@ -0,0 +1,220 @@ +#include "rdbmsDataSource.h" +#include "connectionLoader.h" +#include <libxml++/nodes/textnode.h> +#include <sys/utsname.h> +#include "logger.h" +#include <errno.h> +#include <boost/foreach.hpp> + +SimpleMessageException(UnknownConnectionProvider); + +/// Specialized ElementLoader for instances of RdbmsDataSource; handles persistent DB connections +class RdbmsDataSourceLoader : public ElementLoaderImpl<RdbmsDataSource> { + public: + void onIdle() + { + // Disconnect all cached database connections + RdbmsDataSource::dbhosts.clear(); + } + static bool isConnectionExpired(const RdbmsDataSource::DBHosts::value_type & con) + { + return con.second->isExpired(); + } + void onPeriodic() + { + // Disconnect expired database connections + RdbmsDataSource::DBHosts::iterator i; + while ((i = std::find_if(RdbmsDataSource::dbhosts.begin(), RdbmsDataSource::dbhosts.end(), isConnectionExpired)) != RdbmsDataSource::dbhosts.end()) { + RdbmsDataSource::dbhosts.erase(i); + } + } + void onIteration() + { + RdbmsDataSource::changedDSNs.clear(); + } +}; +DECLARE_CUSTOM_LOADER("rdbmsdatasource", RdbmsDataSourceLoader); + +RdbmsDataSource::DBHosts RdbmsDataSource::dbhosts; +RdbmsDataSource::FailedHosts RdbmsDataSource::failedhosts; +RdbmsDataSource::DSNSet RdbmsDataSource::changedDSNs; + +RdbmsDataSource::RdbmsDataSource(const xmlpp::Element * p) : + DataSource(p), + masterDsn(dynamic_cast<const xmlpp::Element *>(p->find("masterdsn").front())), + preferLocal(p->get_attribute_value("preferlocal") != "false") +{ + BOOST_FOREACH(const xmlpp::Node * node, p->find("readonly/dsn")) { + const xmlpp::Element * elem = dynamic_cast<const xmlpp::Element *>(node); + if (elem) { + roDSNs.insert(ReadonlyDSNs::value_type(elem->get_attribute_value("host"), elem)); + } + } +} + +RdbmsDataSource::~RdbmsDataSource() +{ +} + +void +RdbmsDataSource::loadComplete(const CommonObjects *) +{ +} + +const DB::Connection & +RdbmsDataSource::getWritable() const +{ + ConnectionPtr master = connectTo(masterDsn); + if (!master->txOpen) { + master->connection->beginTx(); + master->txOpen = true; + } + changedDSNs.insert(name); + return *master->connection; +} + +const DB::Connection & +RdbmsDataSource::getReadonly() const +{ + if (changedDSNs.find(name) != changedDSNs.end()) { + return *connectTo(masterDsn)->connection; + } + if (localhost.length() == 0 && preferLocal) { + struct utsname name; + if (uname(&name)) { + Logger()->messagef(LOG_WARNING, "%s: Unable to determine local host name (%d:%s)", + __PRETTY_FUNCTION__, errno, strerror(errno)); + localhost = "unknown"; + } + else { + localhost = name.nodename; + } + } + if (preferLocal) { + ReadonlyDSNs::const_iterator ro = roDSNs.find(localhost); + try { + if (ro == roDSNs.end()) { + Logger()->messagef(LOG_INFO, "%s: No database host matches local host name (%s) Will use master DSN", + __PRETTY_FUNCTION__, localhost.c_str()); + return *connectTo(masterDsn)->connection; + } + return *connectTo(ro->second)->connection; + } + catch (...) { + // Failed to connect to a preferred DB... carry on and try the others... + } + } + BOOST_FOREACH(ReadonlyDSNs::value_type db, roDSNs) { + try { + return *connectTo(db.second)->connection; + } + catch (...) { + } + } + return *connectTo(masterDsn)->connection; +} + +void +RdbmsDataSource::commit() +{ + DBHosts::const_iterator m = dbhosts.find(masterDsn); + if (m != dbhosts.end() && m->second->txOpen) { + m->second->connection->commitTx(); + m->second->txOpen = false; + } +} + +void +RdbmsDataSource::rollback() +{ + DBHosts::const_iterator m = dbhosts.find(masterDsn); + if (m != dbhosts.end() && m->second->txOpen) { + m->second->connection->rollbackTx(); + m->second->txOpen = false; + } + changedDSNs.erase(name); +} + +RdbmsDataSource::ConnectionPtr +RdbmsDataSource::connectTo(const ConnectionInfo & dsn) +{ + FailedHosts::iterator dbf = failedhosts.find(dsn); + if (dbf != failedhosts.end()) { + if (time(NULL) - 20 > dbf->second.FailureTime) { + failedhosts.erase(dbf); + } + else { + throw dbf->second; + } + } + DBHosts::const_iterator dbi = dbhosts.find(dsn); + if (dbi != dbhosts.end()) { + try { + dbi->second->connection->ping(); + dbi->second->touch(); + return dbi->second; + } + catch (...) { + // Connection in failed state + Logger()->messagef(LOG_DEBUG, "%s: Cached connection failed", __PRETTY_FUNCTION__); + } + } + + try { + ConnectionPtr db = ConnectionPtr(new RdbmsConnection(dsn.connect(), 300)); + dbhosts[dsn] = db; + db->touch(); + return db; + } + catch (const DB::ConnectionError & e) { + failedhosts.insert(FailedHosts::value_type(dsn, e)); + throw; + } +} + +RdbmsDataSource::RdbmsConnection::RdbmsConnection(const DB::Connection * con, time_t kat) : + connection(con), + txOpen(false), + lastUsedTime(0), + keepAliveTime(kat) +{ +} + +RdbmsDataSource::RdbmsConnection::~RdbmsConnection() +{ + connection->finish(); + delete connection; +} + +void +RdbmsDataSource::RdbmsConnection::touch() const +{ + time(&lastUsedTime); +} + +bool +RdbmsDataSource::RdbmsConnection::isExpired() const +{ + return (time(NULL) > lastUsedTime + keepAliveTime); +} + +RdbmsDataSource::ConnectionInfo::ConnectionInfo(const xmlpp::Element * n) +{ + BOOST_FOREACH(const xmlpp::Node * node, n->get_children()) { + typeId = LoaderBase::getLoader<ConnectionLoader, UnknownConnectionProvider>(node->get_name()); + dsn = dynamic_cast<const xmlpp::Element *>(node)->get_child_text()->get_content(); + } +} + +DB::Connection * +RdbmsDataSource::ConnectionInfo::connect() const +{ + return typeId->connect(dsn); +} + +bool +RdbmsDataSource::ConnectionInfo::operator<(const RdbmsDataSource::ConnectionInfo & other) const +{ + return ((typeId < other.typeId) || ((typeId == other.typeId) && (dsn < other.dsn))); +} + diff --git a/project2/sql/rdbmsDataSource.h b/project2/sql/rdbmsDataSource.h new file mode 100644 index 0000000..fdcfbe7 --- /dev/null +++ b/project2/sql/rdbmsDataSource.h @@ -0,0 +1,78 @@ +#ifndef RDBMSDATASOURCE_H +#define RDBMSDATASOURCE_H + +#include <libxml/tree.h> +#include <boost/shared_ptr.hpp> +#include <map> +#include <set> +#include "dataSource.h" +#include "../libdbpp/connection.h" +#include "../libdbpp/error.h" +#include "xmlObjectLoader.h" + +class ConnectionLoader; + +/// Project2 component to provide access to transactional RDBMS data sources +class RdbmsDataSource : public DataSource { + public: + class RdbmsConnection { + public: + RdbmsConnection(const DB::Connection * connection, time_t kat); + ~RdbmsConnection(); + + void touch() const; + bool isExpired() const; + const DB::Connection * const connection; + bool txOpen; + + private: + mutable time_t lastUsedTime; + const time_t keepAliveTime; + }; + + class ConnectionInfo { + public: + ConnectionInfo(const xmlpp::Element *); + + DB::Connection * connect() const; + + bool operator<(const ConnectionInfo & o) const; + + private: + std::string dsn; + boost::shared_ptr<ConnectionLoader> typeId; + }; + + typedef boost::shared_ptr<RdbmsConnection> ConnectionPtr; + typedef std::map<std::string, ConnectionInfo> ReadonlyDSNs; // Map hostname to DSN string + typedef std::map<ConnectionInfo, ConnectionPtr> DBHosts; // Map DSN strings to connections + typedef std::map<ConnectionInfo, const DB::ConnectionError> FailedHosts; // Map DSN strings to failures + + RdbmsDataSource(const xmlpp::Element * p); + ~RdbmsDataSource(); + + const DB::Connection & getReadonly() const; + const DB::Connection & getWritable() const; + virtual void loadComplete(const CommonObjects *); + virtual void commit(); + virtual void rollback(); + + const ConnectionInfo masterDsn; + const bool preferLocal; + + protected: + static ConnectionPtr connectTo(const ConnectionInfo & dsn); + ReadonlyDSNs roDSNs; + + private: + mutable std::string localhost; + static DBHosts dbhosts; + static FailedHosts failedhosts; + typedef std::set<std::string> DSNSet; + static DSNSet changedDSNs; + + friend class RdbmsDataSourceLoader; +}; + +#endif + diff --git a/project2/sql/sql-modODBC.cpp b/project2/sql/sql-modODBC.cpp new file mode 100644 index 0000000..1703cb6 --- /dev/null +++ b/project2/sql/sql-modODBC.cpp @@ -0,0 +1,4 @@ +#include "connectionLoader.h" +#include "../libodbcpp/connection.h" +typedef ODBC::Connection ODBCConnection; +DECLARE_COMPONENT_LOADER("odbc", ODBCConnection, ConnectionLoader) diff --git a/project2/sql/sql-modPQ.cpp b/project2/sql/sql-modPQ.cpp new file mode 100644 index 0000000..5991754 --- /dev/null +++ b/project2/sql/sql-modPQ.cpp @@ -0,0 +1,4 @@ +#include "connectionLoader.h" +#include "../libpqpp/connection.h" +typedef PQ::Connection PQConnection; +DECLARE_COMPONENT_LOADER("postgresql", PQConnection, ConnectionLoader) diff --git a/project2/sql/sqlCache.cpp b/project2/sql/sqlCache.cpp new file mode 100644 index 0000000..13bc23d --- /dev/null +++ b/project2/sql/sqlCache.cpp @@ -0,0 +1,300 @@ +#include "cache.h" +#include "sqlVariableBinder.h" +#include "sqlHandleAsVariableType.h" +#include "buffer.h" +#include "selectcommand.h" +#include "modifycommand.h" +#include "column.h" +#include "commonObjects.h" +#include "rdbmsDataSource.h" +#include "logger.h" +#include "xmlObjectLoader.h" +#include "iHaveParameters.h" +#include "rowSet.h" +#include <boost/foreach.hpp> +#include <boost/program_options.hpp> +#include <boost/algorithm/string/predicate.hpp> + +typedef boost::shared_ptr<DB::SelectCommand> SelectPtr; +typedef boost::shared_ptr<DB::ModifyCommand> ModifyPtr; + +class SqlCache : public Cache { + public: + SqlCache(const xmlpp::Element * p) : + Cache(p) + { + } + + void loadComplete(const CommonObjects * co) + { + db = co->dataSource<RdbmsDataSource>(DataSource); + } + + static void appendKeyCols(Buffer * sql, unsigned int * off, const Glib::ustring & col) + { + if ((*off)++) { + sql->append(", "); + } + sql->append(col.c_str()); + } + + static void appendKeyBinds(Buffer * sql, unsigned int * off) + { + if ((*off)++) { + sql->append(", "); + } + sql->append("?"); + } + + static void appendKeyAnds(Buffer * sql, const Glib::ustring & col) + { + sql->appendf(" AND h.%s = ?", col.c_str()); + } + + static void bindKeyValues(DB::Command * cmd, unsigned int * offset, const VariableType & v) + { + boost::apply_visitor<const SqlVariableBinder, const VariableType>(SqlVariableBinder(cmd, (*offset)++), v); + } + + class SqlCacheRowSet : public RowSet { + public: + SqlCacheRowSet(SelectPtr r) : + RowSet(NULL), + s(r) { + } + void loadComplete(const CommonObjects *) { + } + class SqlCacheRowState : public RowState { + public: + SqlCacheRowState(const SqlCacheRowSet * s) : + sc(s) { + } + const Columns & getColumns() const { + return columns; + } + RowAttribute resolveAttr(const Glib::ustring & attrName) const { + return boost::bind(&SqlCacheRowState::getAttrCol, this, "p2attr_" + attrName); + } + private: + VariableType getAttrCol(const Glib::ustring & col) const { + HandleAsVariableType h; + (*sc->s)[col].apply(h); + return h.variable; + } + mutable Columns columns; + friend class SqlCacheRowSet; + const SqlCacheRowSet * sc; + }; + void execute(const Glib::ustring&, const RowProcessor * rp) const { + SqlCacheRowState ss(this); + HandleAsVariableType h; + do { + if (!(*s)["p2_cacheid"].isNull()) { + if (colCols.empty()) { + (*s)["p2_cacheid"].apply(h); + cacheId = h.variable; + unsigned int colNo = 0; + for (unsigned int c = 0; c < s->columnCount(); c++) { + const DB::Column & col = (*s)[c]; + if (!boost::algorithm::starts_with(col.name, "p2attr_") && + !boost::algorithm::starts_with(col.name, "p2_")) { + ss.columns.insert(new Column(colNo++, col.name)); + colCols.push_back(c); + } + } + ss.fields.resize(colCols.size()); + } + else { + (*s)["p2_cacheid"].apply(h); + if (cacheId != (int64_t)h.variable) { + break; + } + } + BOOST_FOREACH(const unsigned int & c, colCols) { + const DB::Column & col = (*s)[c]; + col.apply(h); + ss.fields[&c - &colCols.front()] = h.variable; + } + ss.process(rp); + } + } while (s->fetch()); + } + private: + SelectPtr s; + mutable std::vector<unsigned int> colCols; + mutable int64_t cacheId; + }; + + RowSetCPtr getCachedRowSet(const Glib::ustring & n, const Glib::ustring & f, const IHaveParameters * ps) const + { + Buffer sql; + sql.appendf("SELECT r.* \ + FROM %s p, %s_%s_%s h LEFT OUTER JOIN %s_%s_%s_rows r \ + ON h.p2_cacheid = r.p2_cacheid \ + WHERE p.p2_time > ? \ + AND p.p2_cacheid = h.p2_cacheid", + HeaderTable.c_str(), + HeaderTable.c_str(), n.c_str(), f.c_str(), + HeaderTable.c_str(),n.c_str(), f.c_str()); + applyKeys(boost::bind(appendKeyAnds, &sql, _1), ps); + sql.appendf(" ORDER BY r.p2_cacheid DESC, r.p2_row"); + SelectPtr gh(db->getReadonly().newSelectCommand(sql)); + unsigned int offset = 0; + gh->bindParamT(offset++, time(NULL) - CacheLife); + applyKeys(boost::bind(bindKeyValues, gh.get(), &offset, _2), ps); + if (gh->fetch()) { + return new SqlCacheRowSet(gh); + } + return NULL; + } + + class SqlCachePresenter : public Presenter { + public: + SqlCachePresenter(const Glib::ustring & name, const Glib::ustring & filter, const RdbmsDataSource * d) : + depth(0), + row(1), + db(d), + n(name), + f(filter) { + } + void declareNamespace(const Glib::ustring &, const Glib::ustring &) const { } + void setNamespace(const Glib::ustring &, const Glib::ustring &) const { } + void pushSub(const Glib::ustring & name, const Glib::ustring &) const { + depth += 1; + if (depth == 2) { + col = name; + } + else if (depth == 1) { + } + } + void addAttr(const Glib::ustring & name, const Glib::ustring &, const VariableType & value) const { + attrs.insert(Values::value_type(name, value)); + } + void addText(const VariableType & value) const { + cols.insert(Values::value_type(col, value)); + } + void popSub() const { + if (depth == 2) { + col.clear(); + } + else if (depth == 1) { + Buffer sql; + sql.appendf("INSERT INTO %s_%s_%s_rows(p2_row", HeaderTable.c_str(), n.c_str(), f.c_str()); + BOOST_FOREACH(const Values::value_type & a, attrs) { + sql.appendf(", p2attr_%s", a.first.c_str()); + } + BOOST_FOREACH(const Values::value_type & v, cols) { + sql.appendf(", %s", v.first.c_str()); + } + sql.appendf(") VALUES(?"); + for (size_t x = attrs.size(); x > 0; x -= 1) { + sql.append(", ?"); + } + for (size_t x = cols.size(); x > 0; x -= 1) { + sql.append(", ?"); + } + sql.appendf(")"); + ModifyPtr m(db->getReadonly().newModifyCommand(sql)); + unsigned int offset = 0; + m->bindParamI(offset++, row++); + BOOST_FOREACH(const Values::value_type & a, attrs) { + boost::apply_visitor<const SqlVariableBinder, const VariableType>(SqlVariableBinder(m.get(), offset++), a.second); + } + BOOST_FOREACH(const Values::value_type & v, cols) { + boost::apply_visitor<const SqlVariableBinder, const VariableType>(SqlVariableBinder(m.get(), offset++), v.second); + } + m->execute(); + cols.clear(); + attrs.clear(); + } + depth -= 1; + } + private: + mutable unsigned int depth; + mutable unsigned int row; + const RdbmsDataSource * db; + mutable Glib::ustring col; + const Glib::ustring n, f; + typedef std::map<Glib::ustring, VariableType> Values; + mutable Values cols, attrs; + }; + + PresenterPtr openFor(const Glib::ustring & n, const Glib::ustring & f, const IHaveParameters * ps) + { + // Header + Buffer del; + del.appendf("INSERT INTO %s(p2_time) VALUES(?)", HeaderTable.c_str()); + ModifyPtr h = ModifyPtr(db->getReadonly().newModifyCommand(del)); + h->bindParamT(0, time(NULL)); + h->execute(); + // Record set header + Buffer sql; + sql.appendf("INSERT INTO %s_%s_%s(", HeaderTable.c_str(), n.c_str(), f.c_str()); + unsigned int offset = 0; + applyKeys(boost::bind(appendKeyCols, &sql, &offset, _1), ps); + sql.appendf(") VALUES("); + offset = 0; + applyKeys(boost::bind(appendKeyBinds, &sql, &offset), ps); + sql.appendf(")"); + ModifyPtr m(db->getReadonly().newModifyCommand(sql)); + offset = 0; + applyKeys(boost::bind(bindKeyValues, m.get(), &offset, _2), ps); + m->execute(); + return new SqlCachePresenter(n, f, db); + } + + void close(const Glib::ustring & , const Glib::ustring & , const IHaveParameters * ) + { + } + + private: + friend class CustomSqlCacheLoader; + const RdbmsDataSource * db; + static std::string DataSource; + static std::string HeaderTable; + static time_t CacheLife; +}; + +std::string SqlCache::DataSource; +std::string SqlCache::HeaderTable; +time_t SqlCache::CacheLife; + +namespace po = boost::program_options; +class CustomSqlCacheLoader : public ElementLoaderImpl<SqlCache> { + public: + CustomSqlCacheLoader() : + opts("SQL Cache options") + { + opts.add_options() + ("cache.sql.datasource", po::value(&SqlCache::DataSource), + "The default datasource to connect to") + ("cache.sql.headertable", po::value(&SqlCache::HeaderTable)->default_value("p2cache"), + "The filename to store the data in") + ("cache.sql.life", po::value(&SqlCache::CacheLife)->default_value(3600), + "The age of cache entries after which they are removed (seconds)") + ; + } + + po::options_description * options() + { + return &opts; + } + + void onIdle() + { + if (!SqlCache::DataSource.empty()) { + boost::intrusive_ptr<CommonObjects> co = new CommonObjects(); + const RdbmsDataSource * db = co->dataSource<RdbmsDataSource>(SqlCache::DataSource); + Buffer del; + del.appendf("DELETE FROM %s WHERE p2_time < ?", SqlCache::HeaderTable.c_str()); + ModifyPtr m(db->getReadonly().newModifyCommand(del)); + m->bindParamT(0, time(NULL) - SqlCache::CacheLife); + m->execute(); + } + } + + private: + po::options_description opts; +}; +DECLARE_CUSTOM_LOADER("sqlcache", CustomSqlCacheLoader); + diff --git a/project2/sql/sqlCheck.cpp b/project2/sql/sqlCheck.cpp new file mode 100644 index 0000000..d86eb3b --- /dev/null +++ b/project2/sql/sqlCheck.cpp @@ -0,0 +1,100 @@ +#include "sqlCheck.h" +#include "xmlObjectLoader.h" +#include "selectcommand.h" +#include "column.h" +#include "rdbmsDataSource.h" +#include "commonObjects.h" +#include "sqlVariableBinder.h" +#include <boost/foreach.hpp> +#include <boost/date_time/posix_time/posix_time.hpp> + +DECLARE_LOADER("sqlcheck", SqlCheck); + +class CantCompareNulls : public std::exception { }; + +SqlCheck::SqlCheck(const xmlpp::Element * p) : + ParamChecker(p), + dataSource(p, "datasource"), + filter(p, "filter", false, ""), + testOp(p, "testOp", false, "=="), + testValue(p, "testValue"), + sqlCommand(dynamic_cast<xmlpp::Element *>(p->get_children("sql").front())) +{ +} + +SqlCheck::~SqlCheck() +{ +} + +void +SqlCheck::loadComplete(const CommonObjects * co) +{ + db = co->dataSource<RdbmsDataSource>(dataSource()); +} + +class HandleDoCompare : public DB::HandleField { + public: + HandleDoCompare(const VariableType & tV, const std::string & tO) : + retVal(false), + testValue(tV), + testOp(tO) { + } + void null() { + throw CantCompareNulls(); + } + void string(const char *c , size_t l) { + doTest(Glib::ustring(c, c + l)); + } + void integer(int64_t val) { + doTest(val); + } + void floatingpoint(double val) { + doTest(val); + } + void timestamp(const struct tm & val) { + doTest(boost::posix_time::ptime_from_tm(val)); + } + bool operator()() const { + return retVal; + } + private: + template <typename TV> + void doTest(const TV & val) { + TV tv = testValue; + if ((testOp == "==" || testOp == "=") && val == tv) { + retVal = true; + } + else if (testOp == "<" && val < tv) { + retVal = true; + } + else if (testOp == ">" && val > tv) { + retVal = true; + } + else if (testOp == "!=" && val != tv) { + retVal = true; + } + else if ((testOp == "<=" || testOp == "=<") && val <= tv) { + retVal = true; + } + else if ((testOp == ">=" || testOp == "=>") && val >= tv) { + retVal = true; + } + } + bool retVal; + const VariableType & testValue; + std::string testOp; +}; +bool +SqlCheck::performCheck() const +{ + boost::shared_ptr<DB::SelectCommand> query = boost::shared_ptr<DB::SelectCommand>( + db->getWritable().newSelectCommand(sqlCommand.getSqlFor(filter()))); + unsigned int offset = 0; + sqlCommand.bindParams(query.get(), offset); + HandleDoCompare h(testValue, testOp()); + while (query->fetch()) { + (*query)[0].apply(h); + } + return h(); +} + diff --git a/project2/sql/sqlCheck.h b/project2/sql/sqlCheck.h new file mode 100644 index 0000000..c9933d7 --- /dev/null +++ b/project2/sql/sqlCheck.h @@ -0,0 +1,29 @@ +#ifndef SQLCHECK_H +#define SQLCHECK_H + +#include "paramChecker.h" +#include "sqlWriter.h" + +namespace DB { class SelectCommand; } +class RdbmsDataSource; + +/// Project2 component to check the value of a variable against an RDBMS data source +class SqlCheck : public ParamChecker { + public: + SqlCheck(const xmlpp::Element * p); + virtual ~SqlCheck(); + + virtual void loadComplete(const CommonObjects *); + bool performCheck() const; + + const Variable dataSource; + const Variable filter; + const Variable testOp; + const Variable testValue; + + private: + const DynamicSql::SqlCommand sqlCommand; + const RdbmsDataSource * db; +}; + +#endif diff --git a/project2/sql/sqlHandleAsVariableType.cpp b/project2/sql/sqlHandleAsVariableType.cpp new file mode 100644 index 0000000..f084a14 --- /dev/null +++ b/project2/sql/sqlHandleAsVariableType.cpp @@ -0,0 +1,19 @@ +#include "sqlHandleAsVariableType.h" +#include <boost/date_time/posix_time/posix_time.hpp> + +void HandleAsVariableType::null() { + variable = Null(); +} +void HandleAsVariableType::string(const char * c, size_t l) { + variable = Glib::ustring(c, c + l); +} +void HandleAsVariableType::integer(int64_t i) { + variable = i; +} +void HandleAsVariableType::floatingpoint(double d) { + variable = d; +} +void HandleAsVariableType::timestamp(const struct tm & t) { + variable = boost::posix_time::ptime(boost::posix_time::ptime_from_tm(t)); +} + diff --git a/project2/sql/sqlHandleAsVariableType.h b/project2/sql/sqlHandleAsVariableType.h new file mode 100644 index 0000000..c874b7c --- /dev/null +++ b/project2/sql/sqlHandleAsVariableType.h @@ -0,0 +1,18 @@ +#ifndef SQLHANDLEASVARIABLETYPE +#define SQLHANDLEASVARIABLETYPE + +#include "column.h" +#include "variables.h" + +class HandleAsVariableType : public DB::HandleField { + public: + void null(); + void string(const char * c, size_t l); + void integer(int64_t i); + void floatingpoint(double d); + void timestamp(const struct tm & t); + VariableType variable; +}; + +#endif + diff --git a/project2/sql/sqlMergeTask.cpp b/project2/sql/sqlMergeTask.cpp new file mode 100644 index 0000000..09657f8 --- /dev/null +++ b/project2/sql/sqlMergeTask.cpp @@ -0,0 +1,329 @@ +#include "sqlMergeTask.h" +#include "columns.h" +#include "commonObjects.h" +#include "rdbmsDataSource.h" +#include "exceptions.h" +#include "sqlVariableBinder.h" +#include "xmlObjectLoader.h" +#include <misc.h> +#include <stdio.h> +#include <stdexcept> +#include <boost/algorithm/string/join.hpp> +#include <boost/foreach.hpp> +#include <boost/bind.hpp> +#include <libxml++/nodes/textnode.h> + +bool SqlMergeTask::defaultUseTempTable = true; +static void attach(boost::intrusive_ptr<IHaveSubTasks> i, DB::ModifyCommand * insert); + +class SqlMergeInsert; +typedef boost::intrusive_ptr<SqlMergeInsert> SqlMergeInsertPtr; +/// Project2 component insert custom constructed records during an SQL Merge task +class SqlMergeInsert : IHaveParameters, public Task { + public: + SqlMergeInsert(const xmlpp::Element * p) : + SourceObject(p), + IHaveParameters(p), + Task(p) { + } + void loadComplete(const CommonObjects*) { + } + void execute() const { + unsigned int col = 0; + BOOST_FOREACH(const Parameters::value_type & v, parameters) { + boost::apply_visitor<const SqlVariableBinder, const VariableType>(SqlVariableBinder(insert, col++), v.second); + } + insert->execute(); + } + private: + friend void attach(SqlMergeInsertPtr i, DB::ModifyCommand * insert); + DB::ModifyCommand * insert; +}; + +DECLARE_LOADER("sqlmerge", SqlMergeTask); +DECLARE_LOADER("sqlmergeinsert", SqlMergeInsert); + +// Conversion logic +SqlMergeTask::SqlMergeTask(const xmlpp::Element * p) : + SourceObject(p), + Task(p), + updateWhere(p, "updatewhere", false), + patchOrder(p->get_attribute_value("patchorder")), + earlyKeys(p->get_attribute_value("earlykeys") == "yes"), + useView(p->get_attribute_value("useview") == "yes"), + tempTableCreated(false), + insCmd(NULL), + destdb(NULL), + dataSource(p, "datasource"), + dtable(p->get_attribute_value("targettable")), + dtablet(stringf("tmp_%s_%d", dtable.c_str(), getpid())) +{ + LoaderBase loader(true); + loader.supportedStorers.insert(Storer::into(&sources)); + loader.collectAll(p, true); + + if (!sources.empty() && useView) { + throw NotSupported("useview not supported with iterate fillers"); + } + + BOOST_FOREACH(xmlpp::Node * psi, p->find("columns/column")) { + xmlpp::Element * e = static_cast<xmlpp::Element *>(psi); + TargetColumnPtr tcp(new TargetColumn(e->get_child_text()->get_content())); + tcp->maptable = e->get_attribute_value("maptable"); + if (!tcp->maptable.empty()) { + if (useView) { + throw NotSupported("useview not supported with mapped columns"); + } + tcp->mapcolumn = e->get_attribute_value("mapcolumn"); + } + cols.insert(tcp); + } + BOOST_FOREACH(xmlpp::Node * psi, p->find("columns/column[@key='true']")) { + keys.insert(static_cast<xmlpp::Element *>(psi)->get_child_text()->get_content()); + } + BOOST_FOREACH(xmlpp::Node * psi, p->find("sql")) { + sqls.push_back(static_cast<xmlpp::Element *>(psi)->get_child_text()->get_content()); + } +} + +SqlMergeTask::~SqlMergeTask() +{ + delete insCmd; +} + +void +SqlMergeTask::loadComplete(const CommonObjects * co) +{ + destdb = &co->dataSource<RdbmsDataSource>(dataSource())->getWritable(); + insCmd = insertCommand(); + BOOST_FOREACH(const Sources::value_type & i, sources) { + attach(i, insCmd); + } +} + +SqlMergeTask::TargetColumn::TargetColumn(const Column & c) : + column(c), + mapcolumn(c) +{ +} + +bool +SqlMergeTask::TargetColumn::Sort::operator()(const TargetColumnPtr & a, const TargetColumnPtr & b) const +{ + return a->column < b->column; +} + +void +SqlMergeTask::execute() const +{ + createTempTable(); + if (earlyKeys) { + createTempKey(); + copyToTempTable(); + } + else { + copyToTempTable(); + createTempKey(); + } + std::set<std::string> colNames; + BOOST_FOREACH(const TargetColumnPtr & c, cols) { + colNames.insert(c->column); + } + TablePatch tp(*destdb, dtablet, dtable, colNames); + BOOST_FOREACH(const Keys::value_type & k, keys) { + tp.addKey(k); + } + tp.patch(updateWhere(), patchOrder.c_str()); + dropTempTable(); +} + +void +SqlMergeTask::createTempTable() const +{ + if (useView) { + DB::ModifyCommand * cv = destdb->newModifyCommand(stringf( + "CREATE VIEW %s AS %s", + dtablet.c_str(), + boost::algorithm::join(sqls, " UNION ").c_str())); + cv->execute(); + delete cv; + } + else { + DB::ModifyCommand * ctt = destdb->newModifyCommand(stringf( + "CREATE TEMPORARY TABLE %s AS SELECT * FROM %s WHERE 0=1", + dtablet.c_str(), + dtable.c_str())); + ctt->execute(); + delete ctt; + BOOST_FOREACH(Columns::value_type c, cols) { + if (!c->maptable.empty()) { + DB::ModifyCommand * at = destdb->newModifyCommand(stringf( + "ALTER TABLE %s ADD COLUMN %s VARCHAR(1000)", + dtablet.c_str(), + c->mapcolumn.c_str())); + at->execute(); + delete at; + } + } + } + tempTableCreated = true; +} +void +SqlMergeTask::dropTempTable() const +{ + if (tempTableCreated) { + DB::ModifyCommand * d; + if (useView) { + d = destdb->newModifyCommand("DROP VIEW " + dtablet); + } + else { + d = destdb->newModifyCommand("DROP TABLE " + dtablet); + } + d->execute(); + delete d; + } +} +void +SqlMergeTask::createTempKey() const +{ + if (useView) return; + /* Primary key */ + Buffer idx; + idx.appendf("ALTER TABLE %s ADD CONSTRAINT pk_%s PRIMARY KEY(%s)", + dtablet.c_str(), dtablet.c_str(), + boost::algorithm::join(keys, ", ").c_str()); + DB::ModifyCommand * at = destdb->newModifyCommand(idx); + at->execute(); + delete at; + /* Indexes */ + int n = 0; + BOOST_FOREACH(const Keys::value_type & i, indexes) { + DB::ModifyCommand * ci = destdb->newModifyCommand(stringf( + "CREATE INDEX idx_%s_%d ON %s(%s)", + dtablet.c_str(), n, dtablet.c_str(), i.c_str())); + ci->execute(); + delete ci; + n += 1; + } +} +DB::ModifyCommand * +SqlMergeTask::insertCommand() const +{ + Buffer ins; + ins.appendf("INSERT INTO %s(", dtablet.c_str()); + foreach(Columns::const_iterator, cols, c) { + if (c != cols.begin()) { + ins.append(", "); + } + ins.append((*c)->mapcolumn); + } + ins.append(") VALUES ("); + foreach(Columns::const_iterator, cols, c) { + if (c == cols.begin()) { + ins.append("?"); + } + else { + ins.append(", ?"); + } + } + ins.append(")"); + return destdb->newModifyCommand(ins); +} + +class Populate : public NoOutputExecute { + public: + Populate(DB::ModifyCommand * c) : + SourceObject(__FUNCTION__), + NoOutputExecute(__FUNCTION__), + cmd(c) + { + } + virtual void loadComplete(const CommonObjects *) + { + } + void execute() const + { + unsigned int idx = 0; + RowState::Stack().back()->foreachColumn(boost::bind(&Populate::bind, this, boost::ref(idx), _3)); + cmd->execute(); + } + private: + void bind(unsigned int & idx, const VariableType & value) const + { + boost::apply_visitor<const SqlVariableBinder, const VariableType>(SqlVariableBinder(cmd, idx++), value); + } + DB::ModifyCommand * cmd; +}; +typedef boost::intrusive_ptr<Populate> PopulatePtr; + +void +attach(SqlMergeInsertPtr i, DB::ModifyCommand * insert) +{ + if (i) { + i->insert = insert; + } +} + +static void +attach(boost::intrusive_ptr<IHaveSubTasks> i, DB::ModifyCommand * insert) +{ + if (!i) { + return; + } + if (i->normal.empty()) { + i->normal.push_back(new Populate(insert)); + } + else { + BOOST_FOREACH(const IHaveSubTasks::Tasks::value_type & n, i->normal) { + attach(boost::dynamic_pointer_cast<IHaveSubTasks>(n), insert); + attach(boost::dynamic_pointer_cast<SqlMergeInsert>(n), insert); + } + } +} + +void +SqlMergeTask::copyToTempTable() const +{ + if (useView) return; + BOOST_FOREACH(const Sources::value_type & i, sources) { + i->execute(); + } + BOOST_FOREACH(const std::string & sql, sqls) { + Buffer ins; + ins.appendf("INSERT INTO %s(", dtablet.c_str()); + foreach(Columns::const_iterator, cols, c) { + if (c != cols.begin()) { + ins.append(", "); + } + ins.append((*c)->column); + } + ins.append(") SELECT "); + foreach(Columns::const_iterator, cols, c) { + if (c != cols.begin()) { + ins.append(", "); + } + ins.append((*c)->column); + } + ins.appendf(" FROM (%s) tmp_src", sql.c_str()); + DB::ModifyCommand * cttt = destdb->newModifyCommand(ins); + cttt->execute(); + delete cttt; + } + BOOST_FOREACH(Columns::value_type c, cols) { + if (!c->maptable.empty()) { + DB::ModifyCommand * utt = destdb->newModifyCommand( + stringf( + "UPDATE %s d SET %s = (SELECT m.%s FROM %s m WHERE m.%s = d.%s) WHERE %s IS NULL", + dtablet.c_str(), + c->column.c_str(), + c->column.c_str(), + c->maptable.c_str(), + c->mapcolumn.c_str(), + c->mapcolumn.c_str(), + c->column.c_str())); + utt->execute(); + delete utt; + } + } +} + diff --git a/project2/sql/sqlMergeTask.h b/project2/sql/sqlMergeTask.h new file mode 100644 index 0000000..c9e206c --- /dev/null +++ b/project2/sql/sqlMergeTask.h @@ -0,0 +1,79 @@ +#ifndef SQLMERGETASK_H +#define SQLMERGETASK_H + +#include <connection.h> +#include <modifycommand.h> +#include <selectcommand.h> +#include <buffer.h> +#include "tablepatch.h" +#include "task.h" +#include "iterate.h" +#include "variables.h" +#include <string> +#include <set> +#include <map> +#include <list> + +/// Project2 component merge arbitrary data into an RDBMS table +class SqlMergeTask : public Task { + public: + typedef std::string Table; + typedef std::string Column; + class TargetColumn; + typedef boost::intrusive_ptr<TargetColumn> TargetColumnPtr; + class TargetColumn : public virtual IntrusivePtrBase { + public: + class Sort { + public: + bool operator()(const TargetColumnPtr & a, const TargetColumnPtr & b) const; + }; + TargetColumn(const Column &); + + Column column; + Column mapcolumn; + Table maptable; + + }; + typedef std::set<TargetColumnPtr, TargetColumn::Sort> Columns; + + typedef std::set<Column> Keys; + + SqlMergeTask(const xmlpp::Element * p); + virtual ~SqlMergeTask(); + + virtual void loadComplete(const CommonObjects *); + void execute() const; + Columns cols; + Keys keys; + Keys indexes; + const Variable updateWhere; + const std::string patchOrder; + const bool earlyKeys; + const bool useView; + + private: + virtual void copyToTempTable() const; + void createTempTable() const; + void dropTempTable() const; + void createTempKey() const; + + mutable bool tempTableCreated; + typedef ANONSTORAGEOF(Iterate) Sources; + Sources sources; + std::list<std::string> sqls; + protected: + DB::ModifyCommand * insertCommand() const; + DB::ModifyCommand * insCmd; + + public: + const DB::Connection * destdb; + const Variable dataSource; + const Table dtable; + const Table dtablet; + + static unsigned int defaultVerbosity; + static bool defaultUseTempTable; +}; + +#endif + diff --git a/project2/sql/sqlRows.cpp b/project2/sql/sqlRows.cpp new file mode 100644 index 0000000..6e506d7 --- /dev/null +++ b/project2/sql/sqlRows.cpp @@ -0,0 +1,69 @@ +#include "sqlRows.h" +#include "sqlHandleAsVariableType.h" +#include "rowProcessor.h" +#include "xml.h" +#include "selectcommand.h" +#include "rdbmsDataSource.h" +#include "column.h" +#include <string.h> +#include "xmlObjectLoader.h" +#include "commonObjects.h" +#include <boost/date_time/gregorian/gregorian_types.hpp> +#include <boost/foreach.hpp> + +DECLARE_LOADER("sqlrows", SqlRows); + +SqlRows::SqlRows(const xmlpp::Element * p) : + RowSet(p), + dataSource(p, "datasource"), + sqlCommand(dynamic_cast<xmlpp::Element *>(p->get_children("sql").front())), + db(NULL) +{ +} + +SqlRows::~SqlRows() +{ +} + +void +SqlRows::loadComplete(const CommonObjects * co) +{ + db = co->dataSource<RdbmsDataSource>(dataSource()); +} + +SqlRows::SqlState::SqlState(SelectPtr s) : + query(s) +{ +} + +const Columns & +SqlRows::SqlState::getColumns() const +{ + if (columns.empty()) { + for (unsigned int c = 0; c < query->columnCount(); c++) { + columns.insert(new Column(c, (*query)[c].name)); + } + } + return columns; +} + +void +SqlRows::execute(const Glib::ustring & filter, const RowProcessor * rp) const +{ + unsigned int offset = 0; + SqlState ss(SelectPtr(db->getReadonly().newSelectCommand(sqlCommand.getSqlFor(filter)))); + sqlCommand.bindParams(ss.query.get(), offset); + while (ss.query->fetch()) { + HandleAsVariableType h; + if (ss.fields.empty()) { + ss.fields.resize(ss.query->columnCount()); + } + for (unsigned int c = 0; c < ss.query->columnCount(); c++) { + const DB::Column & col = (*ss.query)[c]; + col.apply(h); + ss.fields[c] = h.variable; + } + ss.process(rp); + } +} + diff --git a/project2/sql/sqlRows.h b/project2/sql/sqlRows.h new file mode 100644 index 0000000..5614fa1 --- /dev/null +++ b/project2/sql/sqlRows.h @@ -0,0 +1,40 @@ +#ifndef SQLROWS_H +#define SQLROWS_H + +#include <libxml++/nodes/element.h> +#include <boost/intrusive_ptr.hpp> +#include <map> +#include "selectcommand.h" +#include "iHaveParameters.h" +#include "rowSet.h" +#include "sqlWriter.h" + +class RdbmsDataSource; + +/// Project2 component to create a row set based on an SQL SELECT statement issued against an RDBMS data source +class SqlRows : public RowSet { + public: + SqlRows(const xmlpp::Element * p); + ~SqlRows(); + + void execute(const Glib::ustring &, const RowProcessor *) const; + virtual void loadComplete(const CommonObjects *); + + const Variable dataSource; + + private: + const DynamicSql::SqlCommand sqlCommand; + typedef boost::shared_ptr<DB::SelectCommand> SelectPtr; + class SqlState : public RowState { + public: + SqlState(SelectPtr query); + const Columns & getColumns() const; + SelectPtr query; + mutable Columns columns; + friend class SqlRows; + }; + const RdbmsDataSource * db; +}; + +#endif + diff --git a/project2/sql/sqlTask.cpp b/project2/sql/sqlTask.cpp new file mode 100644 index 0000000..3a1d334 --- /dev/null +++ b/project2/sql/sqlTask.cpp @@ -0,0 +1,70 @@ +#include "sqlTask.h" +#include <boost/foreach.hpp> +#include "xmlObjectLoader.h" +#include "modifycommand.h" +#include "rdbmsDataSource.h" +#include "commonObjects.h" +#include "sqlVariableBinder.h" + +DECLARE_LOADER("sqltask", SqlTask); +StaticMessageException(RunOnNotSpecified, "runon attribute must be specified"); + +class SqlIfChangesStorer : public StorerBase<NoOutputExecute, ANONORDEREDSTORAGEOF(NoOutputExecute)> { + public: + SqlIfChangesStorer(Map c, Map nc) : + changes(c), + noChanges(nc) + { + } + bool insert(const xmlpp::Element * p, NoOutputExecutePtr O) { + xmlpp::Attribute * runon = p->get_attribute("runon"); + if (!runon) { + throw RunOnNotSpecified(); + } + ((runon->get_value() == "changes") ? changes : noChanges)->push_back(O); + return true; + } + Map changes, noChanges; +}; + +SqlTask::SqlTask(const xmlpp::Element * p) : + SourceObject(p), + Task(p), + dataSource(p, "datasource"), + filter(p, "filter", false, ""), + sqlCommand(dynamic_cast<xmlpp::Element *>(p->get_children("sql").front())) +{ + LoaderBase loader(true); + loader.supportedStorers.insert(new SqlIfChangesStorer(&changesNOEs, &noChangesNOEs)); + loader.collectAll(p, true, IgnoreUnsupported); +} + +SqlTask::~SqlTask() +{ +} + +void +SqlTask::loadComplete(const CommonObjects * co) +{ + db = co->dataSource<RdbmsDataSource>(dataSource()); +} + +void +SqlTask::execute() const +{ + boost::shared_ptr<DB::ModifyCommand> modify = boost::shared_ptr<DB::ModifyCommand>( + db->getWritable().newModifyCommand(sqlCommand.getSqlFor(filter()))); + unsigned int offset = 0; + sqlCommand.bindParams(modify.get(), offset); + if (modify->execute() == 0) { + BOOST_FOREACH(const SubNOEs::value_type & sq, noChangesNOEs) { + sq->execute(); + } + } + else { + BOOST_FOREACH(const SubNOEs::value_type & sq, changesNOEs) { + sq->execute(); + } + } +} + diff --git a/project2/sql/sqlTask.h b/project2/sql/sqlTask.h new file mode 100644 index 0000000..384b000 --- /dev/null +++ b/project2/sql/sqlTask.h @@ -0,0 +1,36 @@ +#ifndef SQLTASK_H +#define SQLTASK_H + +#include <libxml++/nodes/element.h> +#include <boost/intrusive_ptr.hpp> +#include <map> +#include "task.h" +#include "variables.h" +#include "sqlWriter.h" + +namespace DB { class ModifyCommand; } +class RdbmsDataSource; + +/// Project2 component to execute a modifying SQL statement against an RDBMS data source +class SqlTask : public Task { + public: + SqlTask(const xmlpp::Element * p); + virtual ~SqlTask(); + virtual void loadComplete(const CommonObjects *); + virtual void execute() const; + + const Variable dataSource; + const Variable filter; + + typedef ANONORDEREDSTORAGEOF(NoOutputExecute) SubNOEs; + SubNOEs changesNOEs; + SubNOEs noChangesNOEs; + + protected: + const DynamicSql::SqlCommand sqlCommand; + const RdbmsDataSource * db; +}; + +#endif + + diff --git a/project2/sql/sqlVariableBinder.cpp b/project2/sql/sqlVariableBinder.cpp new file mode 100644 index 0000000..23c24f2 --- /dev/null +++ b/project2/sql/sqlVariableBinder.cpp @@ -0,0 +1,77 @@ +#include "sqlVariableBinder.h" +#include "command.h" +#include "variables.h" +#include <boost/date_time/posix_time/conversion.hpp> + +SqlVariableBinder::SqlVariableBinder(DB::Command * c, unsigned int i) : + cmd(c), + idx(i) +{ +} +void +SqlVariableBinder::operator()(const Null &) const +{ + cmd->bindNull(idx); +} +void +SqlVariableBinder::operator()(const Glib::ustring & i) const +{ + cmd->bindParamS(idx, i); +} +void +SqlVariableBinder::operator()(const long long unsigned int & i) const +{ + cmd->bindParamI(idx, i); +} +void +SqlVariableBinder::operator()(const long unsigned int & i) const +{ + cmd->bindParamI(idx, i); +} +void +SqlVariableBinder::operator()(const unsigned int & i) const +{ + cmd->bindParamI(idx, i); +} +void +SqlVariableBinder::operator()(const short unsigned int & i) const +{ + cmd->bindParamI(idx, i); +} +void +SqlVariableBinder::operator()(const long long int & i) const +{ + cmd->bindParamI(idx, i); +} +void +SqlVariableBinder::operator()(const long int & i) const +{ + cmd->bindParamI(idx, i); +} +void +SqlVariableBinder::operator()(const int & i) const +{ + cmd->bindParamI(idx, i); +} +void +SqlVariableBinder::operator()(const short int & i) const +{ + cmd->bindParamI(idx, i); +} +void +SqlVariableBinder::operator()(const double & i) const +{ + cmd->bindParamF(idx, i); +} +void +SqlVariableBinder::operator()(const float & i) const +{ + cmd->bindParamF(idx, i); +} +void +SqlVariableBinder::operator()(const boost::posix_time::ptime & i) const +{ + struct tm tm(boost::posix_time::to_tm(i)); + cmd->bindParamT(idx, &tm); +} + diff --git a/project2/sql/sqlVariableBinder.h b/project2/sql/sqlVariableBinder.h new file mode 100644 index 0000000..df3879a --- /dev/null +++ b/project2/sql/sqlVariableBinder.h @@ -0,0 +1,36 @@ +#ifndef SQLVARIABLEBINDER_H +#define SQLVARIABLEBINDER_H + +#include <boost/variant.hpp> +#include <boost/shared_ptr.hpp> +#include <boost/date_time/posix_time/ptime.hpp> +#include <glibmm/ustring.h> + +namespace DB { + class Command; +} +class Null; +class SqlVariableBinder : public boost::static_visitor<> { + public: + SqlVariableBinder(DB::Command * c, unsigned int i); + void operator()(const Null & i) const; + void operator()(const Glib::ustring & i) const; + void operator()(const long long unsigned int & i) const; + void operator()(const long unsigned int & i) const; + void operator()(const unsigned int & i) const; + void operator()(const short unsigned int & i) const; + void operator()(const long long int & i) const; + void operator()(const long int & i) const; + void operator()(const int & i) const; + void operator()(const short int & i) const; + void operator()(const double & i) const; + void operator()(const float & i) const; + void operator()(const boost::posix_time::ptime & i) const; + + private: + DB::Command * cmd; + unsigned int idx; +}; + +#endif + diff --git a/project2/sql/sqlWriter.cpp b/project2/sql/sqlWriter.cpp new file mode 100644 index 0000000..88d9801 --- /dev/null +++ b/project2/sql/sqlWriter.cpp @@ -0,0 +1,131 @@ +#include "sqlWriter.h" +#include <boost/foreach.hpp> +#include "sqlVariableBinder.h" + +DynamicSql::SqlWriter::SqlWriter() +{ +} + +DynamicSql::SqlWriter::~SqlWriter() +{ +} + +DynamicSql::SqlCommand::SqlCommand(const xmlpp::Element * N) +{ + BOOST_FOREACH(xmlpp::Node * n, N->get_children()) { + const xmlpp::TextNode * t = dynamic_cast<const xmlpp::TextNode *>(n); + if (t) { + writers.push_back(new SqlText(t)); + } + const xmlpp::Element * e = dynamic_cast<const xmlpp::Element *>(n); + if (e) { + if (e->get_name() == "filter") { + SqlFilterPtr f = new SqlFilter(e); + writers.push_back(f); + filters.insert(Filters::value_type(f->name, f)); + } + else if (e->get_name() == "param") { + writers.push_back(new SqlParameter(e)); + } + } + } +} + +Glib::ustring +DynamicSql::SqlCommand::getSqlFor(const Glib::ustring & f) const +{ + BOOST_FOREACH(const SqlCommand::Filters::value_type & filter, filters) { + filter.second->active = (filter.second->name == f); + } + Glib::ustring sql; + writeSql(sql); + return sql; +} + +void +DynamicSql::SqlCommand::writeSql(Glib::ustring & sql) const +{ + BOOST_FOREACH(const SqlWriterPtr & w, writers) { + w->writeSql(sql); + } +} + +void +DynamicSql::SqlCommand::bindParams(DB::Command * cmd, unsigned int & offset) const +{ + BOOST_FOREACH(const SqlWriterPtr & w, writers) { + w->bindParams(cmd, offset); + } +} + +DynamicSql::SqlFilter::SqlFilter(const xmlpp::Element * N) : + name(N->get_attribute_value("name")), + active(false) +{ + BOOST_FOREACH(xmlpp::Node * n, N->get_children()) { + const xmlpp::TextNode * t = dynamic_cast<const xmlpp::TextNode *>(n); + if (t) { + writers.push_back(new SqlText(t)); + } + const xmlpp::Element * e = dynamic_cast<const xmlpp::Element *>(n); + if (e) { + if (e->get_name() == "param") { + writers.push_back(new SqlParameter(e)); + } + } + } +} + +void +DynamicSql::SqlFilter::writeSql(Glib::ustring & sql) const +{ + if (active) { + BOOST_FOREACH(const SqlWriterPtr & w, writers) { + w->writeSql(sql); + } + } +} + +void +DynamicSql::SqlFilter::bindParams(DB::Command * cmd, unsigned int & offset) const +{ + if (active) { + BOOST_FOREACH(const SqlWriterPtr & w, writers) { + w->bindParams(cmd, offset); + } + } +} + +DynamicSql::SqlParameter::SqlParameter(const xmlpp::Element * n) : + Variable(n, boost::optional<Glib::ustring>("local")) +{ +} + +void +DynamicSql::SqlParameter::writeSql(Glib::ustring & sql) const +{ + sql.append("?"); +} + +void +DynamicSql::SqlParameter::bindParams(DB::Command * cmd, unsigned int & offset) const +{ + boost::apply_visitor<const SqlVariableBinder, const VariableType>(SqlVariableBinder(cmd, offset++), (*this)); +} + +DynamicSql::SqlText::SqlText(const xmlpp::TextNode * n) : + text(n->get_content()) +{ +} + +void +DynamicSql::SqlText::writeSql(Glib::ustring & sql) const +{ + sql.append(text); +} + +void +DynamicSql::SqlText::bindParams(DB::Command *, unsigned int &) const +{ +} + diff --git a/project2/sql/sqlWriter.h b/project2/sql/sqlWriter.h new file mode 100644 index 0000000..7693e19 --- /dev/null +++ b/project2/sql/sqlWriter.h @@ -0,0 +1,63 @@ +#ifndef SQLWRITER_H +#define SQLWRITER_H + +#include <intrusivePtrBase.h> +#include <libxml++/nodes/textnode.h> +#include <command.h> +#include <glibmm/ustring.h> +#include <list> +#include <map> +#include "variables.h" + +namespace DynamicSql { + class SqlWriter; + typedef boost::intrusive_ptr<SqlWriter> SqlWriterPtr; + typedef std::list<SqlWriterPtr> Writers; + class SqlWriter : public IntrusivePtrBase { + public: + SqlWriter(); + virtual ~SqlWriter(); + virtual void writeSql(Glib::ustring & sql) const = 0; + virtual void bindParams(DB::Command *, unsigned int & offset) const = 0; + }; + class SqlText : public SqlWriter { + public: + SqlText(const xmlpp::TextNode *); + virtual void writeSql(Glib::ustring & sql) const; + virtual void bindParams(DB::Command *, unsigned int & offset) const; + + const Glib::ustring text; + }; + class SqlParameter : public SqlWriter, Variable { + public: + SqlParameter(const xmlpp::Element *); + virtual void writeSql(Glib::ustring & sql) const; + virtual void bindParams(DB::Command *, unsigned int & offset) const; + }; + class SqlFilter : public SqlWriter { + public: + SqlFilter(const xmlpp::Element *); + virtual void writeSql(Glib::ustring & sql) const; + virtual void bindParams(DB::Command *, unsigned int & offset) const; + + const Glib::ustring name; + bool active; + private: + Writers writers; + }; + typedef boost::intrusive_ptr<SqlFilter> SqlFilterPtr; + class SqlCommand : public SqlWriter { + public: + SqlCommand(const xmlpp::Element *); + virtual void writeSql(Glib::ustring & sql) const; + virtual void bindParams(DB::Command *, unsigned int & offset) const; + typedef std::multimap<Glib::ustring, SqlFilterPtr> Filters; + Glib::ustring getSqlFor(const Glib::ustring & f) const; + private: + Filters filters; + Writers writers; + }; +} + +#endif + diff --git a/project2/sql/tablepatch.cpp b/project2/sql/tablepatch.cpp new file mode 100644 index 0000000..f87f60e --- /dev/null +++ b/project2/sql/tablepatch.cpp @@ -0,0 +1,438 @@ +#include "tablepatch.h" +#include <stdio.h> +#include <misc.h> +#include <selectcommand.h> +#include <column.h> +#include <buffer.h> +#include <boost/algorithm/string/join.hpp> + +using namespace DB; + +TablePatch::TablePatch(const Connection & wdb, const TablePatch::Table & s, const TablePatch::Table & d, + const TablePatch::Columns & c) : + src(s), + dest(d), + cols(c), + db(wdb) +{ + if (!src.length()) { + throw PatchCheckFailure(); + } + if (!dest.length()) { + throw PatchCheckFailure(); + } + if (!db.inTx()) { + throw PatchCheckFailure(); + } +} + +void +TablePatch::addKey(const TablePatch::Column & c) +{ + pk.insert(c); +} + +void +TablePatch::patch(const char * where, const char * order) +{ + if (pk.size() == 0) { + throw PatchCheckFailure(); + } + doDeletes(where, order); + doUpdates(where, order); + doInserts(order); +} + +void +TablePatch::doDeletes(const char * where, const char * order) +{ + switch (db.bulkDeleteStyle()) { + case BulkDeleteUsingSubSelect: + { + // ----------------------------------------------------------------- + // Build SQL to delete keys ---------------------------------------- + // ----------------------------------------------------------------- + Buffer toDelSql; + toDelSql.appendf("DELETE FROM %s WHERE (", + dest.c_str()); + foreach (PKI, pk, pki) { + if (pki != pk.begin()) { + toDelSql.append(", "); + } + toDelSql.appendf("%s.%s", + dest.c_str(), + pki->c_str()); + } + // ----------------------------------------------------------------- + // Build SQL to select keys to delete ------------------------------ + // ----------------------------------------------------------------- + toDelSql.append(") IN (SELECT "); + foreach (PKI, pk, pki) { + if (pki != pk.begin()) { + toDelSql.append(", "); + } + toDelSql.appendf("a.%s", + pki->c_str()); + } + toDelSql.appendf(" FROM %s a LEFT OUTER JOIN %s b ON ", + dest.c_str(), src.c_str()); + foreach (PKI, pk, pki) { + if (pki != pk.begin()) { + toDelSql.append(" AND "); + } + toDelSql.appendf(" a.%s = b.%s", + pki->c_str(), pki->c_str()); + } + foreach (PKI, pk, pki) { + if (pki == pk.begin()) { + toDelSql.append(" WHERE "); + } + else { + toDelSql.append(" AND "); + } + toDelSql.appendf(" b.%s IS NULL", + pki->c_str()); + } + if (where && *where) { + toDelSql.appendf(" AND %s", where); + } + if (order && *order) { + toDelSql.appendf(" ORDER BY %s", order); + } + toDelSql.append(")"); + ModifyCommand * del = db.newModifyCommand(toDelSql); + del->execute(); + delete del; + break; + } + case BulkDeleteUsingUsingAlias: + case BulkDeleteUsingUsing: + { + Buffer toDelSql; + toDelSql.appendf("DELETE FROM %s USING %s a LEFT OUTER JOIN %s b ", + (db.bulkDeleteStyle() == BulkDeleteUsingUsingAlias ? "a" : dest.c_str()), + dest.c_str(), src.c_str()); + foreach (PKI, pk, pki) { + if (pki != pk.begin()) { + toDelSql.append(" AND "); + } + else { + toDelSql.append(" ON "); + } + toDelSql.appendf(" a.%s = b.%s", + pki->c_str(), pki->c_str()); + } + foreach (PKI, pk, pki) { + if (pki != pk.begin()) { + toDelSql.append(" AND "); + } + else { + toDelSql.append(" WHERE "); + } + toDelSql.appendf(" b.%s IS NULL", + pki->c_str()); + } + if (where && *where) { + toDelSql.appendf(" AND %s", where); + } + if (order && *order) { + toDelSql.appendf(" ORDER BY %s", order); + } + ModifyCommand * del = db.newModifyCommand(toDelSql); + del->execute(); + delete del; + break; + } + } +} + +void +TablePatch::doUpdates(const char * where, const char * order) +{ + if (cols.size() == pk.size()) { + // Can't "change" anything... it's all part of the key + return; + } + switch (db.bulkUpdateStyle()) { + case BulkUpdateByIteration: + { + // ----------------------------------------------------------------- + // Build SQL for list of updates to perform ------------------------ + // ----------------------------------------------------------------- + Buffer toUpdSel; + toUpdSel.append("SELECT "); + foreach (Columns::const_iterator, cols, col) { + if (pk.find(*col) == pk.end()) { + toUpdSel.appendf("b.%s, ", + col->c_str()); + } + } + foreach (Columns::const_iterator, cols, col) { + if (pk.find(*col) != pk.end()) { + toUpdSel.appendf("b.%s, ", + col->c_str()); + } + } + toUpdSel.appendf("0 FROM %s a, %s b", + dest.c_str(), src.c_str()); + foreach (PKI, pk, pki) { + if (pki == pk.begin()) { + toUpdSel.append(" WHERE "); + } + else { + toUpdSel.append(" AND "); + } + toUpdSel.appendf(" a.%s = b.%s", + pki->c_str(), pki->c_str()); + } + if (where && *where) { + toUpdSel.appendf(" AND %s", where); + } + toUpdSel.append(" AND ("); + bool first = true; + foreach (Columns::const_iterator, cols, col) { + if (pk.find(*col) == pk.end()) { + if (!first) { + toUpdSel.append(" OR "); + } + first = false; + toUpdSel.appendf( + " (((CASE WHEN (a.%s IS NULL AND b.%s IS NULL) THEN 1 ELSE 0 END) \ + + (CASE WHEN(a.%s = b.%s) THEN 1 ELSE 0 END)) = 0)", + col->c_str(), col->c_str(), col->c_str(), col->c_str()); + } + } + toUpdSel.append(")"); + if (order && *order) { + toUpdSel.appendf(" ORDER BY %s", order); + } + // ----------------------------------------------------------------- + // Build SQL to perform updates ------------------------------------ + // ----------------------------------------------------------------- + Buffer updSql; + updSql.appendf("UPDATE %s SET ", + dest.c_str()); + first = true; + foreach (Columns::const_iterator, cols, col) { + if (pk.find(*col) == pk.end()) { + if (!first) { + updSql.append(", "); + } + first = false; + updSql.appendf(" %s = ?", + col->c_str()); + } + } + foreach (PKI, pk, pki) { + if (pki == pk.begin()) { + updSql.append(" WHERE "); + } + else { + updSql.append(" AND "); + } + updSql.appendf(" %s = ?", + pki->c_str()); + } + // ----------------------------------------------------------------- + // Iterator over update list make changes -------------------------- + // ----------------------------------------------------------------- + SelectCommand * toUpd = db.newSelectCommand(toUpdSel); + ModifyCommand * upd = db.newModifyCommand(updSql); + int cs = cols.size(); + toUpd->execute(); + for (int c = 0; c < cs; c += 1) { + (*toUpd)[c].rebind(upd, c); + } + while (toUpd->fetch()) { + upd->execute(false); + } + delete toUpd; + delete upd; + } + break; + case BulkUpdateUsingFromSrc: + { + // ----------------------------------------------------------------- + // Build SQL for list of updates to perform ------------------------ + // ----------------------------------------------------------------- + Buffer updSql; + updSql.appendf("UPDATE %s a SET ", + dest.c_str()); + bool first = true; + foreach (Columns::const_iterator, cols, col) { + if (pk.find(*col) == pk.end()) { + if (!first) { + updSql.append(", "); + } + first = false; + updSql.appendf(" %s = b.%s ", + col->c_str(), col->c_str()); + } + } + updSql.appendf(" FROM %s b ", + src.c_str()); + foreach (PKI, pk, pki) { + if (pki == pk.begin()) { + updSql.append(" WHERE "); + } + else { + updSql.append(" AND "); + } + updSql.appendf(" a.%s = b.%s ", + pki->c_str(), pki->c_str()); + } + updSql.append(" AND ("); + first = true; + foreach (Columns::const_iterator, cols, col) { + if (pk.find(*col) == pk.end()) { + if (!first) { + updSql.append(" OR "); + } + first = false; + updSql.appendf( + " (((CASE WHEN (a.%s IS NULL AND b.%s IS NULL) THEN 1 ELSE 0 END) \ + + (CASE WHEN(a.%s = b.%s) THEN 1 ELSE 0 END)) = 0)", + col->c_str(), col->c_str(), + col->c_str(), col->c_str()); + } + } + updSql.append(")"); + if (where && *where) { + updSql.appendf(" AND %s ", where); + } + if (order && *order) { + updSql.appendf(" ORDER BY %s", order); + } + // ----------------------------------------------------------------- + // Execute the bulk update command --------------------------------- + // ----------------------------------------------------------------- + ModifyCommand * upd = db.newModifyCommand(updSql); + upd->execute(true); + delete upd; + break; + } + case BulkUpdateUsingJoin: + { + // ----------------------------------------------------------------- + // Build SQL for list of updates to perform ------------------------ + // ----------------------------------------------------------------- + Buffer updSql; + updSql.appendf("UPDATE %s a, %s b SET ", + dest.c_str(), src.c_str()); + bool first = true; + foreach (Columns::const_iterator, cols, col) { + if (pk.find(*col) == pk.end()) { + if (!first) { + updSql.append(", "); + } + first = false; + updSql.appendf(" a.%s = b.%s ", + col->c_str(), col->c_str()); + } + } + foreach (PKI, pk, pki) { + if (pki == pk.begin()) { + updSql.append(" WHERE "); + } + else { + updSql.append(" AND "); + } + updSql.appendf(" a.%s = b.%s ", + pki->c_str(), pki->c_str()); + } + updSql.append(" AND ("); + first = true; + foreach (Columns::const_iterator, cols, col) { + if (pk.find(*col) == pk.end()) { + if (!first) { + updSql.append(" OR "); + } + first = false; + updSql.appendf( + " (((CASE WHEN (a.%s IS NULL AND b.%s IS NULL) THEN 1 ELSE 0 END) \ + + (CASE WHEN(a.%s = b.%s) THEN 1 ELSE 0 END)) = 0)", + col->c_str(), col->c_str(), + col->c_str(), col->c_str()); + } + } + updSql.append(")"); + if (where && *where) { + updSql.appendf(" AND %s ", where); + } + if (order && *order) { + updSql.appendf(" ORDER BY %s", order); + } + // ----------------------------------------------------------------- + // Execute the bulk update command --------------------------------- + // ----------------------------------------------------------------- + ModifyCommand * upd = db.newModifyCommand(updSql); + upd->execute(true); + delete upd; + break; + } + } +} + +void +TablePatch::doInserts(const char * order) +{ + // ----------------------------------------------------------------- + // Build SQL for copying new records ------------------------------- + // ----------------------------------------------------------------- + Buffer toInsSql; + toInsSql.appendf("INSERT INTO %s", + dest.c_str()); + foreach (Columns::const_iterator, cols, col) { + if (col == cols.begin()) { + toInsSql.append("("); + } + else { + toInsSql.append(", "); + } + toInsSql.appendf("%s", + col->c_str()); + } + toInsSql.append(") SELECT "); + foreach (Columns::const_iterator, cols, col) { + if (col != cols.begin()) { + toInsSql.append(", "); + } + toInsSql.appendf("b.%s", + col->c_str()); + } + toInsSql.appendf(" FROM %s b LEFT OUTER JOIN %s a", + src.c_str(), dest.c_str()); + foreach (PKI, pk, pki) { + if (pki == pk.begin()) { + toInsSql.append(" ON "); + } + else { + toInsSql.append(" AND "); + } + toInsSql.appendf(" a.%s = b.%s", + pki->c_str(), pki->c_str()); + } + foreach (PKI, pk, pki) { + if (pki == pk.begin()) { + toInsSql.append(" WHERE "); + } + else { + toInsSql.append(" AND "); + } + toInsSql.appendf(" a.%s IS NULL", + pki->c_str()); + } + if (order && *order) { + toInsSql.appendf(" ORDER BY %s", order); + } + ModifyCommand * ins = db.newModifyCommand(toInsSql); + ins->execute(); + delete ins; +} + +const char * +TablePatch::PatchCheckFailure::what() const throw() +{ + return "Santiy checks failed: check table names and keys"; +} + diff --git a/project2/sql/tablepatch.h b/project2/sql/tablepatch.h new file mode 100644 index 0000000..0174294 --- /dev/null +++ b/project2/sql/tablepatch.h @@ -0,0 +1,42 @@ +#ifndef TABLEPATCH_H +#define TABLEPATCH_H + +#include <string> +#include <set> +#include <map> +#include <connection.h> +#include <modifycommand.h> +#include <selectcommand.h> + +class TablePatch { + public: + typedef std::string Table; + typedef std::string Column; + typedef std::set<Column> Columns; + typedef Columns PrimaryKey; + typedef PrimaryKey::const_iterator PKI; + + class PatchCheckFailure : public std::exception { + public: + const char * what() const throw(); + }; + + TablePatch(const DB::Connection & db, const Table & src, const Table & dest, const Columns & cols); + + void addKey(const Column & col); + void patch(const char * where, const char * order); + + private: + void doDeletes(const char * where, const char * order); + void doUpdates(const char * where, const char * order); + void doInserts(const char * order); + + Table src; + Table dest; + PrimaryKey pk; + Columns cols; + const DB::Connection &db; +}; + +#endif + |