From 97d616057fac75771d1dab762e01d7f0ba2b9d66 Mon Sep 17 00:00:00 2001
From: randomdan <randomdan@localhost>
Date: Mon, 29 Aug 2011 11:32:10 +0000
Subject: Adds RDBMS table caching solution

---
 project2/Jamfile.jam                 |   3 +-
 project2/rowProcessor.cpp            |   5 +-
 project2/sqlCache.cpp                | 300 +++++++++++++++++++++++++++++++++++
 project2/sqlHandleAsVariableType.cpp |  19 +++
 project2/sqlHandleAsVariableType.h   |  18 +++
 project2/sqlRows.cpp                 |  22 +--
 6 files changed, 344 insertions(+), 23 deletions(-)
 create mode 100644 project2/sqlCache.cpp
 create mode 100644 project2/sqlHandleAsVariableType.cpp
 create mode 100644 project2/sqlHandleAsVariableType.h

diff --git a/project2/Jamfile.jam b/project2/Jamfile.jam
index e5b09da..96acca0 100644
--- a/project2/Jamfile.jam
+++ b/project2/Jamfile.jam
@@ -148,7 +148,8 @@ obj sql-modPQ :
 	;
 	
 lib p2sql :
-	sqlCheck.cpp sqlWriter.cpp sqlTask.cpp sqlMergeTask.cpp sqlRows.cpp sqlVariableBinder.cpp tablepatch.cpp rdbmsDataSource.cpp
+	sqlCheck.cpp sqlWriter.cpp sqlTask.cpp sqlMergeTask.cpp sqlRows.cpp sqlCache.cpp sqlVariableBinder.cpp tablepatch.cpp rdbmsDataSource.cpp
+	sqlHandleAsVariableType.cpp
 	../libdbpp//dbpp
 	:
 	<odbc>yes:<library>sql-modODBC
diff --git a/project2/rowProcessor.cpp b/project2/rowProcessor.cpp
index efd188c..4ea89fd 100644
--- a/project2/rowProcessor.cpp
+++ b/project2/rowProcessor.cpp
@@ -31,7 +31,10 @@ RowProcessor::execute() const
 		}
 	}
 	BOOST_FOREACH(const CachePtr & c, caches) {
-		tc.insert(c->openFor(source->name, filter, this));
+		PresenterPtr p = c->openFor(source->name, filter, this);
+		if (p) {
+			tc.insert(p);
+		}
 	}
 	source->execute(filter, this);
 	tc.clear();
diff --git a/project2/sqlCache.cpp b/project2/sqlCache.cpp
new file mode 100644
index 0000000..13bc23d
--- /dev/null
+++ b/project2/sqlCache.cpp
@@ -0,0 +1,300 @@
+#include "cache.h"
+#include "sqlVariableBinder.h"
+#include "sqlHandleAsVariableType.h"
+#include "buffer.h"
+#include "selectcommand.h"
+#include "modifycommand.h"
+#include "column.h"
+#include "commonObjects.h"
+#include "rdbmsDataSource.h"
+#include "logger.h"
+#include "xmlObjectLoader.h"
+#include "iHaveParameters.h"
+#include "rowSet.h"
+#include <boost/foreach.hpp>
+#include <boost/program_options.hpp>
+#include <boost/algorithm/string/predicate.hpp>
+
+typedef boost::shared_ptr<DB::SelectCommand> SelectPtr;
+typedef boost::shared_ptr<DB::ModifyCommand> ModifyPtr;
+
+class SqlCache : public Cache {
+	public:
+		SqlCache(const xmlpp::Element * p) :
+			Cache(p)
+		{
+		}
+
+		void loadComplete(const CommonObjects * co)
+		{
+			db = co->dataSource<RdbmsDataSource>(DataSource);
+		}
+
+		static void appendKeyCols(Buffer * sql, unsigned int * off, const Glib::ustring & col)
+		{
+			if ((*off)++) {
+				sql->append(", ");
+			}
+			sql->append(col.c_str());
+		}
+
+		static void appendKeyBinds(Buffer * sql, unsigned int * off)
+		{
+			if ((*off)++) {
+				sql->append(", ");
+			}
+			sql->append("?");
+		}
+
+		static void appendKeyAnds(Buffer * sql, const Glib::ustring & col)
+		{
+			sql->appendf(" AND h.%s = ?", col.c_str());
+		}
+
+		static void bindKeyValues(DB::Command * cmd, unsigned int * offset, const VariableType & v)
+		{
+			boost::apply_visitor<const SqlVariableBinder, const VariableType>(SqlVariableBinder(cmd, (*offset)++), v);
+		}
+
+		class SqlCacheRowSet : public RowSet {
+			public:
+				SqlCacheRowSet(SelectPtr r) :
+					RowSet(NULL),
+					s(r) {
+					}
+				void loadComplete(const CommonObjects *) {
+				}
+				class SqlCacheRowState : public RowState {
+					public:
+						SqlCacheRowState(const SqlCacheRowSet * s) :
+							sc(s) {
+							}
+						const Columns & getColumns() const {
+							return columns;
+						}
+						RowAttribute resolveAttr(const Glib::ustring & attrName) const {
+							return boost::bind(&SqlCacheRowState::getAttrCol, this, "p2attr_" + attrName);
+						}
+					private:
+						VariableType getAttrCol(const Glib::ustring & col) const {
+							HandleAsVariableType h;
+							(*sc->s)[col].apply(h);
+							return h.variable;
+						}
+						mutable Columns columns;
+						friend class SqlCacheRowSet;
+						const SqlCacheRowSet * sc;
+				};
+				void execute(const Glib::ustring&, const RowProcessor * rp) const {
+					SqlCacheRowState ss(this);
+					HandleAsVariableType h;
+					do {
+						if (!(*s)["p2_cacheid"].isNull()) {
+							if (colCols.empty()) {
+								(*s)["p2_cacheid"].apply(h);
+								cacheId = h.variable;
+								unsigned int colNo = 0;
+								for (unsigned int c = 0; c < s->columnCount(); c++) {
+									const DB::Column & col = (*s)[c];
+									if (!boost::algorithm::starts_with(col.name, "p2attr_") &&
+											!boost::algorithm::starts_with(col.name, "p2_")) {
+										ss.columns.insert(new Column(colNo++, col.name));
+										colCols.push_back(c);
+									}
+								}
+								ss.fields.resize(colCols.size());
+							}
+							else {
+								(*s)["p2_cacheid"].apply(h);
+								if (cacheId != (int64_t)h.variable) {
+									break;
+								}
+							}
+							BOOST_FOREACH(const unsigned int & c, colCols) {
+								const DB::Column & col = (*s)[c];
+								col.apply(h);
+								ss.fields[&c - &colCols.front()] = h.variable;
+							}
+							ss.process(rp);
+						}
+					} while (s->fetch());
+				}
+			private:
+				SelectPtr s;
+				mutable std::vector<unsigned int> colCols;
+				mutable int64_t cacheId;
+		};
+
+		RowSetCPtr getCachedRowSet(const Glib::ustring & n, const Glib::ustring & f, const IHaveParameters * ps) const
+		{
+			Buffer sql;
+			sql.appendf("SELECT r.* \
+					FROM %s p, %s_%s_%s h LEFT OUTER JOIN %s_%s_%s_rows r \
+						ON h.p2_cacheid = r.p2_cacheid \
+					WHERE p.p2_time > ? \
+					AND p.p2_cacheid = h.p2_cacheid",
+					HeaderTable.c_str(),
+					HeaderTable.c_str(), n.c_str(), f.c_str(),
+					HeaderTable.c_str(),n.c_str(), f.c_str());
+			applyKeys(boost::bind(appendKeyAnds, &sql, _1), ps);
+			sql.appendf(" ORDER BY r.p2_cacheid DESC, r.p2_row");
+			SelectPtr gh(db->getReadonly().newSelectCommand(sql));
+			unsigned int offset = 0;
+			gh->bindParamT(offset++, time(NULL) - CacheLife);
+			applyKeys(boost::bind(bindKeyValues, gh.get(), &offset, _2), ps);
+			if (gh->fetch()) {
+				return new SqlCacheRowSet(gh);
+			}
+			return NULL;
+		}
+
+		class SqlCachePresenter : public Presenter {
+			public:
+				SqlCachePresenter(const Glib::ustring & name, const Glib::ustring & filter, const RdbmsDataSource * d) :
+					depth(0),
+					row(1),
+					db(d),
+					n(name),
+					f(filter) {
+				}
+				void declareNamespace(const Glib::ustring &, const Glib::ustring &) const { }
+				void setNamespace(const Glib::ustring &, const Glib::ustring &) const { }
+				void pushSub(const Glib::ustring & name, const Glib::ustring &) const {
+					depth += 1;
+					if (depth == 2) {
+						col = name;
+					}
+					else if (depth == 1) {
+					}
+				}
+				void addAttr(const Glib::ustring & name, const Glib::ustring &, const VariableType & value) const {
+					attrs.insert(Values::value_type(name, value));
+				}
+				void addText(const VariableType & value) const {
+					cols.insert(Values::value_type(col, value));
+				}
+				void popSub() const {
+					if (depth == 2) {
+						col.clear();
+					}
+					else if (depth == 1) {
+						Buffer sql;
+						sql.appendf("INSERT INTO %s_%s_%s_rows(p2_row", HeaderTable.c_str(), n.c_str(), f.c_str());
+						BOOST_FOREACH(const Values::value_type & a, attrs) {
+							sql.appendf(", p2attr_%s", a.first.c_str());
+						}
+						BOOST_FOREACH(const Values::value_type & v, cols) {
+							sql.appendf(", %s", v.first.c_str());
+						}
+						sql.appendf(") VALUES(?");
+						for (size_t x = attrs.size(); x > 0; x -= 1) {
+							sql.append(", ?");
+						}
+						for (size_t x = cols.size(); x > 0; x -= 1) {
+							sql.append(", ?");
+						}
+						sql.appendf(")");
+						ModifyPtr m(db->getReadonly().newModifyCommand(sql));
+						unsigned int offset = 0;
+						m->bindParamI(offset++, row++);
+						BOOST_FOREACH(const Values::value_type & a, attrs) {
+							boost::apply_visitor<const SqlVariableBinder, const VariableType>(SqlVariableBinder(m.get(), offset++), a.second);
+						}
+						BOOST_FOREACH(const Values::value_type & v, cols) {
+							boost::apply_visitor<const SqlVariableBinder, const VariableType>(SqlVariableBinder(m.get(), offset++), v.second);
+						}
+						m->execute();
+						cols.clear();
+						attrs.clear();
+					}
+					depth -= 1;
+				}
+			private:
+				mutable unsigned int depth;
+				mutable unsigned int row;
+				const RdbmsDataSource * db;
+				mutable Glib::ustring col;
+				const Glib::ustring n, f;
+				typedef std::map<Glib::ustring, VariableType> Values;
+				mutable Values cols, attrs;
+		};
+
+		PresenterPtr openFor(const Glib::ustring & n, const Glib::ustring & f, const IHaveParameters * ps)
+		{
+			// Header
+			Buffer del;
+			del.appendf("INSERT INTO %s(p2_time) VALUES(?)", HeaderTable.c_str());
+			ModifyPtr h = ModifyPtr(db->getReadonly().newModifyCommand(del));
+			h->bindParamT(0, time(NULL));
+			h->execute();
+			// Record set header
+			Buffer sql;
+			sql.appendf("INSERT INTO %s_%s_%s(", HeaderTable.c_str(), n.c_str(), f.c_str());
+			unsigned int offset = 0;
+			applyKeys(boost::bind(appendKeyCols, &sql, &offset, _1), ps);
+			sql.appendf(") VALUES(");
+			offset = 0;
+			applyKeys(boost::bind(appendKeyBinds, &sql, &offset), ps);
+			sql.appendf(")");
+			ModifyPtr m(db->getReadonly().newModifyCommand(sql));
+			offset = 0;
+			applyKeys(boost::bind(bindKeyValues, m.get(), &offset, _2), ps);
+			m->execute();
+			return new SqlCachePresenter(n, f, db);
+		}
+
+		void close(const Glib::ustring & , const Glib::ustring & , const IHaveParameters * )
+		{
+		}
+
+	private:
+		friend class CustomSqlCacheLoader;
+		const RdbmsDataSource * db;
+		static std::string DataSource;
+		static std::string HeaderTable;
+		static time_t CacheLife;
+};
+
+std::string SqlCache::DataSource;
+std::string SqlCache::HeaderTable;
+time_t SqlCache::CacheLife;
+
+namespace po = boost::program_options;
+class CustomSqlCacheLoader : public ElementLoaderImpl<SqlCache> {
+	public:
+		CustomSqlCacheLoader() :
+			opts("SQL Cache options")
+		{
+			opts.add_options()
+				("cache.sql.datasource", po::value(&SqlCache::DataSource),
+				 "The default datasource to connect to")
+				("cache.sql.headertable", po::value(&SqlCache::HeaderTable)->default_value("p2cache"),
+				 "The filename to store the data in")
+				("cache.sql.life", po::value(&SqlCache::CacheLife)->default_value(3600),
+				 "The age of cache entries after which they are removed (seconds)")
+				;
+		}
+
+		po::options_description * options()
+		{
+			return &opts;
+		}
+
+		void onIdle()
+		{
+			if (!SqlCache::DataSource.empty()) {
+				boost::intrusive_ptr<CommonObjects> co = new CommonObjects();
+				const RdbmsDataSource * db = co->dataSource<RdbmsDataSource>(SqlCache::DataSource);
+				Buffer del;
+				del.appendf("DELETE FROM %s WHERE p2_time < ?", SqlCache::HeaderTable.c_str());
+				ModifyPtr m(db->getReadonly().newModifyCommand(del));
+				m->bindParamT(0, time(NULL) - SqlCache::CacheLife);
+				m->execute();
+			}
+		}
+
+	private:
+		po::options_description opts;
+};
+DECLARE_CUSTOM_LOADER("sqlcache", CustomSqlCacheLoader);
+
diff --git a/project2/sqlHandleAsVariableType.cpp b/project2/sqlHandleAsVariableType.cpp
new file mode 100644
index 0000000..f084a14
--- /dev/null
+++ b/project2/sqlHandleAsVariableType.cpp
@@ -0,0 +1,19 @@
+#include "sqlHandleAsVariableType.h"
+#include <boost/date_time/posix_time/posix_time.hpp>
+
+void HandleAsVariableType::null() {
+	variable = Null();
+}
+void HandleAsVariableType::string(const char * c, size_t l) {
+	variable = Glib::ustring(c, c + l);
+}
+void HandleAsVariableType::integer(int64_t i) {
+	variable = i;
+}
+void HandleAsVariableType::floatingpoint(double d) {
+	variable = d;
+}
+void HandleAsVariableType::timestamp(const struct tm & t) {
+	variable = boost::posix_time::ptime(boost::posix_time::ptime_from_tm(t));
+}
+
diff --git a/project2/sqlHandleAsVariableType.h b/project2/sqlHandleAsVariableType.h
new file mode 100644
index 0000000..c874b7c
--- /dev/null
+++ b/project2/sqlHandleAsVariableType.h
@@ -0,0 +1,18 @@
+#ifndef SQLHANDLEASVARIABLETYPE
+#define SQLHANDLEASVARIABLETYPE
+
+#include "column.h"
+#include "variables.h"
+
+class HandleAsVariableType : public DB::HandleField {
+	public:
+		void null();
+		void string(const char * c, size_t l);
+		void integer(int64_t i);
+		void floatingpoint(double d);
+		void timestamp(const struct tm & t);
+		VariableType variable;
+};
+
+#endif
+
diff --git a/project2/sqlRows.cpp b/project2/sqlRows.cpp
index 31d86dd..6e506d7 100644
--- a/project2/sqlRows.cpp
+++ b/project2/sqlRows.cpp
@@ -1,4 +1,5 @@
 #include "sqlRows.h"
+#include "sqlHandleAsVariableType.h"
 #include "rowProcessor.h"
 #include "xml.h"
 #include "selectcommand.h"
@@ -8,7 +9,6 @@
 #include "xmlObjectLoader.h"
 #include "commonObjects.h"
 #include <boost/date_time/gregorian/gregorian_types.hpp>
-#include <boost/date_time/posix_time/posix_time.hpp>
 #include <boost/foreach.hpp>
 
 DECLARE_LOADER("sqlrows", SqlRows);
@@ -31,26 +31,6 @@ SqlRows::loadComplete(const CommonObjects * co)
 	db = co->dataSource<RdbmsDataSource>(dataSource());
 }
 
-class HandleAsVariableType : public DB::HandleField {
-	public:
-		void null() {
-			variable = Null();
-		}
-		void string(const char * c, size_t l) {
-			variable = Glib::ustring(c, c + l);
-		}
-		void integer(int64_t i) {
-			variable = i;
-		}
-		void floatingpoint(double d) {
-			variable = d;
-		}
-		void timestamp(const struct tm & t) {
-			variable = boost::posix_time::ptime(boost::posix_time::ptime_from_tm(t));
-		}
-		VariableType variable;
-};
-
 SqlRows::SqlState::SqlState(SelectPtr s) :
 	query(s)
 {
-- 
cgit v1.2.3