From 97d616057fac75771d1dab762e01d7f0ba2b9d66 Mon Sep 17 00:00:00 2001 From: randomdan Date: Mon, 29 Aug 2011 11:32:10 +0000 Subject: Adds RDBMS table caching solution --- project2/Jamfile.jam | 3 +- project2/rowProcessor.cpp | 5 +- project2/sqlCache.cpp | 300 +++++++++++++++++++++++++++++++++++ project2/sqlHandleAsVariableType.cpp | 19 +++ project2/sqlHandleAsVariableType.h | 18 +++ project2/sqlRows.cpp | 22 +-- 6 files changed, 344 insertions(+), 23 deletions(-) create mode 100644 project2/sqlCache.cpp create mode 100644 project2/sqlHandleAsVariableType.cpp create mode 100644 project2/sqlHandleAsVariableType.h diff --git a/project2/Jamfile.jam b/project2/Jamfile.jam index e5b09da..96acca0 100644 --- a/project2/Jamfile.jam +++ b/project2/Jamfile.jam @@ -148,7 +148,8 @@ obj sql-modPQ : ; lib p2sql : - sqlCheck.cpp sqlWriter.cpp sqlTask.cpp sqlMergeTask.cpp sqlRows.cpp sqlVariableBinder.cpp tablepatch.cpp rdbmsDataSource.cpp + sqlCheck.cpp sqlWriter.cpp sqlTask.cpp sqlMergeTask.cpp sqlRows.cpp sqlCache.cpp sqlVariableBinder.cpp tablepatch.cpp rdbmsDataSource.cpp + sqlHandleAsVariableType.cpp ../libdbpp//dbpp : yes:sql-modODBC diff --git a/project2/rowProcessor.cpp b/project2/rowProcessor.cpp index efd188c..4ea89fd 100644 --- a/project2/rowProcessor.cpp +++ b/project2/rowProcessor.cpp @@ -31,7 +31,10 @@ RowProcessor::execute() const } } BOOST_FOREACH(const CachePtr & c, caches) { - tc.insert(c->openFor(source->name, filter, this)); + PresenterPtr p = c->openFor(source->name, filter, this); + if (p) { + tc.insert(p); + } } source->execute(filter, this); tc.clear(); diff --git a/project2/sqlCache.cpp b/project2/sqlCache.cpp new file mode 100644 index 0000000..13bc23d --- /dev/null +++ b/project2/sqlCache.cpp @@ -0,0 +1,300 @@ +#include "cache.h" +#include "sqlVariableBinder.h" +#include "sqlHandleAsVariableType.h" +#include "buffer.h" +#include "selectcommand.h" +#include "modifycommand.h" +#include "column.h" +#include "commonObjects.h" +#include "rdbmsDataSource.h" +#include "logger.h" +#include "xmlObjectLoader.h" +#include "iHaveParameters.h" +#include "rowSet.h" +#include +#include +#include + +typedef boost::shared_ptr SelectPtr; +typedef boost::shared_ptr ModifyPtr; + +class SqlCache : public Cache { + public: + SqlCache(const xmlpp::Element * p) : + Cache(p) + { + } + + void loadComplete(const CommonObjects * co) + { + db = co->dataSource(DataSource); + } + + static void appendKeyCols(Buffer * sql, unsigned int * off, const Glib::ustring & col) + { + if ((*off)++) { + sql->append(", "); + } + sql->append(col.c_str()); + } + + static void appendKeyBinds(Buffer * sql, unsigned int * off) + { + if ((*off)++) { + sql->append(", "); + } + sql->append("?"); + } + + static void appendKeyAnds(Buffer * sql, const Glib::ustring & col) + { + sql->appendf(" AND h.%s = ?", col.c_str()); + } + + static void bindKeyValues(DB::Command * cmd, unsigned int * offset, const VariableType & v) + { + boost::apply_visitor(SqlVariableBinder(cmd, (*offset)++), v); + } + + class SqlCacheRowSet : public RowSet { + public: + SqlCacheRowSet(SelectPtr r) : + RowSet(NULL), + s(r) { + } + void loadComplete(const CommonObjects *) { + } + class SqlCacheRowState : public RowState { + public: + SqlCacheRowState(const SqlCacheRowSet * s) : + sc(s) { + } + const Columns & getColumns() const { + return columns; + } + RowAttribute resolveAttr(const Glib::ustring & attrName) const { + return boost::bind(&SqlCacheRowState::getAttrCol, this, "p2attr_" + attrName); + } + private: + VariableType getAttrCol(const Glib::ustring & col) const { + HandleAsVariableType h; + (*sc->s)[col].apply(h); + return h.variable; + } + mutable Columns columns; + friend class SqlCacheRowSet; + const SqlCacheRowSet * sc; + }; + void execute(const Glib::ustring&, const RowProcessor * rp) const { + SqlCacheRowState ss(this); + HandleAsVariableType h; + do { + if (!(*s)["p2_cacheid"].isNull()) { + if (colCols.empty()) { + (*s)["p2_cacheid"].apply(h); + cacheId = h.variable; + unsigned int colNo = 0; + for (unsigned int c = 0; c < s->columnCount(); c++) { + const DB::Column & col = (*s)[c]; + if (!boost::algorithm::starts_with(col.name, "p2attr_") && + !boost::algorithm::starts_with(col.name, "p2_")) { + ss.columns.insert(new Column(colNo++, col.name)); + colCols.push_back(c); + } + } + ss.fields.resize(colCols.size()); + } + else { + (*s)["p2_cacheid"].apply(h); + if (cacheId != (int64_t)h.variable) { + break; + } + } + BOOST_FOREACH(const unsigned int & c, colCols) { + const DB::Column & col = (*s)[c]; + col.apply(h); + ss.fields[&c - &colCols.front()] = h.variable; + } + ss.process(rp); + } + } while (s->fetch()); + } + private: + SelectPtr s; + mutable std::vector colCols; + mutable int64_t cacheId; + }; + + RowSetCPtr getCachedRowSet(const Glib::ustring & n, const Glib::ustring & f, const IHaveParameters * ps) const + { + Buffer sql; + sql.appendf("SELECT r.* \ + FROM %s p, %s_%s_%s h LEFT OUTER JOIN %s_%s_%s_rows r \ + ON h.p2_cacheid = r.p2_cacheid \ + WHERE p.p2_time > ? \ + AND p.p2_cacheid = h.p2_cacheid", + HeaderTable.c_str(), + HeaderTable.c_str(), n.c_str(), f.c_str(), + HeaderTable.c_str(),n.c_str(), f.c_str()); + applyKeys(boost::bind(appendKeyAnds, &sql, _1), ps); + sql.appendf(" ORDER BY r.p2_cacheid DESC, r.p2_row"); + SelectPtr gh(db->getReadonly().newSelectCommand(sql)); + unsigned int offset = 0; + gh->bindParamT(offset++, time(NULL) - CacheLife); + applyKeys(boost::bind(bindKeyValues, gh.get(), &offset, _2), ps); + if (gh->fetch()) { + return new SqlCacheRowSet(gh); + } + return NULL; + } + + class SqlCachePresenter : public Presenter { + public: + SqlCachePresenter(const Glib::ustring & name, const Glib::ustring & filter, const RdbmsDataSource * d) : + depth(0), + row(1), + db(d), + n(name), + f(filter) { + } + void declareNamespace(const Glib::ustring &, const Glib::ustring &) const { } + void setNamespace(const Glib::ustring &, const Glib::ustring &) const { } + void pushSub(const Glib::ustring & name, const Glib::ustring &) const { + depth += 1; + if (depth == 2) { + col = name; + } + else if (depth == 1) { + } + } + void addAttr(const Glib::ustring & name, const Glib::ustring &, const VariableType & value) const { + attrs.insert(Values::value_type(name, value)); + } + void addText(const VariableType & value) const { + cols.insert(Values::value_type(col, value)); + } + void popSub() const { + if (depth == 2) { + col.clear(); + } + else if (depth == 1) { + Buffer sql; + sql.appendf("INSERT INTO %s_%s_%s_rows(p2_row", HeaderTable.c_str(), n.c_str(), f.c_str()); + BOOST_FOREACH(const Values::value_type & a, attrs) { + sql.appendf(", p2attr_%s", a.first.c_str()); + } + BOOST_FOREACH(const Values::value_type & v, cols) { + sql.appendf(", %s", v.first.c_str()); + } + sql.appendf(") VALUES(?"); + for (size_t x = attrs.size(); x > 0; x -= 1) { + sql.append(", ?"); + } + for (size_t x = cols.size(); x > 0; x -= 1) { + sql.append(", ?"); + } + sql.appendf(")"); + ModifyPtr m(db->getReadonly().newModifyCommand(sql)); + unsigned int offset = 0; + m->bindParamI(offset++, row++); + BOOST_FOREACH(const Values::value_type & a, attrs) { + boost::apply_visitor(SqlVariableBinder(m.get(), offset++), a.second); + } + BOOST_FOREACH(const Values::value_type & v, cols) { + boost::apply_visitor(SqlVariableBinder(m.get(), offset++), v.second); + } + m->execute(); + cols.clear(); + attrs.clear(); + } + depth -= 1; + } + private: + mutable unsigned int depth; + mutable unsigned int row; + const RdbmsDataSource * db; + mutable Glib::ustring col; + const Glib::ustring n, f; + typedef std::map Values; + mutable Values cols, attrs; + }; + + PresenterPtr openFor(const Glib::ustring & n, const Glib::ustring & f, const IHaveParameters * ps) + { + // Header + Buffer del; + del.appendf("INSERT INTO %s(p2_time) VALUES(?)", HeaderTable.c_str()); + ModifyPtr h = ModifyPtr(db->getReadonly().newModifyCommand(del)); + h->bindParamT(0, time(NULL)); + h->execute(); + // Record set header + Buffer sql; + sql.appendf("INSERT INTO %s_%s_%s(", HeaderTable.c_str(), n.c_str(), f.c_str()); + unsigned int offset = 0; + applyKeys(boost::bind(appendKeyCols, &sql, &offset, _1), ps); + sql.appendf(") VALUES("); + offset = 0; + applyKeys(boost::bind(appendKeyBinds, &sql, &offset), ps); + sql.appendf(")"); + ModifyPtr m(db->getReadonly().newModifyCommand(sql)); + offset = 0; + applyKeys(boost::bind(bindKeyValues, m.get(), &offset, _2), ps); + m->execute(); + return new SqlCachePresenter(n, f, db); + } + + void close(const Glib::ustring & , const Glib::ustring & , const IHaveParameters * ) + { + } + + private: + friend class CustomSqlCacheLoader; + const RdbmsDataSource * db; + static std::string DataSource; + static std::string HeaderTable; + static time_t CacheLife; +}; + +std::string SqlCache::DataSource; +std::string SqlCache::HeaderTable; +time_t SqlCache::CacheLife; + +namespace po = boost::program_options; +class CustomSqlCacheLoader : public ElementLoaderImpl { + public: + CustomSqlCacheLoader() : + opts("SQL Cache options") + { + opts.add_options() + ("cache.sql.datasource", po::value(&SqlCache::DataSource), + "The default datasource to connect to") + ("cache.sql.headertable", po::value(&SqlCache::HeaderTable)->default_value("p2cache"), + "The filename to store the data in") + ("cache.sql.life", po::value(&SqlCache::CacheLife)->default_value(3600), + "The age of cache entries after which they are removed (seconds)") + ; + } + + po::options_description * options() + { + return &opts; + } + + void onIdle() + { + if (!SqlCache::DataSource.empty()) { + boost::intrusive_ptr co = new CommonObjects(); + const RdbmsDataSource * db = co->dataSource(SqlCache::DataSource); + Buffer del; + del.appendf("DELETE FROM %s WHERE p2_time < ?", SqlCache::HeaderTable.c_str()); + ModifyPtr m(db->getReadonly().newModifyCommand(del)); + m->bindParamT(0, time(NULL) - SqlCache::CacheLife); + m->execute(); + } + } + + private: + po::options_description opts; +}; +DECLARE_CUSTOM_LOADER("sqlcache", CustomSqlCacheLoader); + diff --git a/project2/sqlHandleAsVariableType.cpp b/project2/sqlHandleAsVariableType.cpp new file mode 100644 index 0000000..f084a14 --- /dev/null +++ b/project2/sqlHandleAsVariableType.cpp @@ -0,0 +1,19 @@ +#include "sqlHandleAsVariableType.h" +#include + +void HandleAsVariableType::null() { + variable = Null(); +} +void HandleAsVariableType::string(const char * c, size_t l) { + variable = Glib::ustring(c, c + l); +} +void HandleAsVariableType::integer(int64_t i) { + variable = i; +} +void HandleAsVariableType::floatingpoint(double d) { + variable = d; +} +void HandleAsVariableType::timestamp(const struct tm & t) { + variable = boost::posix_time::ptime(boost::posix_time::ptime_from_tm(t)); +} + diff --git a/project2/sqlHandleAsVariableType.h b/project2/sqlHandleAsVariableType.h new file mode 100644 index 0000000..c874b7c --- /dev/null +++ b/project2/sqlHandleAsVariableType.h @@ -0,0 +1,18 @@ +#ifndef SQLHANDLEASVARIABLETYPE +#define SQLHANDLEASVARIABLETYPE + +#include "column.h" +#include "variables.h" + +class HandleAsVariableType : public DB::HandleField { + public: + void null(); + void string(const char * c, size_t l); + void integer(int64_t i); + void floatingpoint(double d); + void timestamp(const struct tm & t); + VariableType variable; +}; + +#endif + diff --git a/project2/sqlRows.cpp b/project2/sqlRows.cpp index 31d86dd..6e506d7 100644 --- a/project2/sqlRows.cpp +++ b/project2/sqlRows.cpp @@ -1,4 +1,5 @@ #include "sqlRows.h" +#include "sqlHandleAsVariableType.h" #include "rowProcessor.h" #include "xml.h" #include "selectcommand.h" @@ -8,7 +9,6 @@ #include "xmlObjectLoader.h" #include "commonObjects.h" #include -#include #include DECLARE_LOADER("sqlrows", SqlRows); @@ -31,26 +31,6 @@ SqlRows::loadComplete(const CommonObjects * co) db = co->dataSource(dataSource()); } -class HandleAsVariableType : public DB::HandleField { - public: - void null() { - variable = Null(); - } - void string(const char * c, size_t l) { - variable = Glib::ustring(c, c + l); - } - void integer(int64_t i) { - variable = i; - } - void floatingpoint(double d) { - variable = d; - } - void timestamp(const struct tm & t) { - variable = boost::posix_time::ptime(boost::posix_time::ptime_from_tm(t)); - } - VariableType variable; -}; - SqlRows::SqlState::SqlState(SelectPtr s) : query(s) { -- cgit v1.2.3