summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorrandomdan <randomdan@localhost>2011-11-16 00:53:10 +0000
committerrandomdan <randomdan@localhost>2011-11-16 00:53:10 +0000
commitde67e14297694e0e67a0cf253659da1dadd8144a (patch)
tree3637aa5ba3fd48b9fc62498e814dc44b68151d4c
parentLog errors from TaskHost (diff)
downloadproject2-de67e14297694e0e67a0cf253659da1dadd8144a.tar.bz2
project2-de67e14297694e0e67a0cf253659da1dadd8144a.tar.xz
project2-de67e14297694e0e67a0cf253659da1dadd8144a.zip
Transactional caches
-rw-r--r--project2/common/cache.h3
-rw-r--r--project2/common/commonObjects.h4
-rw-r--r--project2/common/rowProcessor.cpp42
-rw-r--r--project2/common/rowProcessor.h5
-rw-r--r--project2/common/taskHost.cpp11
-rw-r--r--project2/common/viewHost.cpp4
-rw-r--r--project2/sql/sqlCache.cpp29
-rw-r--r--project2/xml/xmlCache.cpp6
8 files changed, 77 insertions, 27 deletions
diff --git a/project2/common/cache.h b/project2/common/cache.h
index 899a01e..e642f5f 100644
--- a/project2/common/cache.h
+++ b/project2/common/cache.h
@@ -16,7 +16,8 @@ class Cache : public IHaveParameters, public SourceObject {
bool checkAndExecute(const Glib::ustring &, const Glib::ustring &, const RowProcessor *);
virtual PresenterPtr openFor(const Glib::ustring &, const Glib::ustring &, const IHaveParameters *) = 0;
- virtual void close(const Glib::ustring &, const Glib::ustring &, const IHaveParameters *) = 0;
+ virtual void save(const Glib::ustring &, const Glib::ustring &, const IHaveParameters *) = 0;
+ virtual void discard(const Glib::ustring &, const Glib::ustring &, const IHaveParameters *) = 0;
protected:
virtual RowSetCPtr getCachedRowSet(const Glib::ustring &, const Glib::ustring &, const IHaveParameters *) const = 0;
diff --git a/project2/common/commonObjects.h b/project2/common/commonObjects.h
index dae563a..fe6bd5c 100644
--- a/project2/common/commonObjects.h
+++ b/project2/common/commonObjects.h
@@ -17,13 +17,13 @@ class CommonObjects : public virtual IntrusivePtrBase {
RowSetPtr getSource(const std::string &) const;
template <class DataSourceType>
- const DataSourceType * dataSource(const std::string & name) const
+ DataSourceType * dataSource(const std::string & name) const
{
DataSources::const_iterator i = datasources.find(name);
if (i == datasources.end()) {
i = loadDataSource(name);
}
- const DataSourceType * s = boost::dynamic_pointer_cast<const DataSourceType>(i->second).get();
+ DataSourceType * s = boost::dynamic_pointer_cast<DataSourceType>(i->second).get();
if (!s) {
throw DataSourceNotCompatible(name);
}
diff --git a/project2/common/rowProcessor.cpp b/project2/common/rowProcessor.cpp
index f9d821e..1e81dc6 100644
--- a/project2/common/rowProcessor.cpp
+++ b/project2/common/rowProcessor.cpp
@@ -8,7 +8,9 @@
RowProcessor::RowProcessor(const xmlpp::Element * p) :
IHaveParameters(p),
recordSource(p->get_attribute_value("source")),
- filter(p->get_attribute_value("filter"))
+ filter(p->get_attribute_value("filter")),
+ CROE(Variable(p, "cacheRowsOnError", false, false)()),
+ IRSE(Variable(p, "ignoreRowSourceError", false, false)())
{
LoaderBase loader(true);
loader.supportedStorers.insert(Storer::into(&caches));
@@ -25,13 +27,13 @@ void
RowProcessor::execute() const
{
IHaveParameters::push(this);
- ScopeObject _ihp(boost::bind(&IHaveParameters::pop));
+ ScopeObject _ihp(
+ boost::bind(&IHaveParameters::pop),
+ boost::bind(&RowProcessor::saveCaches, this),
+ boost::bind((CROE ? &RowProcessor::saveCaches : &RowProcessor::discardCaches), this),
+ boost::bind(&TargetCaches::clear, &tc));
BOOST_FOREACH(const CachePtr & c, caches) {
if (c->checkAndExecute(source->name, filter, this)) {
- BOOST_FOREACH(const TargetCaches::value_type & c, tc) {
- c->get<1>()->close(source->name, filter, this);
- }
- tc.clear();
return;
}
PresenterPtr p = c->openFor(source->name, filter, this);
@@ -40,11 +42,33 @@ RowProcessor::execute() const
}
}
Logger()->messagef(LOG_DEBUG, "Executing from source '%s'", source->name.c_str());
- source->execute(filter, this);
+ if (IRSE) {
+ try {
+ source->execute(filter, this);
+ }
+ catch (const std::exception & e) {
+ Logger()->messagef(LOG_WARNING, "Source '%s' failed with '%s'", source->name.c_str(), e.what());
+ }
+ }
+ else {
+ source->execute(filter, this);
+ }
+}
+
+void
+RowProcessor::saveCaches() const
+{
+ BOOST_FOREACH(const TargetCaches::value_type & c, tc) {
+ c->get<1>()->save(source->name, filter, this);
+ }
+}
+
+void
+RowProcessor::discardCaches() const
+{
BOOST_FOREACH(const TargetCaches::value_type & c, tc) {
- c->get<1>()->close(source->name, filter, this);
+ c->get<1>()->discard(source->name, filter, this);
}
- tc.clear();
}
void
diff --git a/project2/common/rowProcessor.h b/project2/common/rowProcessor.h
index 3f39d19..8a52e01 100644
--- a/project2/common/rowProcessor.h
+++ b/project2/common/rowProcessor.h
@@ -18,6 +18,8 @@ class RowProcessor : public IHaveParameters {
const std::string recordSource;
const Glib::ustring filter;
+ const bool CROE;
+ const bool IRSE;
protected:
boost::intrusive_ptr<RowSet> source;
@@ -33,6 +35,9 @@ class RowProcessor : public IHaveParameters {
typedef boost::shared_ptr<TargetCache> TargetCachePtr;
typedef std::set<TargetCachePtr> TargetCaches;
mutable TargetCaches tc;
+
+ void saveCaches() const;
+ void discardCaches() const;
};
#endif
diff --git a/project2/common/taskHost.cpp b/project2/common/taskHost.cpp
index fa4718b..cecc883 100644
--- a/project2/common/taskHost.cpp
+++ b/project2/common/taskHost.cpp
@@ -2,6 +2,7 @@
#include "taskHost.h"
#include "noOutputExecute.h"
#include "dataSource.h"
+#include "scopeObject.h"
#include <boost/foreach.hpp>
TaskHost::TaskHost(const boost::filesystem::path & file) :
@@ -27,14 +28,8 @@ void
TaskHost::execute() const
{
parseDocument();
- try {
- run(tasks);
- commitAll();
- }
- catch (...) {
- rollbackAll();
- throw;
- }
+ ScopeObject txHandler(boost::bind(&TaskHost::commitAll, this), boost::bind(&TaskHost::rollbackAll, this));
+ run(tasks);
}
void
diff --git a/project2/common/viewHost.cpp b/project2/common/viewHost.cpp
index 9157e01..41ed05b 100644
--- a/project2/common/viewHost.cpp
+++ b/project2/common/viewHost.cpp
@@ -29,6 +29,10 @@ ViewHost::executeViews(const DefaultPresenterProvider & dpp) const
BOOST_FOREACH(const Views::value_type & s, views) {
s->execute(&pmp);
}
+ // Caches might open transactions
+ BOOST_FOREACH(const DataSources::value_type & ds, datasources) {
+ ds.second->commit();
+ }
}
void
diff --git a/project2/sql/sqlCache.cpp b/project2/sql/sqlCache.cpp
index 44b4399..d6875ac 100644
--- a/project2/sql/sqlCache.cpp
+++ b/project2/sql/sqlCache.cpp
@@ -195,7 +195,7 @@ class SqlCache : public Cache {
sql.append(", ?");
}
sql.appendf(")");
- ModifyPtr m(db->getReadonly().newModifyCommand(sql));
+ ModifyPtr m(db->getWritable().newModifyCommand(sql));
unsigned int offset = 0;
m->bindParamI(offset++, row++);
BOOST_FOREACH(const Values::value_type & a, attrs) {
@@ -222,10 +222,14 @@ class SqlCache : public Cache {
PresenterPtr openFor(const Glib::ustring & n, const Glib::ustring & f, const IHaveParameters * ps)
{
+ Buffer sp;
+ sp.appendf("SAVEPOINT sp%p", this);
+ ModifyPtr s = ModifyPtr(db->getWritable().newModifyCommand(sp));
+ s->execute();
// Header
Buffer del;
del.appendf("INSERT INTO %s(p2_time) VALUES(?)", HeaderTable.c_str());
- ModifyPtr h = ModifyPtr(db->getReadonly().newModifyCommand(del));
+ ModifyPtr h = ModifyPtr(db->getWritable().newModifyCommand(del));
h->bindParamT(0, time(NULL));
h->execute();
// Record set header
@@ -237,15 +241,27 @@ class SqlCache : public Cache {
offset = 0;
applyKeys(boost::bind(appendKeyBinds, &sql, &offset), ps);
sql.appendf(")");
- ModifyPtr m(db->getReadonly().newModifyCommand(sql));
+ ModifyPtr m(db->getWritable().newModifyCommand(sql));
offset = 0;
applyKeys(boost::bind(bindKeyValues, m.get(), &offset, _2), ps);
m->execute();
return new SqlCachePresenter(n, f, db);
}
- void close(const Glib::ustring & , const Glib::ustring & , const IHaveParameters * )
+ void save(const Glib::ustring & , const Glib::ustring & , const IHaveParameters * )
{
+ Buffer sp;
+ sp.appendf("RELEASE SAVEPOINT sp%p", this);
+ ModifyPtr s = ModifyPtr(db->getWritable().newModifyCommand(sp));
+ s->execute();
+ }
+
+ void discard(const Glib::ustring & , const Glib::ustring & , const IHaveParameters * )
+ {
+ Buffer sp;
+ sp.appendf("ROLLBACK TO SAVEPOINT sp%p", this);
+ ModifyPtr s = ModifyPtr(db->getWritable().newModifyCommand(sp));
+ s->execute();
}
private:
@@ -285,12 +301,13 @@ class CustomSqlCacheLoader : public ElementLoaderImpl<SqlCache> {
{
if (!SqlCache::DataSource.empty()) {
boost::intrusive_ptr<CommonObjects> co = new CommonObjects();
- const RdbmsDataSource * db = co->dataSource<RdbmsDataSource>(SqlCache::DataSource);
+ RdbmsDataSource * db = co->dataSource<RdbmsDataSource>(SqlCache::DataSource);
Buffer del;
del.appendf("DELETE FROM %s WHERE p2_time < ?", SqlCache::HeaderTable.c_str());
- ModifyPtr m(db->getReadonly().newModifyCommand(del));
+ ModifyPtr m(db->getWritable().newModifyCommand(del));
m->bindParamT(0, time(NULL) - SqlCache::CacheLife);
m->execute();
+ db->commit();
}
}
diff --git a/project2/xml/xmlCache.cpp b/project2/xml/xmlCache.cpp
index 020ad04..c79af2c 100644
--- a/project2/xml/xmlCache.cpp
+++ b/project2/xml/xmlCache.cpp
@@ -43,7 +43,7 @@ class XmlCache : public Cache {
return writeTo;
}
- void close(const Glib::ustring & n, const Glib::ustring & f, const IHaveParameters * ps)
+ void save(const Glib::ustring & n, const Glib::ustring & f, const IHaveParameters * ps)
{
boost::filesystem::path target = getCacheFile(n, f, ps);
try {
@@ -57,6 +57,10 @@ class XmlCache : public Cache {
}
}
+ void discard(const Glib::ustring &, const Glib::ustring &, const IHaveParameters *)
+ {
+ }
+
private:
boost::filesystem::path getCacheFile(const Glib::ustring & n, const Glib::ustring & f, const IHaveParameters * ps) const
{