diff options
Diffstat (limited to 'cpp/src/IceStorm/TopicManagerI.cpp')
-rw-r--r-- | cpp/src/IceStorm/TopicManagerI.cpp | 80 |
1 files changed, 32 insertions, 48 deletions
diff --git a/cpp/src/IceStorm/TopicManagerI.cpp b/cpp/src/IceStorm/TopicManagerI.cpp index 3a065b6498d..56e166031a8 100644 --- a/cpp/src/IceStorm/TopicManagerI.cpp +++ b/cpp/src/IceStorm/TopicManagerI.cpp @@ -15,10 +15,8 @@ #include <IceStorm/NodeI.h> #include <IceStorm/Observers.h> #include <IceStorm/Subscriber.h> +#include <IceStorm/DB.h> #include <Ice/SliceChecksums.h> -#ifdef QTSQL -# include <Ice/Instance.h> -#endif #include <functional> @@ -26,11 +24,7 @@ using namespace std; using namespace IceStorm; using namespace IceStormElection; -#ifdef QTSQL -using namespace IceSQL; -#else -using namespace Freeze; -#endif +using namespace IceDB; namespace { @@ -41,9 +35,6 @@ halt(const Ice::CommunicatorPtr& com, const DatabaseException& ex) { Ice::Error error(com->getLogger()); error << "fatal exception: " << ex << "\n*** Aborting application ***"; -#ifdef __GNUC__ - error << "\n" << ex.ice_stackTrace(); -#endif } abort(); @@ -298,7 +289,7 @@ nameToIdentity(const InstancePtr& instance, const string& name) TopicManagerImpl::TopicManagerImpl(const InstancePtr& instance) : _instance(instance), - _databaseCache(new IceStorm::DatabaseCache(instance)) + _databaseCache(instance->databaseCache()) { try { @@ -323,18 +314,15 @@ TopicManagerImpl::TopicManagerImpl(const InstancePtr& instance) : DatabaseConnectionPtr connection = _databaseCache->getConnection(); // Ensure that the llu counter is present in the log. - LLUWrapper llumap(_databaseCache, connection); + LLUWrapperPtr lluWrapper = _databaseCache->getLLU(connection); LogUpdate empty = {0, 0}; - llumap.put(empty); + lluWrapper->put(empty); // Recreate each of the topics. - SubscriberMapWrapper subWrap(_databaseCache, connection); -#ifdef QTSQL - SubscriberMap subscriberMap = subWrap.getMap(); -#else - SubscriberMap& subscriberMap = subWrap.getMap(); -#endif - SubscriberMap::const_iterator p = subscriberMap.begin(); + SubscribersWrapperPtr subscribersWrapper = _databaseCache->getSubscribers(connection); + map<SubscriberRecordKey, SubscriberRecord> subscriberMap = subscribersWrapper->getMap(); + + map<SubscriberRecordKey, SubscriberRecord>::const_iterator p = subscriberMap.begin(); while(p != subscriberMap.end()) { // This record has to be a place holder record, otherwise @@ -401,13 +389,13 @@ TopicManagerImpl::create(const string& name) rec.link = false; rec.cost = 0; - SubscriberMapWrapper subscriberMap(_databaseCache, connection); - subscriberMap.put(key, rec); + SubscribersWrapperPtr subscribersWrapper = _databaseCache->getSubscribers(connection); + subscribersWrapper->put(key, rec); - LLUWrapper llumap(_databaseCache, connection); - llu = llumap.get(); + LLUWrapperPtr lluWrapper = _databaseCache->getLLU(connection); + llu = lluWrapper->get(); llu.iteration++; - llumap.put(llu); + lluWrapper->put(llu); txn.commit(); break; @@ -496,11 +484,11 @@ TopicManagerImpl::observerInit(const LogUpdate& llu, const TopicContentSeq& cont DatabaseConnectionPtr connection = _databaseCache->getConnection(); TransactionHolder txn(connection); - LLUWrapper llumap(_databaseCache, connection); - llumap.put(llu); + LLUWrapperPtr lluWrapper = _databaseCache->getLLU(connection); + lluWrapper->put(llu); - SubscriberMapWrapper subscriberMap(_databaseCache, connection); - subscriberMap.clear(); + SubscribersWrapperPtr subscribersWrapper = _databaseCache->getSubscribers(connection); + subscribersWrapper->clear(); for(TopicContentSeq::const_iterator p = content.begin(); p != content.end(); ++p) { @@ -510,7 +498,7 @@ TopicManagerImpl::observerInit(const LogUpdate& llu, const TopicContentSeq& cont rec.link = false; rec.cost = 0; - subscriberMap.put(key, rec); + subscribersWrapper->put(key, rec); for(SubscriberRecordSeq::const_iterator q = p->records.begin(); q != p->records.end(); ++q) { @@ -518,7 +506,7 @@ TopicManagerImpl::observerInit(const LogUpdate& llu, const TopicContentSeq& cont key.topic = p->id; key.id = q->id; - subscriberMap.put(key, *q); + subscribersWrapper->put(key, *q); } } txn.commit(); @@ -606,19 +594,19 @@ TopicManagerImpl::observerCreateTopic(const LogUpdate& llu, const string& name) rec.link = false; rec.cost = 0; - SubscriberMapWrapper subscriberMap(_databaseCache, connection); + SubscribersWrapperPtr subscribersWrapper = _databaseCache->getSubscribers(connection); try { - subscriberMap.find(key); + subscribersWrapper->find(key); throw ObserverInconsistencyException("topic exists: " + name); } catch(const NotFoundException&) { } - subscriberMap.put(key, rec); + subscribersWrapper->put(key, rec); - LLUWrapper llumap(_databaseCache, connection); - llumap.put(llu); + LLUWrapperPtr lluWrapper = _databaseCache->getLLU(connection); + lluWrapper->put(llu); txn.commit(); break; @@ -707,8 +695,8 @@ TopicManagerImpl::getContent(LogUpdate& llu, TopicContentSeq& content) content.push_back(rec); } - LLUWrapper llumap(_databaseCache, connection); - llu = llumap.get(); + LLUWrapperPtr lluWrapper = _databaseCache->getLLU(connection); + llu = lluWrapper->get(); break; } catch(const DeadlockException&) @@ -731,8 +719,8 @@ TopicManagerImpl::getLastLogUpdate() const { try { - LLUWrapper llumap(_databaseCache, connection); - return llumap.get(); + LLUWrapperPtr lluWrapper = _databaseCache->getLLU(connection); + return lluWrapper->get(); } catch(const DeadlockException&) { @@ -792,8 +780,8 @@ TopicManagerImpl::initMaster(const set<GroupNodeInfo>& slaves, const LogUpdate& content.push_back(rec); } - LLUWrapper llumap(_databaseCache, connection); - llumap.put(llu); + LLUWrapperPtr lluWrapper = _databaseCache->getLLU(connection); + lluWrapper->put(llu); txn.commit(); break; @@ -892,11 +880,7 @@ TopicManagerImpl::installTopic(const string& name, const Ice::Identity& id, bool } // Create topic implementation -#ifdef QTSQL - TopicImplPtr topicImpl = new TopicImpl(_instance, name, id, subscribers, _databaseCache); -#else - TopicImplPtr topicImpl = new TopicImpl(_instance, name, id, subscribers, new IceStorm::DatabaseCache(_instance)); -#endif + TopicImplPtr topicImpl = new TopicImpl(_instance, name, id, subscribers); // The identity is the name of the Topic. _topics.insert(map<string, TopicImplPtr>::value_type(name, topicImpl)); |