diff options
Diffstat (limited to 'project2/sql')
| -rw-r--r-- | project2/sql/Jamfile.jam | 38 | ||||
| -rw-r--r-- | project2/sql/connectionLoader.h | 19 | ||||
| -rw-r--r-- | project2/sql/rdbmsDataSource.cpp | 220 | ||||
| -rw-r--r-- | project2/sql/rdbmsDataSource.h | 78 | ||||
| -rw-r--r-- | project2/sql/sql-modODBC.cpp | 4 | ||||
| -rw-r--r-- | project2/sql/sql-modPQ.cpp | 4 | ||||
| -rw-r--r-- | project2/sql/sqlCache.cpp | 300 | ||||
| -rw-r--r-- | project2/sql/sqlCheck.cpp | 100 | ||||
| -rw-r--r-- | project2/sql/sqlCheck.h | 29 | ||||
| -rw-r--r-- | project2/sql/sqlHandleAsVariableType.cpp | 19 | ||||
| -rw-r--r-- | project2/sql/sqlHandleAsVariableType.h | 18 | ||||
| -rw-r--r-- | project2/sql/sqlMergeTask.cpp | 329 | ||||
| -rw-r--r-- | project2/sql/sqlMergeTask.h | 79 | ||||
| -rw-r--r-- | project2/sql/sqlRows.cpp | 69 | ||||
| -rw-r--r-- | project2/sql/sqlRows.h | 40 | ||||
| -rw-r--r-- | project2/sql/sqlTask.cpp | 70 | ||||
| -rw-r--r-- | project2/sql/sqlTask.h | 36 | ||||
| -rw-r--r-- | project2/sql/sqlVariableBinder.cpp | 77 | ||||
| -rw-r--r-- | project2/sql/sqlVariableBinder.h | 36 | ||||
| -rw-r--r-- | project2/sql/sqlWriter.cpp | 131 | ||||
| -rw-r--r-- | project2/sql/sqlWriter.h | 63 | ||||
| -rw-r--r-- | project2/sql/tablepatch.cpp | 438 | ||||
| -rw-r--r-- | project2/sql/tablepatch.h | 42 | 
23 files changed, 2239 insertions, 0 deletions
| diff --git a/project2/sql/Jamfile.jam b/project2/sql/Jamfile.jam new file mode 100644 index 0000000..b05fd34 --- /dev/null +++ b/project2/sql/Jamfile.jam @@ -0,0 +1,38 @@ +alias libxmlpp : : : : +	<cflags>"`pkg-config --cflags libxml++-2.6`" +	<linkflags>"`pkg-config --libs libxml++-2.6`" ; + +explicit object sql-modODBC ; +obj sql-modODBC : +	sql-modODBC.cpp : +	<library>../../libodbcpp//odbcpp +	<library>libxmlpp +	<include>../../libmisc +	<include>../common +	: : +	<library>../../libodbcpp//odbcpp +	; +	 +explicit object sql-modPQ ; +obj sql-modPQ : +	sql-modPQ.cpp : +	<library>../../libpqpp//pqpp +	<library>libxmlpp +	<include>../../libmisc +	<include>../common +	: : +	<library>../../libpqpp//pqpp +	; +	 +lib p2sql : +	sqlCheck.cpp sqlWriter.cpp sqlTask.cpp sqlMergeTask.cpp sqlRows.cpp sqlCache.cpp sqlVariableBinder.cpp tablepatch.cpp rdbmsDataSource.cpp +	sqlHandleAsVariableType.cpp +	../../libdbpp//dbpp +	: +	<odbc>yes:<library>sql-modODBC +	<pq>yes:<library>sql-modPQ +	<library>libxmlpp +	<library>../common//p2common +	<include>../../libmisc +	; + diff --git a/project2/sql/connectionLoader.h b/project2/sql/connectionLoader.h new file mode 100644 index 0000000..841b425 --- /dev/null +++ b/project2/sql/connectionLoader.h @@ -0,0 +1,19 @@ +#include "xmlObjectLoader.h" +#include "../libdbpp/connection.h" + +/// Base class to implement DB connection type modules +class ConnectionLoader : public ComponentLoader { +	public: +		virtual DB::Connection * connect(const std::string & dsn) const = 0; +}; + +/// Helper implemention for specific DB types +template <class DBType> +class ConnectionLoaderImpl : public ConnectionLoader { +	public: +		virtual DB::Connection * connect(const std::string & dsn) const +		{ +			return new DBType(dsn); +		} +}; + diff --git a/project2/sql/rdbmsDataSource.cpp b/project2/sql/rdbmsDataSource.cpp new file mode 100644 index 0000000..dac6cb1 --- /dev/null +++ b/project2/sql/rdbmsDataSource.cpp @@ -0,0 +1,220 @@ +#include "rdbmsDataSource.h" +#include "connectionLoader.h" +#include <libxml++/nodes/textnode.h> +#include <sys/utsname.h> +#include "logger.h" +#include <errno.h> +#include <boost/foreach.hpp> + +SimpleMessageException(UnknownConnectionProvider); + +/// Specialized ElementLoader for instances of RdbmsDataSource; handles persistent DB connections +class RdbmsDataSourceLoader : public ElementLoaderImpl<RdbmsDataSource> { +	public: +		void onIdle() +		{ +			// Disconnect all cached database connections +			RdbmsDataSource::dbhosts.clear(); +		} +		static bool isConnectionExpired(const RdbmsDataSource::DBHosts::value_type & con) +		{ +			return con.second->isExpired(); +		} +		void onPeriodic() +		{ +			// Disconnect expired database connections +			RdbmsDataSource::DBHosts::iterator i; +			while ((i = std::find_if(RdbmsDataSource::dbhosts.begin(), RdbmsDataSource::dbhosts.end(), isConnectionExpired)) != RdbmsDataSource::dbhosts.end()) { +				RdbmsDataSource::dbhosts.erase(i); +			} +		} +		void onIteration() +		{ +			RdbmsDataSource::changedDSNs.clear(); +		} +}; +DECLARE_CUSTOM_LOADER("rdbmsdatasource", RdbmsDataSourceLoader); + +RdbmsDataSource::DBHosts RdbmsDataSource::dbhosts; +RdbmsDataSource::FailedHosts RdbmsDataSource::failedhosts; +RdbmsDataSource::DSNSet RdbmsDataSource::changedDSNs; + +RdbmsDataSource::RdbmsDataSource(const xmlpp::Element * p) : +	DataSource(p), +	masterDsn(dynamic_cast<const xmlpp::Element *>(p->find("masterdsn").front())), +	preferLocal(p->get_attribute_value("preferlocal") != "false") +{ +	BOOST_FOREACH(const xmlpp::Node * node, p->find("readonly/dsn")) { +		const xmlpp::Element * elem = dynamic_cast<const xmlpp::Element *>(node); +		if (elem) { +			roDSNs.insert(ReadonlyDSNs::value_type(elem->get_attribute_value("host"), elem)); +		} +	} +} + +RdbmsDataSource::~RdbmsDataSource() +{ +} + +void +RdbmsDataSource::loadComplete(const CommonObjects *) +{ +} + +const DB::Connection & +RdbmsDataSource::getWritable() const +{ +	ConnectionPtr master = connectTo(masterDsn); +	if (!master->txOpen) { +		master->connection->beginTx(); +		master->txOpen = true; +	} +	changedDSNs.insert(name); +	return *master->connection; +} + +const DB::Connection & +RdbmsDataSource::getReadonly() const +{ +	if (changedDSNs.find(name) != changedDSNs.end()) { +		return *connectTo(masterDsn)->connection; +	} +	if (localhost.length() == 0 && preferLocal) { +		struct utsname name; +		if (uname(&name)) { +			Logger()->messagef(LOG_WARNING, "%s: Unable to determine local host name (%d:%s)", +					__PRETTY_FUNCTION__, errno, strerror(errno)); +			localhost = "unknown"; +		} +		else { +			localhost = name.nodename; +		} +	} +	if (preferLocal) { +		ReadonlyDSNs::const_iterator ro = roDSNs.find(localhost); +		try { +			if (ro == roDSNs.end()) { +				Logger()->messagef(LOG_INFO, "%s: No database host matches local host name (%s) Will use master DSN", +						__PRETTY_FUNCTION__, localhost.c_str()); +				return *connectTo(masterDsn)->connection; +			} +			return *connectTo(ro->second)->connection; +		} +		catch (...) { +			// Failed to connect to a preferred DB... carry on and try the others... +		} +	} +	BOOST_FOREACH(ReadonlyDSNs::value_type db, roDSNs) { +		try { +			return *connectTo(db.second)->connection; +		} +		catch (...) { +		} +	} +	return *connectTo(masterDsn)->connection; +} + +void +RdbmsDataSource::commit() +{ +	DBHosts::const_iterator m = dbhosts.find(masterDsn); +	if (m != dbhosts.end() && m->second->txOpen) { +		m->second->connection->commitTx(); +		m->second->txOpen = false; +	} +} + +void +RdbmsDataSource::rollback() +{ +	DBHosts::const_iterator m = dbhosts.find(masterDsn); +	if (m != dbhosts.end() && m->second->txOpen) { +		m->second->connection->rollbackTx(); +		m->second->txOpen = false; +	} +	changedDSNs.erase(name); +} + +RdbmsDataSource::ConnectionPtr +RdbmsDataSource::connectTo(const ConnectionInfo & dsn) +{ +	FailedHosts::iterator dbf = failedhosts.find(dsn); +	if (dbf != failedhosts.end()) { +		if (time(NULL) - 20 > dbf->second.FailureTime) { +			failedhosts.erase(dbf); +		} +		else { +			throw dbf->second; +		} +	} +	DBHosts::const_iterator dbi = dbhosts.find(dsn); +	if (dbi != dbhosts.end()) { +		try { +			dbi->second->connection->ping(); +			dbi->second->touch(); +			return dbi->second; +		} +		catch (...) { +			// Connection in failed state +			Logger()->messagef(LOG_DEBUG, "%s: Cached connection failed", __PRETTY_FUNCTION__); +		} +	} +	 +	try { +		ConnectionPtr db = ConnectionPtr(new RdbmsConnection(dsn.connect(), 300)); +		dbhosts[dsn] = db; +		db->touch(); +		return db; +	} +	catch (const DB::ConnectionError & e) { +		failedhosts.insert(FailedHosts::value_type(dsn, e)); +		throw; +	} +} + +RdbmsDataSource::RdbmsConnection::RdbmsConnection(const DB::Connection * con, time_t kat) : +	connection(con), +	txOpen(false), +	lastUsedTime(0), +	keepAliveTime(kat) +{ +} + +RdbmsDataSource::RdbmsConnection::~RdbmsConnection() +{ +	connection->finish(); +	delete connection; +} + +void +RdbmsDataSource::RdbmsConnection::touch() const +{ +	time(&lastUsedTime); +} + +bool +RdbmsDataSource::RdbmsConnection::isExpired() const +{ +	return (time(NULL) > lastUsedTime + keepAliveTime); +} + +RdbmsDataSource::ConnectionInfo::ConnectionInfo(const xmlpp::Element * n) +{ +	BOOST_FOREACH(const xmlpp::Node * node, n->get_children()) { +		typeId = LoaderBase::getLoader<ConnectionLoader, UnknownConnectionProvider>(node->get_name()); +		dsn = dynamic_cast<const xmlpp::Element *>(node)->get_child_text()->get_content(); +	} +} + +DB::Connection * +RdbmsDataSource::ConnectionInfo::connect() const +{ +	return typeId->connect(dsn); +} + +bool +RdbmsDataSource::ConnectionInfo::operator<(const RdbmsDataSource::ConnectionInfo & other) const +{ +	return ((typeId < other.typeId) || ((typeId == other.typeId) && (dsn < other.dsn))); +} + diff --git a/project2/sql/rdbmsDataSource.h b/project2/sql/rdbmsDataSource.h new file mode 100644 index 0000000..fdcfbe7 --- /dev/null +++ b/project2/sql/rdbmsDataSource.h @@ -0,0 +1,78 @@ +#ifndef RDBMSDATASOURCE_H +#define RDBMSDATASOURCE_H + +#include <libxml/tree.h> +#include <boost/shared_ptr.hpp> +#include <map> +#include <set> +#include "dataSource.h" +#include "../libdbpp/connection.h" +#include "../libdbpp/error.h" +#include "xmlObjectLoader.h" + +class ConnectionLoader; + +/// Project2 component to provide access to transactional RDBMS data sources +class RdbmsDataSource : public DataSource { +	public: +		class RdbmsConnection { +			public: +				RdbmsConnection(const DB::Connection * connection, time_t kat); +				~RdbmsConnection(); + +				void touch() const; +				bool isExpired() const; +				const DB::Connection * const connection; +				bool txOpen; + +			private: +				mutable time_t lastUsedTime; +				const time_t keepAliveTime; +		}; + +		class ConnectionInfo { +			public: +				ConnectionInfo(const xmlpp::Element *); + +				DB::Connection * connect() const; + +				bool operator<(const ConnectionInfo & o) const; + +			private: +				std::string dsn; +				boost::shared_ptr<ConnectionLoader> typeId; +		}; + +		typedef boost::shared_ptr<RdbmsConnection> ConnectionPtr; +		typedef std::map<std::string, ConnectionInfo> ReadonlyDSNs; // Map hostname to DSN string +		typedef std::map<ConnectionInfo, ConnectionPtr> DBHosts; // Map DSN strings to connections +		typedef std::map<ConnectionInfo, const DB::ConnectionError> FailedHosts; // Map DSN strings to failures + +		RdbmsDataSource(const xmlpp::Element * p); +		~RdbmsDataSource(); + +		const DB::Connection & getReadonly() const; +		const DB::Connection & getWritable() const; +		virtual void loadComplete(const CommonObjects *); +		virtual void commit(); +		virtual void rollback(); + +		const ConnectionInfo masterDsn; +		const bool preferLocal; + +	protected: +		static ConnectionPtr connectTo(const ConnectionInfo & dsn); +		ReadonlyDSNs roDSNs; + +	private: +		mutable std::string localhost; +		static DBHosts dbhosts; +		static FailedHosts failedhosts; +		typedef std::set<std::string> DSNSet; +		static DSNSet changedDSNs; + +		friend class RdbmsDataSourceLoader; +}; + +#endif + diff --git a/project2/sql/sql-modODBC.cpp b/project2/sql/sql-modODBC.cpp new file mode 100644 index 0000000..1703cb6 --- /dev/null +++ b/project2/sql/sql-modODBC.cpp @@ -0,0 +1,4 @@ +#include "connectionLoader.h" +#include "../libodbcpp/connection.h" +typedef ODBC::Connection ODBCConnection; +DECLARE_COMPONENT_LOADER("odbc", ODBCConnection, ConnectionLoader) diff --git a/project2/sql/sql-modPQ.cpp b/project2/sql/sql-modPQ.cpp new file mode 100644 index 0000000..5991754 --- /dev/null +++ b/project2/sql/sql-modPQ.cpp @@ -0,0 +1,4 @@ +#include "connectionLoader.h" +#include "../libpqpp/connection.h" +typedef PQ::Connection PQConnection; +DECLARE_COMPONENT_LOADER("postgresql", PQConnection, ConnectionLoader) diff --git a/project2/sql/sqlCache.cpp b/project2/sql/sqlCache.cpp new file mode 100644 index 0000000..13bc23d --- /dev/null +++ b/project2/sql/sqlCache.cpp @@ -0,0 +1,300 @@ +#include "cache.h" +#include "sqlVariableBinder.h" +#include "sqlHandleAsVariableType.h" +#include "buffer.h" +#include "selectcommand.h" +#include "modifycommand.h" +#include "column.h" +#include "commonObjects.h" +#include "rdbmsDataSource.h" +#include "logger.h" +#include "xmlObjectLoader.h" +#include "iHaveParameters.h" +#include "rowSet.h" +#include <boost/foreach.hpp> +#include <boost/program_options.hpp> +#include <boost/algorithm/string/predicate.hpp> + +typedef boost::shared_ptr<DB::SelectCommand> SelectPtr; +typedef boost::shared_ptr<DB::ModifyCommand> ModifyPtr; + +class SqlCache : public Cache { +	public: +		SqlCache(const xmlpp::Element * p) : +			Cache(p) +		{ +		} + +		void loadComplete(const CommonObjects * co) +		{ +			db = co->dataSource<RdbmsDataSource>(DataSource); +		} + +		static void appendKeyCols(Buffer * sql, unsigned int * off, const Glib::ustring & col) +		{ +			if ((*off)++) { +				sql->append(", "); +			} +			sql->append(col.c_str()); +		} + +		static void appendKeyBinds(Buffer * sql, unsigned int * off) +		{ +			if ((*off)++) { +				sql->append(", "); +			} +			sql->append("?"); +		} + +		static void appendKeyAnds(Buffer * sql, const Glib::ustring & col) +		{ +			sql->appendf(" AND h.%s = ?", col.c_str()); +		} + +		static void bindKeyValues(DB::Command * cmd, unsigned int * offset, const VariableType & v) +		{ +			boost::apply_visitor<const SqlVariableBinder, const VariableType>(SqlVariableBinder(cmd, (*offset)++), v); +		} + +		class SqlCacheRowSet : public RowSet { +			public: +				SqlCacheRowSet(SelectPtr r) : +					RowSet(NULL), +					s(r) { +					} +				void loadComplete(const CommonObjects *) { +				} +				class SqlCacheRowState : public RowState { +					public: +						SqlCacheRowState(const SqlCacheRowSet * s) : +							sc(s) { +							} +						const Columns & getColumns() const { +							return columns; +						} +						RowAttribute resolveAttr(const Glib::ustring & attrName) const { +							return boost::bind(&SqlCacheRowState::getAttrCol, this, "p2attr_" + attrName); +						} +					private: +						VariableType getAttrCol(const Glib::ustring & col) const { +							HandleAsVariableType h; +							(*sc->s)[col].apply(h); +							return h.variable; +						} +						mutable Columns columns; +						friend class SqlCacheRowSet; +						const SqlCacheRowSet * sc; +				}; +				void execute(const Glib::ustring&, const RowProcessor * rp) const { +					SqlCacheRowState ss(this); +					HandleAsVariableType h; +					do { +						if (!(*s)["p2_cacheid"].isNull()) { +							if (colCols.empty()) { +								(*s)["p2_cacheid"].apply(h); +								cacheId = h.variable; +								unsigned int colNo = 0; +								for (unsigned int c = 0; c < s->columnCount(); c++) { +									const DB::Column & col = (*s)[c]; +									if (!boost::algorithm::starts_with(col.name, "p2attr_") && +											!boost::algorithm::starts_with(col.name, "p2_")) { +										ss.columns.insert(new Column(colNo++, col.name)); +										colCols.push_back(c); +									} +								} +								ss.fields.resize(colCols.size()); +							} +							else { +								(*s)["p2_cacheid"].apply(h); +								if (cacheId != (int64_t)h.variable) { +									break; +								} +							} +							BOOST_FOREACH(const unsigned int & c, colCols) { +								const DB::Column & col = (*s)[c]; +								col.apply(h); +								ss.fields[&c - &colCols.front()] = h.variable; +							} +							ss.process(rp); +						} +					} while (s->fetch()); +				} +			private: +				SelectPtr s; +				mutable std::vector<unsigned int> colCols; +				mutable int64_t cacheId; +		}; + +		RowSetCPtr getCachedRowSet(const Glib::ustring & n, const Glib::ustring & f, const IHaveParameters * ps) const +		{ +			Buffer sql; +			sql.appendf("SELECT r.* \ +					FROM %s p, %s_%s_%s h LEFT OUTER JOIN %s_%s_%s_rows r \ +						ON h.p2_cacheid = r.p2_cacheid \ +					WHERE p.p2_time > ? \ +					AND p.p2_cacheid = h.p2_cacheid", +					HeaderTable.c_str(), +					HeaderTable.c_str(), n.c_str(), f.c_str(), +					HeaderTable.c_str(),n.c_str(), f.c_str()); +			applyKeys(boost::bind(appendKeyAnds, &sql, _1), ps); +			sql.appendf(" ORDER BY r.p2_cacheid DESC, r.p2_row"); +			SelectPtr gh(db->getReadonly().newSelectCommand(sql)); +			unsigned int offset = 0; +			gh->bindParamT(offset++, time(NULL) - CacheLife); +			applyKeys(boost::bind(bindKeyValues, gh.get(), &offset, _2), ps); +			if (gh->fetch()) { +				return new SqlCacheRowSet(gh); +			} +			return NULL; +		} + +		class SqlCachePresenter : public Presenter { +			public: +				SqlCachePresenter(const Glib::ustring & name, const Glib::ustring & filter, const RdbmsDataSource * d) : +					depth(0), +					row(1), +					db(d), +					n(name), +					f(filter) { +				} +				void declareNamespace(const Glib::ustring &, const Glib::ustring &) const { } +				void setNamespace(const Glib::ustring &, const Glib::ustring &) const { } +				void pushSub(const Glib::ustring & name, const Glib::ustring &) const { +					depth += 1; +					if (depth == 2) { +						col = name; +					} +					else if (depth == 1) { +					} +				} +				void addAttr(const Glib::ustring & name, const Glib::ustring &, const VariableType & value) const { +					attrs.insert(Values::value_type(name, value)); +				} +				void addText(const VariableType & value) const { +					cols.insert(Values::value_type(col, value)); +				} +				void popSub() const { +					if (depth == 2) { +						col.clear(); +					} +					else if (depth == 1) { +						Buffer sql; +						sql.appendf("INSERT INTO %s_%s_%s_rows(p2_row", HeaderTable.c_str(), n.c_str(), f.c_str()); +						BOOST_FOREACH(const Values::value_type & a, attrs) { +							sql.appendf(", p2attr_%s", a.first.c_str()); +						} +						BOOST_FOREACH(const Values::value_type & v, cols) { +							sql.appendf(", %s", v.first.c_str()); +						} +						sql.appendf(") VALUES(?"); +						for (size_t x = attrs.size(); x > 0; x -= 1) { +							sql.append(", ?"); +						} +						for (size_t x = cols.size(); x > 0; x -= 1) { +							sql.append(", ?"); +						} +						sql.appendf(")"); +						ModifyPtr m(db->getReadonly().newModifyCommand(sql)); +						unsigned int offset = 0; +						m->bindParamI(offset++, row++); +						BOOST_FOREACH(const Values::value_type & a, attrs) { +							boost::apply_visitor<const SqlVariableBinder, const VariableType>(SqlVariableBinder(m.get(), offset++), a.second); +						} +						BOOST_FOREACH(const Values::value_type & v, cols) { +							boost::apply_visitor<const SqlVariableBinder, const VariableType>(SqlVariableBinder(m.get(), offset++), v.second); +						} +						m->execute(); +						cols.clear(); +						attrs.clear(); +					} +					depth -= 1; +				} +			private: +				mutable unsigned int depth; +				mutable unsigned int row; +				const RdbmsDataSource * db; +				mutable Glib::ustring col; +				const Glib::ustring n, f; +				typedef std::map<Glib::ustring, VariableType> Values; +				mutable Values cols, attrs; +		}; + +		PresenterPtr openFor(const Glib::ustring & n, const Glib::ustring & f, const IHaveParameters * ps) +		{ +			// Header +			Buffer del; +			del.appendf("INSERT INTO %s(p2_time) VALUES(?)", HeaderTable.c_str()); +			ModifyPtr h = ModifyPtr(db->getReadonly().newModifyCommand(del)); +			h->bindParamT(0, time(NULL)); +			h->execute(); +			// Record set header +			Buffer sql; +			sql.appendf("INSERT INTO %s_%s_%s(", HeaderTable.c_str(), n.c_str(), f.c_str()); +			unsigned int offset = 0; +			applyKeys(boost::bind(appendKeyCols, &sql, &offset, _1), ps); +			sql.appendf(") VALUES("); +			offset = 0; +			applyKeys(boost::bind(appendKeyBinds, &sql, &offset), ps); +			sql.appendf(")"); +			ModifyPtr m(db->getReadonly().newModifyCommand(sql)); +			offset = 0; +			applyKeys(boost::bind(bindKeyValues, m.get(), &offset, _2), ps); +			m->execute(); +			return new SqlCachePresenter(n, f, db); +		} + +		void close(const Glib::ustring & , const Glib::ustring & , const IHaveParameters * ) +		{ +		} + +	private: +		friend class CustomSqlCacheLoader; +		const RdbmsDataSource * db; +		static std::string DataSource; +		static std::string HeaderTable; +		static time_t CacheLife; +}; + +std::string SqlCache::DataSource; +std::string SqlCache::HeaderTable; +time_t SqlCache::CacheLife; + +namespace po = boost::program_options; +class CustomSqlCacheLoader : public ElementLoaderImpl<SqlCache> { +	public: +		CustomSqlCacheLoader() : +			opts("SQL Cache options") +		{ +			opts.add_options() +				("cache.sql.datasource", po::value(&SqlCache::DataSource), +				 "The default datasource to connect to") +				("cache.sql.headertable", po::value(&SqlCache::HeaderTable)->default_value("p2cache"), +				 "The filename to store the data in") +				("cache.sql.life", po::value(&SqlCache::CacheLife)->default_value(3600), +				 "The age of cache entries after which they are removed (seconds)") +				; +		} + +		po::options_description * options() +		{ +			return &opts; +		} + +		void onIdle() +		{ +			if (!SqlCache::DataSource.empty()) { +				boost::intrusive_ptr<CommonObjects> co = new CommonObjects(); +				const RdbmsDataSource * db = co->dataSource<RdbmsDataSource>(SqlCache::DataSource); +				Buffer del; +				del.appendf("DELETE FROM %s WHERE p2_time < ?", SqlCache::HeaderTable.c_str()); +				ModifyPtr m(db->getReadonly().newModifyCommand(del)); +				m->bindParamT(0, time(NULL) - SqlCache::CacheLife); +				m->execute(); +			} +		} + +	private: +		po::options_description opts; +}; +DECLARE_CUSTOM_LOADER("sqlcache", CustomSqlCacheLoader); + diff --git a/project2/sql/sqlCheck.cpp b/project2/sql/sqlCheck.cpp new file mode 100644 index 0000000..d86eb3b --- /dev/null +++ b/project2/sql/sqlCheck.cpp @@ -0,0 +1,100 @@ +#include "sqlCheck.h" +#include "xmlObjectLoader.h" +#include "selectcommand.h" +#include "column.h" +#include "rdbmsDataSource.h" +#include "commonObjects.h" +#include "sqlVariableBinder.h" +#include <boost/foreach.hpp> +#include <boost/date_time/posix_time/posix_time.hpp> + +DECLARE_LOADER("sqlcheck", SqlCheck); + +class CantCompareNulls : public std::exception { }; + +SqlCheck::SqlCheck(const xmlpp::Element * p) : +	ParamChecker(p), +	dataSource(p, "datasource"), +	filter(p, "filter", false, ""), +	testOp(p, "testOp", false, "=="), +	testValue(p, "testValue"), +	sqlCommand(dynamic_cast<xmlpp::Element *>(p->get_children("sql").front())) +{ +} + +SqlCheck::~SqlCheck() +{ +} + +void +SqlCheck::loadComplete(const CommonObjects * co) +{ +	db = co->dataSource<RdbmsDataSource>(dataSource()); +} + +class HandleDoCompare : public DB::HandleField { +	public: +		HandleDoCompare(const VariableType & tV, const std::string & tO) : +			retVal(false), +			testValue(tV), +			testOp(tO) { +		} +		void null() { +			throw CantCompareNulls(); +		} +		void string(const char *c , size_t l) { +			doTest(Glib::ustring(c, c + l)); +		} +		void integer(int64_t val) { +			doTest(val); +		} +		void floatingpoint(double val) { +			doTest(val); +		} +		void timestamp(const struct tm & val) { +			doTest(boost::posix_time::ptime_from_tm(val)); +		} +		bool operator()() const { +			return retVal; +		} +	private: +		template <typename TV> +		void doTest(const TV & val) { +			TV tv = testValue; +			if ((testOp == "==" || testOp == "=") && val == tv) { +				retVal = true; +			} +			else if (testOp == "<" && val < tv) { +				retVal = true; +			} +			else if (testOp == ">" && val > tv) { +				retVal = true; +			} +			else if (testOp == "!=" && val != tv) { +				retVal = true; +			} +			else if ((testOp == "<=" || testOp == "=<") && val <= tv) { +				retVal = true; +			} +			else if ((testOp == ">=" || testOp == "=>") && val >= tv) { +				retVal = true; +			} +		} +		bool retVal; +		const VariableType & testValue; +		std::string testOp; +}; +bool +SqlCheck::performCheck() const +{ +	boost::shared_ptr<DB::SelectCommand> query = boost::shared_ptr<DB::SelectCommand>( +			db->getWritable().newSelectCommand(sqlCommand.getSqlFor(filter()))); +	unsigned int offset = 0; +	sqlCommand.bindParams(query.get(), offset); +	HandleDoCompare h(testValue, testOp()); +	while (query->fetch()) { +		(*query)[0].apply(h); +	} +	return h(); +} + diff --git a/project2/sql/sqlCheck.h b/project2/sql/sqlCheck.h new file mode 100644 index 0000000..c9933d7 --- /dev/null +++ b/project2/sql/sqlCheck.h @@ -0,0 +1,29 @@ +#ifndef SQLCHECK_H +#define SQLCHECK_H + +#include "paramChecker.h" +#include "sqlWriter.h" + +namespace DB { class SelectCommand; } +class RdbmsDataSource; + +/// Project2 component to check the value of a variable against an RDBMS data source +class SqlCheck : public ParamChecker { +	public: +		SqlCheck(const xmlpp::Element * p); +		virtual ~SqlCheck(); + +		virtual void loadComplete(const CommonObjects *); +		bool performCheck() const; + +		const Variable dataSource; +		const Variable filter; +		const Variable testOp; +		const Variable testValue; + +	private: +		const DynamicSql::SqlCommand sqlCommand; +		const RdbmsDataSource * db; +}; + +#endif diff --git a/project2/sql/sqlHandleAsVariableType.cpp b/project2/sql/sqlHandleAsVariableType.cpp new file mode 100644 index 0000000..f084a14 --- /dev/null +++ b/project2/sql/sqlHandleAsVariableType.cpp @@ -0,0 +1,19 @@ +#include "sqlHandleAsVariableType.h" +#include <boost/date_time/posix_time/posix_time.hpp> + +void HandleAsVariableType::null() { +	variable = Null(); +} +void HandleAsVariableType::string(const char * c, size_t l) { +	variable = Glib::ustring(c, c + l); +} +void HandleAsVariableType::integer(int64_t i) { +	variable = i; +} +void HandleAsVariableType::floatingpoint(double d) { +	variable = d; +} +void HandleAsVariableType::timestamp(const struct tm & t) { +	variable = boost::posix_time::ptime(boost::posix_time::ptime_from_tm(t)); +} + diff --git a/project2/sql/sqlHandleAsVariableType.h b/project2/sql/sqlHandleAsVariableType.h new file mode 100644 index 0000000..c874b7c --- /dev/null +++ b/project2/sql/sqlHandleAsVariableType.h @@ -0,0 +1,18 @@ +#ifndef SQLHANDLEASVARIABLETYPE +#define SQLHANDLEASVARIABLETYPE + +#include "column.h" +#include "variables.h" + +class HandleAsVariableType : public DB::HandleField { +	public: +		void null(); +		void string(const char * c, size_t l); +		void integer(int64_t i); +		void floatingpoint(double d); +		void timestamp(const struct tm & t); +		VariableType variable; +}; + +#endif + diff --git a/project2/sql/sqlMergeTask.cpp b/project2/sql/sqlMergeTask.cpp new file mode 100644 index 0000000..09657f8 --- /dev/null +++ b/project2/sql/sqlMergeTask.cpp @@ -0,0 +1,329 @@ +#include "sqlMergeTask.h" +#include "columns.h" +#include "commonObjects.h" +#include "rdbmsDataSource.h" +#include "exceptions.h" +#include "sqlVariableBinder.h" +#include "xmlObjectLoader.h" +#include <misc.h> +#include <stdio.h> +#include <stdexcept> +#include <boost/algorithm/string/join.hpp> +#include <boost/foreach.hpp> +#include <boost/bind.hpp> +#include <libxml++/nodes/textnode.h> + +bool SqlMergeTask::defaultUseTempTable = true; +static void attach(boost::intrusive_ptr<IHaveSubTasks> i, DB::ModifyCommand * insert); + +class SqlMergeInsert; +typedef boost::intrusive_ptr<SqlMergeInsert> SqlMergeInsertPtr; +/// Project2 component insert custom constructed records during an SQL Merge task +class SqlMergeInsert : IHaveParameters, public Task { +	public: +		SqlMergeInsert(const xmlpp::Element * p) : +			SourceObject(p), +			IHaveParameters(p), +			Task(p) { +		} +		void loadComplete(const CommonObjects*) { +		} +		void execute() const { +			unsigned int col = 0; +			BOOST_FOREACH(const Parameters::value_type & v, parameters) { +				boost::apply_visitor<const SqlVariableBinder, const VariableType>(SqlVariableBinder(insert, col++), v.second); +			} +			insert->execute(); +		} +	private: +		friend void attach(SqlMergeInsertPtr i, DB::ModifyCommand * insert); +		DB::ModifyCommand * insert; +}; + +DECLARE_LOADER("sqlmerge", SqlMergeTask); +DECLARE_LOADER("sqlmergeinsert", SqlMergeInsert); + +// Conversion logic +SqlMergeTask::SqlMergeTask(const xmlpp::Element * p) : +	SourceObject(p), +	Task(p), +	updateWhere(p, "updatewhere", false), +	patchOrder(p->get_attribute_value("patchorder")), +	earlyKeys(p->get_attribute_value("earlykeys") == "yes"), +	useView(p->get_attribute_value("useview") == "yes"), +	tempTableCreated(false), +	insCmd(NULL), +	destdb(NULL), +	dataSource(p, "datasource"), +	dtable(p->get_attribute_value("targettable")), +	dtablet(stringf("tmp_%s_%d", dtable.c_str(), getpid())) +{ +	LoaderBase loader(true); +	loader.supportedStorers.insert(Storer::into(&sources)); +	loader.collectAll(p, true); + +	if (!sources.empty() && useView) { +		throw NotSupported("useview not supported with iterate fillers"); +	} + +	BOOST_FOREACH(xmlpp::Node * psi, p->find("columns/column")) { +		xmlpp::Element * e = static_cast<xmlpp::Element *>(psi); +		TargetColumnPtr tcp(new TargetColumn(e->get_child_text()->get_content())); +		tcp->maptable = e->get_attribute_value("maptable"); +		if (!tcp->maptable.empty()) { +			if (useView) { +				throw NotSupported("useview not supported with mapped columns"); +			} +			tcp->mapcolumn = e->get_attribute_value("mapcolumn"); +		} +		cols.insert(tcp); +	} +	BOOST_FOREACH(xmlpp::Node * psi, p->find("columns/column[@key='true']")) { +		keys.insert(static_cast<xmlpp::Element *>(psi)->get_child_text()->get_content()); +	} +	BOOST_FOREACH(xmlpp::Node * psi, p->find("sql")) { +		sqls.push_back(static_cast<xmlpp::Element *>(psi)->get_child_text()->get_content()); +	} +} + +SqlMergeTask::~SqlMergeTask() +{ +	delete insCmd; +} + +void +SqlMergeTask::loadComplete(const CommonObjects * co) +{ +	destdb = &co->dataSource<RdbmsDataSource>(dataSource())->getWritable(); +	insCmd = insertCommand(); +	BOOST_FOREACH(const Sources::value_type & i, sources) { +		attach(i, insCmd); +	} +} + +SqlMergeTask::TargetColumn::TargetColumn(const Column & c) : +	column(c), +	mapcolumn(c) +{ +} + +bool +SqlMergeTask::TargetColumn::Sort::operator()(const TargetColumnPtr & a, const TargetColumnPtr & b) const +{ +	return a->column < b->column; +} + +void +SqlMergeTask::execute() const +{ +	createTempTable(); +	if (earlyKeys) { +		createTempKey(); +		copyToTempTable(); +	} +	else { +		copyToTempTable(); +		createTempKey(); +	} +	std::set<std::string> colNames; +	BOOST_FOREACH(const TargetColumnPtr & c, cols) { +		colNames.insert(c->column); +	} +	TablePatch tp(*destdb, dtablet, dtable, colNames); +	BOOST_FOREACH(const Keys::value_type & k, keys) { +		tp.addKey(k); +	} +	tp.patch(updateWhere(), patchOrder.c_str()); +	dropTempTable(); +} + +void +SqlMergeTask::createTempTable() const +{ +	if (useView) { +		DB::ModifyCommand * cv = destdb->newModifyCommand(stringf( +					"CREATE VIEW %s AS %s", +					dtablet.c_str(), +					boost::algorithm::join(sqls, " UNION ").c_str())); +		cv->execute(); +		delete cv; +	} +	else { +		DB::ModifyCommand * ctt = destdb->newModifyCommand(stringf( +					"CREATE TEMPORARY TABLE %s AS SELECT * FROM %s WHERE 0=1", +					dtablet.c_str(), +					dtable.c_str())); +		ctt->execute(); +		delete ctt; +		BOOST_FOREACH(Columns::value_type c, cols) { +			if (!c->maptable.empty()) { +				DB::ModifyCommand * at = destdb->newModifyCommand(stringf( +							"ALTER TABLE %s ADD COLUMN %s VARCHAR(1000)", +							dtablet.c_str(), +							c->mapcolumn.c_str())); +				at->execute(); +				delete at; +			} +		} +	} +	tempTableCreated = true; +} +void +SqlMergeTask::dropTempTable() const +{ +	if (tempTableCreated) { +		DB::ModifyCommand * d; +		if (useView) { +			 d = destdb->newModifyCommand("DROP VIEW " + dtablet); +		} +		else { +			 d = destdb->newModifyCommand("DROP TABLE " + dtablet); +		} +		d->execute(); +		delete d; +	} +} +void +SqlMergeTask::createTempKey() const +{ +	if (useView) return; +	/* Primary key */ +	Buffer idx; +	idx.appendf("ALTER TABLE %s ADD CONSTRAINT pk_%s PRIMARY KEY(%s)", +			dtablet.c_str(), dtablet.c_str(), +			boost::algorithm::join(keys, ", ").c_str()); +	DB::ModifyCommand * at = destdb->newModifyCommand(idx); +	at->execute(); +	delete at; +	/* Indexes */ +	int n = 0; +	BOOST_FOREACH(const Keys::value_type & i, indexes) { +		DB::ModifyCommand * ci = destdb->newModifyCommand(stringf( +					"CREATE INDEX idx_%s_%d ON %s(%s)", +					dtablet.c_str(), n, dtablet.c_str(), i.c_str())); +		ci->execute(); +		delete ci; +		n += 1; +	} +} +DB::ModifyCommand * +SqlMergeTask::insertCommand() const +{ +	Buffer ins; +	ins.appendf("INSERT INTO %s(", dtablet.c_str()); +	foreach(Columns::const_iterator, cols, c) { +		if (c != cols.begin()) { +			ins.append(", "); +		} +		ins.append((*c)->mapcolumn); +	} +	ins.append(") VALUES ("); +	foreach(Columns::const_iterator, cols, c) { +		if (c == cols.begin()) { +			ins.append("?"); +		} +		else { +			ins.append(", ?"); +		} +	} +	ins.append(")"); +	return destdb->newModifyCommand(ins); +} + +class Populate : public NoOutputExecute { +	public: +		Populate(DB::ModifyCommand * c) : +			SourceObject(__FUNCTION__), +			NoOutputExecute(__FUNCTION__), +			cmd(c) +		{ +		} +		virtual void loadComplete(const CommonObjects *) +		{ +		} +		void execute() const +		{ +			unsigned int idx = 0; +			RowState::Stack().back()->foreachColumn(boost::bind(&Populate::bind, this, boost::ref(idx), _3)); +			cmd->execute(); +		} +	private: +		void bind(unsigned int & idx, const VariableType & value) const +		{ +			boost::apply_visitor<const SqlVariableBinder, const VariableType>(SqlVariableBinder(cmd, idx++), value); +		} +		DB::ModifyCommand * cmd; +}; +typedef boost::intrusive_ptr<Populate> PopulatePtr; + +void +attach(SqlMergeInsertPtr i, DB::ModifyCommand * insert) +{ +	if (i) { +		i->insert = insert; +	} +} + +static void +attach(boost::intrusive_ptr<IHaveSubTasks> i, DB::ModifyCommand * insert) +{ +	if (!i) { +		return; +	} +	if (i->normal.empty()) { +		i->normal.push_back(new Populate(insert)); +	} +	else { +		BOOST_FOREACH(const IHaveSubTasks::Tasks::value_type & n, i->normal) { +			attach(boost::dynamic_pointer_cast<IHaveSubTasks>(n), insert); +			attach(boost::dynamic_pointer_cast<SqlMergeInsert>(n), insert); +		} +	} +} + +void +SqlMergeTask::copyToTempTable() const +{ +	if (useView) return; +	BOOST_FOREACH(const Sources::value_type & i, sources) { +		i->execute(); +	} +	BOOST_FOREACH(const std::string & sql, sqls) { +		Buffer ins; +		ins.appendf("INSERT INTO %s(", dtablet.c_str()); +		foreach(Columns::const_iterator, cols, c) { +			if (c != cols.begin()) { +				ins.append(", "); +			} +			ins.append((*c)->column); +		} +		ins.append(") SELECT "); +		foreach(Columns::const_iterator, cols, c) { +			if (c != cols.begin()) { +				ins.append(", "); +			} +			ins.append((*c)->column); +		} +		ins.appendf(" FROM (%s) tmp_src", sql.c_str()); +		DB::ModifyCommand * cttt = destdb->newModifyCommand(ins); +		cttt->execute(); +		delete cttt; +	} +	BOOST_FOREACH(Columns::value_type c, cols) { +		if (!c->maptable.empty()) { +			DB::ModifyCommand * utt = destdb->newModifyCommand( +					stringf( +						"UPDATE %s d SET %s = (SELECT m.%s FROM %s m WHERE m.%s = d.%s) WHERE %s IS NULL", +						dtablet.c_str(), +						c->column.c_str(), +						c->column.c_str(), +						c->maptable.c_str(), +						c->mapcolumn.c_str(), +						c->mapcolumn.c_str(), +						c->column.c_str())); +			utt->execute(); +			delete utt; +		} +	} +} + diff --git a/project2/sql/sqlMergeTask.h b/project2/sql/sqlMergeTask.h new file mode 100644 index 0000000..c9e206c --- /dev/null +++ b/project2/sql/sqlMergeTask.h @@ -0,0 +1,79 @@ +#ifndef SQLMERGETASK_H +#define SQLMERGETASK_H + +#include <connection.h> +#include <modifycommand.h> +#include <selectcommand.h> +#include <buffer.h> +#include "tablepatch.h" +#include "task.h" +#include "iterate.h" +#include "variables.h" +#include <string> +#include <set> +#include <map> +#include <list> + +/// Project2 component merge arbitrary data into an RDBMS table +class SqlMergeTask : public Task { +	public: +		typedef std::string Table; +		typedef std::string Column; +		class TargetColumn; +		typedef boost::intrusive_ptr<TargetColumn> TargetColumnPtr; +		class TargetColumn : public virtual IntrusivePtrBase { +			public: +				class Sort { +					public: +						bool operator()(const TargetColumnPtr & a, const TargetColumnPtr & b) const; +				}; +				TargetColumn(const Column &); + +				Column column; +				Column mapcolumn; +				Table maptable; + +		}; +		typedef std::set<TargetColumnPtr, TargetColumn::Sort> Columns; + +		typedef std::set<Column> Keys; + +		SqlMergeTask(const xmlpp::Element * p); +		virtual ~SqlMergeTask(); + +		virtual void loadComplete(const CommonObjects *); +		void execute() const; +		Columns cols; +		Keys keys; +		Keys indexes; +		const Variable updateWhere; +		const std::string patchOrder; +		const bool earlyKeys; +		const bool useView; + +	private: +		virtual void copyToTempTable() const; +		void createTempTable() const; +		void dropTempTable() const; +		void createTempKey() const; + +		mutable bool tempTableCreated; +		typedef ANONSTORAGEOF(Iterate) Sources; +		Sources sources; +		std::list<std::string>	sqls; +	protected: +		DB::ModifyCommand * insertCommand() const; +		DB::ModifyCommand * insCmd; + +	public: +		const DB::Connection * destdb; +		const Variable dataSource; +		const Table dtable; +		const Table dtablet; + +		static unsigned int defaultVerbosity; +		static bool defaultUseTempTable; +}; + +#endif + diff --git a/project2/sql/sqlRows.cpp b/project2/sql/sqlRows.cpp new file mode 100644 index 0000000..6e506d7 --- /dev/null +++ b/project2/sql/sqlRows.cpp @@ -0,0 +1,69 @@ +#include "sqlRows.h" +#include "sqlHandleAsVariableType.h" +#include "rowProcessor.h" +#include "xml.h" +#include "selectcommand.h" +#include "rdbmsDataSource.h" +#include "column.h" +#include <string.h> +#include "xmlObjectLoader.h" +#include "commonObjects.h" +#include <boost/date_time/gregorian/gregorian_types.hpp> +#include <boost/foreach.hpp> + +DECLARE_LOADER("sqlrows", SqlRows); + +SqlRows::SqlRows(const xmlpp::Element * p) : +	RowSet(p), +	dataSource(p, "datasource"), +	sqlCommand(dynamic_cast<xmlpp::Element *>(p->get_children("sql").front())), +	db(NULL) +{ +} + +SqlRows::~SqlRows() +{ +} + +void +SqlRows::loadComplete(const CommonObjects * co) +{ +	db = co->dataSource<RdbmsDataSource>(dataSource()); +} + +SqlRows::SqlState::SqlState(SelectPtr s) : +	query(s) +{ +} + +const Columns & +SqlRows::SqlState::getColumns() const +{ +	if (columns.empty()) { +		for (unsigned int c = 0; c < query->columnCount(); c++) { +			columns.insert(new Column(c, (*query)[c].name)); +		} +	} +	return columns; +} + +void +SqlRows::execute(const Glib::ustring & filter, const RowProcessor * rp) const +{ +	unsigned int offset = 0; +	SqlState ss(SelectPtr(db->getReadonly().newSelectCommand(sqlCommand.getSqlFor(filter)))); +	sqlCommand.bindParams(ss.query.get(), offset); +	while (ss.query->fetch()) { +		HandleAsVariableType h; +		if (ss.fields.empty()) { +			ss.fields.resize(ss.query->columnCount()); +		} +		for (unsigned int c = 0; c < ss.query->columnCount(); c++) { +			const DB::Column & col = (*ss.query)[c]; +			col.apply(h); +			ss.fields[c] = h.variable; +		} +		ss.process(rp); +	} +} + diff --git a/project2/sql/sqlRows.h b/project2/sql/sqlRows.h new file mode 100644 index 0000000..5614fa1 --- /dev/null +++ b/project2/sql/sqlRows.h @@ -0,0 +1,40 @@ +#ifndef SQLROWS_H +#define SQLROWS_H + +#include <libxml++/nodes/element.h> +#include <boost/intrusive_ptr.hpp> +#include <map> +#include "selectcommand.h" +#include "iHaveParameters.h" +#include "rowSet.h" +#include "sqlWriter.h" + +class RdbmsDataSource; + +/// Project2 component to create a row set based on an SQL SELECT statement issued against an RDBMS data source +class SqlRows : public RowSet { +	public: +		SqlRows(const xmlpp::Element * p); +		~SqlRows(); + +		void execute(const Glib::ustring &, const RowProcessor *) const; +		virtual void loadComplete(const CommonObjects *); + +		const Variable dataSource; + +	private: +		const DynamicSql::SqlCommand sqlCommand; +		typedef boost::shared_ptr<DB::SelectCommand> SelectPtr; +		class SqlState : public RowState { +			public: +				SqlState(SelectPtr query); +				const Columns & getColumns() const; +				SelectPtr query; +				mutable Columns columns; +				friend class SqlRows; +		}; +		const RdbmsDataSource * db; +}; + +#endif + diff --git a/project2/sql/sqlTask.cpp b/project2/sql/sqlTask.cpp new file mode 100644 index 0000000..3a1d334 --- /dev/null +++ b/project2/sql/sqlTask.cpp @@ -0,0 +1,70 @@ +#include "sqlTask.h" +#include <boost/foreach.hpp> +#include "xmlObjectLoader.h" +#include "modifycommand.h" +#include "rdbmsDataSource.h" +#include "commonObjects.h" +#include "sqlVariableBinder.h" + +DECLARE_LOADER("sqltask", SqlTask); +StaticMessageException(RunOnNotSpecified, "runon attribute must be specified"); + +class SqlIfChangesStorer : public StorerBase<NoOutputExecute, ANONORDEREDSTORAGEOF(NoOutputExecute)> { +	public: +		SqlIfChangesStorer(Map c, Map nc) : +			changes(c), +			noChanges(nc) +		{ +		} +		bool insert(const xmlpp::Element * p, NoOutputExecutePtr O) { +			xmlpp::Attribute * runon = p->get_attribute("runon"); +			if (!runon) { +				throw	RunOnNotSpecified(); +			} +			((runon->get_value() == "changes") ? changes : noChanges)->push_back(O); +			return true; +		} +		Map changes, noChanges; +}; + +SqlTask::SqlTask(const xmlpp::Element * p) : +	SourceObject(p), +	Task(p), +	dataSource(p, "datasource"), +	filter(p, "filter", false, ""), +	sqlCommand(dynamic_cast<xmlpp::Element *>(p->get_children("sql").front())) +{ +	LoaderBase loader(true); +	loader.supportedStorers.insert(new SqlIfChangesStorer(&changesNOEs, &noChangesNOEs)); +	loader.collectAll(p, true, IgnoreUnsupported); +} + +SqlTask::~SqlTask() +{ +} + +void +SqlTask::loadComplete(const CommonObjects * co) +{ +	db = co->dataSource<RdbmsDataSource>(dataSource()); +} + +void +SqlTask::execute() const +{ +	boost::shared_ptr<DB::ModifyCommand> modify = boost::shared_ptr<DB::ModifyCommand>( +			db->getWritable().newModifyCommand(sqlCommand.getSqlFor(filter()))); +	unsigned int offset = 0; +	sqlCommand.bindParams(modify.get(), offset); +	if (modify->execute() == 0) { +		BOOST_FOREACH(const SubNOEs::value_type & sq, noChangesNOEs) { +			sq->execute(); +		} +	} +	else { +		BOOST_FOREACH(const SubNOEs::value_type & sq, changesNOEs) { +			sq->execute(); +		} +	} +} + diff --git a/project2/sql/sqlTask.h b/project2/sql/sqlTask.h new file mode 100644 index 0000000..384b000 --- /dev/null +++ b/project2/sql/sqlTask.h @@ -0,0 +1,36 @@ +#ifndef SQLTASK_H +#define SQLTASK_H + +#include <libxml++/nodes/element.h> +#include <boost/intrusive_ptr.hpp> +#include <map> +#include "task.h" +#include "variables.h" +#include "sqlWriter.h" + +namespace DB { class ModifyCommand; } +class RdbmsDataSource; + +/// Project2 component to execute a modifying SQL statement against an RDBMS data source +class SqlTask : public Task { +	public: +		SqlTask(const xmlpp::Element * p); +		virtual ~SqlTask(); +		virtual void loadComplete(const CommonObjects *); +		virtual void execute() const; + +		const Variable dataSource; +		const Variable filter; + +		typedef ANONORDEREDSTORAGEOF(NoOutputExecute) SubNOEs; +		SubNOEs changesNOEs; +		SubNOEs noChangesNOEs; + +	protected: +		const DynamicSql::SqlCommand sqlCommand; +		const RdbmsDataSource * db; +}; + +#endif + + diff --git a/project2/sql/sqlVariableBinder.cpp b/project2/sql/sqlVariableBinder.cpp new file mode 100644 index 0000000..23c24f2 --- /dev/null +++ b/project2/sql/sqlVariableBinder.cpp @@ -0,0 +1,77 @@ +#include "sqlVariableBinder.h" +#include "command.h" +#include "variables.h" +#include <boost/date_time/posix_time/conversion.hpp> + +SqlVariableBinder::SqlVariableBinder(DB::Command * c, unsigned int i) : +	cmd(c), +	idx(i) +{ +} +void +SqlVariableBinder::operator()(const Null &) const +{ +	cmd->bindNull(idx); +} +void +SqlVariableBinder::operator()(const Glib::ustring & i) const +{ +	cmd->bindParamS(idx, i); +} +void +SqlVariableBinder::operator()(const long long unsigned int & i) const +{ +	cmd->bindParamI(idx, i); +} +void +SqlVariableBinder::operator()(const long unsigned int & i) const +{ +	cmd->bindParamI(idx, i); +} +void +SqlVariableBinder::operator()(const unsigned int & i) const +{ +	cmd->bindParamI(idx, i); +} +void +SqlVariableBinder::operator()(const short unsigned int & i) const +{ +	cmd->bindParamI(idx, i); +} +void +SqlVariableBinder::operator()(const long long int & i) const +{ +	cmd->bindParamI(idx, i); +} +void +SqlVariableBinder::operator()(const long int & i) const +{ +	cmd->bindParamI(idx, i); +} +void +SqlVariableBinder::operator()(const int & i) const +{ +	cmd->bindParamI(idx, i); +} +void +SqlVariableBinder::operator()(const short int & i) const +{ +	cmd->bindParamI(idx, i); +} +void +SqlVariableBinder::operator()(const double & i) const +{ +	cmd->bindParamF(idx, i); +} +void +SqlVariableBinder::operator()(const float & i) const +{ +	cmd->bindParamF(idx, i); +} +void +SqlVariableBinder::operator()(const boost::posix_time::ptime & i) const +{ +	struct tm tm(boost::posix_time::to_tm(i)); +	cmd->bindParamT(idx, &tm); +} + diff --git a/project2/sql/sqlVariableBinder.h b/project2/sql/sqlVariableBinder.h new file mode 100644 index 0000000..df3879a --- /dev/null +++ b/project2/sql/sqlVariableBinder.h @@ -0,0 +1,36 @@ +#ifndef SQLVARIABLEBINDER_H +#define SQLVARIABLEBINDER_H + +#include <boost/variant.hpp> +#include <boost/shared_ptr.hpp> +#include <boost/date_time/posix_time/ptime.hpp> +#include <glibmm/ustring.h> + +namespace DB { +	class Command; +} +class Null; +class SqlVariableBinder : public boost::static_visitor<> { +	public: +		SqlVariableBinder(DB::Command * c, unsigned int i); +		void operator()(const Null & i) const; +		void operator()(const Glib::ustring & i) const; +		void operator()(const long long unsigned int & i) const; +		void operator()(const long unsigned int & i) const; +		void operator()(const unsigned int & i) const; +		void operator()(const short unsigned int & i) const; +		void operator()(const long long int & i) const; +		void operator()(const long int & i) const; +		void operator()(const int & i) const; +		void operator()(const short int & i) const; +		void operator()(const double & i) const; +		void operator()(const float & i) const; +		void operator()(const boost::posix_time::ptime & i) const; + +	private: +		DB::Command * cmd; +		unsigned int idx; +}; + +#endif + diff --git a/project2/sql/sqlWriter.cpp b/project2/sql/sqlWriter.cpp new file mode 100644 index 0000000..88d9801 --- /dev/null +++ b/project2/sql/sqlWriter.cpp @@ -0,0 +1,131 @@ +#include "sqlWriter.h" +#include <boost/foreach.hpp> +#include "sqlVariableBinder.h" + +DynamicSql::SqlWriter::SqlWriter() +{ +} + +DynamicSql::SqlWriter::~SqlWriter() +{ +} + +DynamicSql::SqlCommand::SqlCommand(const xmlpp::Element * N) +{ +	BOOST_FOREACH(xmlpp::Node * n, N->get_children()) { +		const xmlpp::TextNode * t = dynamic_cast<const xmlpp::TextNode *>(n); +		if (t) { +			writers.push_back(new SqlText(t)); +		} +		const xmlpp::Element * e = dynamic_cast<const xmlpp::Element *>(n); +		if (e) { +			if (e->get_name() == "filter") { +				SqlFilterPtr f = new SqlFilter(e); +				writers.push_back(f); +				filters.insert(Filters::value_type(f->name, f)); +			} +			else if (e->get_name() == "param") { +				writers.push_back(new SqlParameter(e)); +			} +		} +	} +} + +Glib::ustring +DynamicSql::SqlCommand::getSqlFor(const Glib::ustring & f) const +{ +	BOOST_FOREACH(const SqlCommand::Filters::value_type & filter, filters) { +		filter.second->active = (filter.second->name == f); +	} +	Glib::ustring sql; +	writeSql(sql); +	return sql; +} + +void +DynamicSql::SqlCommand::writeSql(Glib::ustring & sql) const +{ +	BOOST_FOREACH(const SqlWriterPtr & w, writers) { +		w->writeSql(sql); +	} +} + +void +DynamicSql::SqlCommand::bindParams(DB::Command * cmd, unsigned int & offset) const +{ +	BOOST_FOREACH(const SqlWriterPtr & w, writers) { +		w->bindParams(cmd, offset); +	} +} + +DynamicSql::SqlFilter::SqlFilter(const xmlpp::Element * N) : +	name(N->get_attribute_value("name")), +	active(false) +{ +	BOOST_FOREACH(xmlpp::Node * n, N->get_children()) { +		const xmlpp::TextNode * t = dynamic_cast<const xmlpp::TextNode *>(n); +		if (t) { +			writers.push_back(new SqlText(t)); +		} +		const xmlpp::Element * e = dynamic_cast<const xmlpp::Element *>(n); +		if (e) { +			if (e->get_name() == "param") { +				writers.push_back(new SqlParameter(e)); +			} +		} +	} +} + +void +DynamicSql::SqlFilter::writeSql(Glib::ustring & sql) const +{ +	if (active) { +		BOOST_FOREACH(const SqlWriterPtr & w, writers) { +			w->writeSql(sql); +		} +	} +} + +void +DynamicSql::SqlFilter::bindParams(DB::Command * cmd, unsigned int & offset) const +{ +	if (active) { +		BOOST_FOREACH(const SqlWriterPtr & w, writers) { +			w->bindParams(cmd, offset); +		} +	} +} + +DynamicSql::SqlParameter::SqlParameter(const xmlpp::Element * n) : +	Variable(n, boost::optional<Glib::ustring>("local")) +{ +} + +void +DynamicSql::SqlParameter::writeSql(Glib::ustring & sql) const +{ +	sql.append("?"); +} + +void +DynamicSql::SqlParameter::bindParams(DB::Command * cmd, unsigned int & offset) const +{ +	boost::apply_visitor<const SqlVariableBinder, const VariableType>(SqlVariableBinder(cmd, offset++), (*this)); +} + +DynamicSql::SqlText::SqlText(const xmlpp::TextNode * n) : +	text(n->get_content()) +{ +} + +void +DynamicSql::SqlText::writeSql(Glib::ustring & sql) const +{ +	sql.append(text); +} + +void +DynamicSql::SqlText::bindParams(DB::Command *, unsigned int &) const +{ +} + diff --git a/project2/sql/sqlWriter.h b/project2/sql/sqlWriter.h new file mode 100644 index 0000000..7693e19 --- /dev/null +++ b/project2/sql/sqlWriter.h @@ -0,0 +1,63 @@ +#ifndef SQLWRITER_H +#define SQLWRITER_H + +#include <intrusivePtrBase.h> +#include <libxml++/nodes/textnode.h> +#include <command.h> +#include <glibmm/ustring.h> +#include <list> +#include <map> +#include "variables.h" + +namespace DynamicSql { +	class SqlWriter; +	typedef boost::intrusive_ptr<SqlWriter> SqlWriterPtr; +	typedef std::list<SqlWriterPtr> Writers; +	class SqlWriter : public IntrusivePtrBase { +		public: +			SqlWriter(); +			virtual ~SqlWriter(); +			virtual void writeSql(Glib::ustring & sql) const = 0; +			virtual void bindParams(DB::Command *, unsigned int & offset) const = 0; +	}; +	class SqlText : public SqlWriter { +		public: +			SqlText(const xmlpp::TextNode *); +			virtual void writeSql(Glib::ustring & sql) const; +			virtual void bindParams(DB::Command *, unsigned int & offset) const; + +			const Glib::ustring text; +	}; +	class SqlParameter : public SqlWriter, Variable { +		public: +			SqlParameter(const xmlpp::Element *); +			virtual void writeSql(Glib::ustring & sql) const; +			virtual void bindParams(DB::Command *, unsigned int & offset) const; +	}; +	class SqlFilter : public SqlWriter { +		public: +			SqlFilter(const xmlpp::Element *); +			virtual void writeSql(Glib::ustring & sql) const; +			virtual void bindParams(DB::Command *, unsigned int & offset) const; + +			const Glib::ustring name; +			bool active; +		private: +			Writers writers; +	}; +	typedef boost::intrusive_ptr<SqlFilter> SqlFilterPtr; +	class SqlCommand : public SqlWriter { +		public: +			SqlCommand(const xmlpp::Element *); +			virtual void writeSql(Glib::ustring & sql) const; +			virtual void bindParams(DB::Command *, unsigned int & offset) const; +			typedef std::multimap<Glib::ustring, SqlFilterPtr> Filters; +			Glib::ustring getSqlFor(const Glib::ustring & f) const; +		private: +			Filters filters; +			Writers writers; +	}; +} + +#endif + diff --git a/project2/sql/tablepatch.cpp b/project2/sql/tablepatch.cpp new file mode 100644 index 0000000..f87f60e --- /dev/null +++ b/project2/sql/tablepatch.cpp @@ -0,0 +1,438 @@ +#include "tablepatch.h" +#include <stdio.h> +#include <misc.h> +#include <selectcommand.h> +#include <column.h> +#include <buffer.h> +#include <boost/algorithm/string/join.hpp> + +using namespace DB; + +TablePatch::TablePatch(const Connection & wdb, const TablePatch::Table & s, const TablePatch::Table & d, +		const TablePatch::Columns & c) : +    src(s), +    dest(d), +	cols(c), +    db(wdb) +{ +    if (!src.length()) { +        throw PatchCheckFailure(); +    } +    if (!dest.length()) { +        throw PatchCheckFailure(); +    } +    if (!db.inTx()) { +        throw PatchCheckFailure(); +    } +} + +void +TablePatch::addKey(const TablePatch::Column & c) +{ +    pk.insert(c); +} + +void +TablePatch::patch(const char * where, const char * order) +{ +    if (pk.size() == 0) { +        throw PatchCheckFailure(); +    } +    doDeletes(where, order); +    doUpdates(where, order); +    doInserts(order); +} + +void +TablePatch::doDeletes(const char * where, const char * order) +{ +	switch (db.bulkDeleteStyle()) { +		case BulkDeleteUsingSubSelect: +			{ +				// ----------------------------------------------------------------- +				// Build SQL to delete keys ---------------------------------------- +				// ----------------------------------------------------------------- +				Buffer toDelSql; +				toDelSql.appendf("DELETE FROM %s WHERE (", +						dest.c_str()); +				foreach (PKI, pk, pki) { +					if (pki != pk.begin()) { +						toDelSql.append(", "); +					} +					toDelSql.appendf("%s.%s", +							dest.c_str(), +							pki->c_str()); +				} +				// ----------------------------------------------------------------- +				// Build SQL to select keys to delete ------------------------------ +				// ----------------------------------------------------------------- +				toDelSql.append(") IN (SELECT "); +				foreach (PKI, pk, pki) { +					if (pki != pk.begin()) { +						toDelSql.append(", "); +					} +					toDelSql.appendf("a.%s", +							pki->c_str()); +				} +				toDelSql.appendf(" FROM %s a LEFT OUTER JOIN %s b ON ", +						dest.c_str(), src.c_str()); +				foreach (PKI, pk, pki) { +					if (pki != pk.begin()) { +						toDelSql.append(" AND "); +					} +					toDelSql.appendf(" a.%s = b.%s", +							pki->c_str(), pki->c_str()); +				} +				foreach (PKI, pk, pki) { +					if (pki == pk.begin()) { +						toDelSql.append(" WHERE "); +					} +					else { +						toDelSql.append(" AND "); +					} +					toDelSql.appendf(" b.%s IS NULL", +							pki->c_str()); +				} +				if (where && *where) { +					toDelSql.appendf(" AND %s", where); +				} +				if (order && *order) { +					toDelSql.appendf(" ORDER BY %s", order); +				} +				toDelSql.append(")"); +				ModifyCommand * del = db.newModifyCommand(toDelSql); +				del->execute(); +				delete del; +				break; +			} +		case BulkDeleteUsingUsingAlias: +		case BulkDeleteUsingUsing: +			{ +				Buffer toDelSql; +				toDelSql.appendf("DELETE FROM %s USING %s a LEFT OUTER JOIN %s b ", +						(db.bulkDeleteStyle() == BulkDeleteUsingUsingAlias ? "a" : dest.c_str()), +						dest.c_str(), src.c_str()); +				foreach (PKI, pk, pki) { +					if (pki != pk.begin()) { +						toDelSql.append(" AND "); +					} +					else { +						toDelSql.append(" ON "); +					} +					toDelSql.appendf(" a.%s = b.%s", +							pki->c_str(), pki->c_str()); +				} +				foreach (PKI, pk, pki) { +					if (pki != pk.begin()) { +						toDelSql.append(" AND "); +					} +					else { +						toDelSql.append(" WHERE "); +					} +					toDelSql.appendf(" b.%s IS NULL", +							pki->c_str()); +				} +				if (where && *where) { +					toDelSql.appendf(" AND %s", where); +				} +				if (order && *order) { +					toDelSql.appendf(" ORDER BY %s", order); +				} +				ModifyCommand * del = db.newModifyCommand(toDelSql); +				del->execute(); +				delete del; +				break; +			} +	} +} + +void +TablePatch::doUpdates(const char * where, const char * order) +{ +	if (cols.size() == pk.size()) { +		// Can't "change" anything... it's all part of the key +		return; +	} +	switch (db.bulkUpdateStyle()) { +		case BulkUpdateByIteration: +			{ +				// ----------------------------------------------------------------- +				// Build SQL for list of updates to perform ------------------------ +				// ----------------------------------------------------------------- +				Buffer toUpdSel; +				toUpdSel.append("SELECT "); +				foreach (Columns::const_iterator, cols, col) { +					if (pk.find(*col) == pk.end()) { +						toUpdSel.appendf("b.%s, ", +								col->c_str()); +					} +				} +				foreach (Columns::const_iterator, cols, col) { +					if (pk.find(*col) != pk.end()) { +						toUpdSel.appendf("b.%s, ", +								col->c_str()); +					} +				} +				toUpdSel.appendf("0 FROM %s a, %s b", +						dest.c_str(), src.c_str()); +				foreach (PKI, pk, pki) { +					if (pki == pk.begin()) { +						toUpdSel.append(" WHERE "); +					} +					else { +						toUpdSel.append(" AND "); +					} +					toUpdSel.appendf(" a.%s = b.%s", +							pki->c_str(), pki->c_str()); +				} +				if (where && *where) { +					toUpdSel.appendf(" AND %s", where); +				} +				toUpdSel.append(" AND ("); +				bool first = true; +				foreach (Columns::const_iterator, cols, col) { +					if (pk.find(*col) == pk.end()) { +						if (!first) { +							toUpdSel.append(" OR "); +						} +						first = false; +						toUpdSel.appendf( +								" (((CASE WHEN (a.%s IS NULL AND b.%s IS NULL) THEN 1 ELSE 0 END) \ +							+ (CASE WHEN(a.%s = b.%s) THEN 1 ELSE 0 END)) = 0)", +								col->c_str(), col->c_str(), col->c_str(), col->c_str()); +					} +				} +				toUpdSel.append(")"); +				if (order && *order) { +					toUpdSel.appendf(" ORDER BY %s", order); +				} +				// ----------------------------------------------------------------- +				// Build SQL to perform updates ------------------------------------ +				// ----------------------------------------------------------------- +				Buffer updSql; +				updSql.appendf("UPDATE %s SET ", +						dest.c_str()); +				first = true; +				foreach (Columns::const_iterator, cols, col) { +					if (pk.find(*col) == pk.end()) { +						if (!first) { +							updSql.append(", "); +						} +						first = false; +						updSql.appendf(" %s = ?", +								col->c_str()); +					} +				} +				foreach (PKI, pk, pki) { +					if (pki == pk.begin()) { +						updSql.append(" WHERE "); +					} +					else { +						updSql.append(" AND "); +					} +					updSql.appendf(" %s = ?", +							pki->c_str()); +				} +				// ----------------------------------------------------------------- +				// Iterator over update list make changes -------------------------- +				// ----------------------------------------------------------------- +				SelectCommand * toUpd = db.newSelectCommand(toUpdSel); +				ModifyCommand * upd = db.newModifyCommand(updSql); +				int cs = cols.size(); +				toUpd->execute(); +				for (int c = 0; c < cs; c += 1) { +					(*toUpd)[c].rebind(upd, c); +				} +				while (toUpd->fetch()) { +					upd->execute(false); +				} +				delete toUpd; +				delete upd; +			} +			break; +		case BulkUpdateUsingFromSrc: +			{ +				// ----------------------------------------------------------------- +				// Build SQL for list of updates to perform ------------------------ +				// ----------------------------------------------------------------- +				Buffer updSql; +				updSql.appendf("UPDATE %s a SET ", +						dest.c_str()); +				bool first = true; +				foreach (Columns::const_iterator, cols, col) { +					if (pk.find(*col) == pk.end()) { +						if (!first) { +							updSql.append(", "); +						} +						first = false; +						updSql.appendf(" %s = b.%s ", +								col->c_str(), col->c_str()); +					} +				} +				updSql.appendf(" FROM %s b ", +						src.c_str()); +				foreach (PKI, pk, pki) { +					if (pki == pk.begin()) { +						updSql.append(" WHERE "); +					} +					else { +						updSql.append(" AND "); +					} +					updSql.appendf(" a.%s = b.%s ", +							pki->c_str(), pki->c_str()); +				} +				updSql.append(" AND ("); +				first = true; +				foreach (Columns::const_iterator, cols, col) { +					if (pk.find(*col) == pk.end()) { +						if (!first) { +							updSql.append("  OR "); +						} +						first = false; +						updSql.appendf( +								" (((CASE WHEN (a.%s IS NULL AND b.%s IS NULL) THEN 1 ELSE 0 END) \ +							+ (CASE WHEN(a.%s = b.%s) THEN 1 ELSE 0 END)) = 0)", +								col->c_str(), col->c_str(), +								col->c_str(), col->c_str()); +					} +				} +				updSql.append(")"); +				if (where && *where) { +					updSql.appendf(" AND %s ", where); +				} +				if (order && *order) { +					updSql.appendf(" ORDER BY %s", order); +				} +				// ----------------------------------------------------------------- +				// Execute the bulk update command --------------------------------- +				// ----------------------------------------------------------------- +				ModifyCommand * upd = db.newModifyCommand(updSql); +				upd->execute(true); +				delete upd; +				break; +			} +		case BulkUpdateUsingJoin: +			{ +				// ----------------------------------------------------------------- +				// Build SQL for list of updates to perform ------------------------ +				// ----------------------------------------------------------------- +				Buffer updSql; +				updSql.appendf("UPDATE %s a, %s b SET ", +						dest.c_str(), src.c_str()); +				bool first = true; +				foreach (Columns::const_iterator, cols, col) { +					if (pk.find(*col) == pk.end()) { +						if (!first) { +							updSql.append(", "); +						} +						first = false; +						updSql.appendf(" a.%s = b.%s ", +								col->c_str(), col->c_str()); +					} +				} +				foreach (PKI, pk, pki) { +					if (pki == pk.begin()) { +						updSql.append(" WHERE "); +					} +					else { +						updSql.append(" AND "); +					} +					updSql.appendf(" a.%s = b.%s ", +							pki->c_str(), pki->c_str()); +				} +				updSql.append(" AND ("); +				first = true; +				foreach (Columns::const_iterator, cols, col) { +					if (pk.find(*col) == pk.end()) { +						if (!first) { +							updSql.append("  OR "); +						} +						first = false; +						updSql.appendf( +								" (((CASE WHEN (a.%s IS NULL AND b.%s IS NULL) THEN 1 ELSE 0 END) \ +							+ (CASE WHEN(a.%s = b.%s) THEN 1 ELSE 0 END)) = 0)", +								col->c_str(), col->c_str(), +								col->c_str(), col->c_str()); +					} +				} +				updSql.append(")"); +				if (where && *where) { +					updSql.appendf(" AND %s ", where); +				} +				if (order && *order) { +					updSql.appendf(" ORDER BY %s", order); +				} +				// ----------------------------------------------------------------- +				// Execute the bulk update command --------------------------------- +				// ----------------------------------------------------------------- +				ModifyCommand * upd = db.newModifyCommand(updSql); +				upd->execute(true); +				delete upd; +				break; +			} +	} +} + +void +TablePatch::doInserts(const char * order) +{ +    // ----------------------------------------------------------------- +    // Build SQL for copying new records ------------------------------- +    // ----------------------------------------------------------------- +	Buffer toInsSql; +    toInsSql.appendf("INSERT INTO %s", +            dest.c_str()); +    foreach (Columns::const_iterator, cols, col) { +        if (col == cols.begin()) { +            toInsSql.append("("); +        } +        else { +            toInsSql.append(", "); +        } +        toInsSql.appendf("%s", +                col->c_str()); +    } +    toInsSql.append(") SELECT "); +    foreach (Columns::const_iterator, cols, col) { +        if (col != cols.begin()) { +            toInsSql.append(", "); +        } +        toInsSql.appendf("b.%s", +                col->c_str()); +    } +    toInsSql.appendf(" FROM %s b LEFT OUTER JOIN %s a", +            src.c_str(), dest.c_str()); +    foreach (PKI, pk, pki) { +        if (pki == pk.begin()) { +            toInsSql.append(" ON "); +        } +        else { +            toInsSql.append(" AND "); +        } +        toInsSql.appendf(" a.%s = b.%s", +                pki->c_str(), pki->c_str()); +    } +    foreach (PKI, pk, pki) { +        if (pki == pk.begin()) { +            toInsSql.append(" WHERE "); +        } +        else { +            toInsSql.append(" AND "); +        } +        toInsSql.appendf(" a.%s IS NULL", +                pki->c_str()); +    } +    if (order && *order) { +        toInsSql.appendf(" ORDER BY %s", order); +    } +	ModifyCommand * ins = db.newModifyCommand(toInsSql); +	ins->execute(); +	delete ins; +} + +const char * +TablePatch::PatchCheckFailure::what() const throw() +{ +    return "Santiy checks failed: check table names and keys"; +} + diff --git a/project2/sql/tablepatch.h b/project2/sql/tablepatch.h new file mode 100644 index 0000000..0174294 --- /dev/null +++ b/project2/sql/tablepatch.h @@ -0,0 +1,42 @@ +#ifndef TABLEPATCH_H +#define TABLEPATCH_H + +#include <string> +#include <set> +#include <map> +#include <connection.h> +#include <modifycommand.h> +#include <selectcommand.h> + +class TablePatch { +    public: +        typedef std::string Table; +        typedef std::string Column; +        typedef std::set<Column> Columns; +        typedef Columns PrimaryKey; +        typedef PrimaryKey::const_iterator PKI; + +        class PatchCheckFailure : public std::exception { +            public: +                const char * what() const throw(); +        }; + +        TablePatch(const DB::Connection & db, const Table & src, const Table & dest, const Columns & cols); + +        void            addKey(const Column & col); +        void            patch(const char * where, const char * order); + +    private: +        void            doDeletes(const char * where, const char * order); +        void            doUpdates(const char * where, const char * order); +        void            doInserts(const char * order); + +        Table           src; +        Table           dest; +        PrimaryKey      pk; +        Columns 		cols; +        const DB::Connection &db; +}; + +#endif + | 
