diff options
Diffstat (limited to 'cpp/src/IceStorm/TopicManagerI.cpp')
-rw-r--r-- | cpp/src/IceStorm/TopicManagerI.cpp | 83 |
1 files changed, 35 insertions, 48 deletions
diff --git a/cpp/src/IceStorm/TopicManagerI.cpp b/cpp/src/IceStorm/TopicManagerI.cpp index 9ea638b877e..70150c848bc 100644 --- a/cpp/src/IceStorm/TopicManagerI.cpp +++ b/cpp/src/IceStorm/TopicManagerI.cpp @@ -8,6 +8,7 @@ // ********************************************************************** #include <IceUtil/DisableWarnings.h> +#include <Freeze/Freeze.h> #include <IceStorm/TopicManagerI.h> #include <IceStorm/TopicI.h> #include <IceStorm/TraceLevels.h> @@ -15,7 +16,8 @@ #include <IceStorm/NodeI.h> #include <IceStorm/Observers.h> #include <IceStorm/Subscriber.h> -#include <IceStorm/DB.h> +#include <IceStorm/SubscriberMap.h> +#include <IceStorm/LLUMap.h> #include <IceStorm/Util.h> #include <Ice/SliceChecksums.h> @@ -24,12 +26,15 @@ using namespace std; using namespace IceStorm; using namespace IceStormElection; +using namespace IceStormInternal; -using namespace IceDB; +using namespace Freeze; namespace { +const string subscriberDbName = "subscribers"; + void halt(const Ice::CommunicatorPtr& com, const DatabaseException& ex) { @@ -289,8 +294,7 @@ nameToIdentity(const InstancePtr& instance, const string& name) } TopicManagerImpl::TopicManagerImpl(const InstancePtr& instance) : - _instance(instance), - _connectionPool(instance->connectionPool()) + _instance(instance) { try { @@ -317,18 +321,15 @@ TopicManagerImpl::TopicManagerImpl(const InstancePtr& instance) : _sync = _instance->nodeAdapter()->addWithUUID(_syncImpl); } - DatabaseConnectionPtr connection = _connectionPool->newConnection(); + ConnectionPtr connection = Freeze::createConnection(_instance->communicator(), _instance->serviceName()); // Ensure that the llu counter is present in the log. - LLUWrapperPtr lluWrapper = _connectionPool->getLLU(connection); LogUpdate empty = {0, 0}; - lluWrapper->put(empty); + putLLU(connection, empty); // Recreate each of the topics. - SubscribersWrapperPtr subscribersWrapper = _connectionPool->getSubscribers(connection); - map<SubscriberRecordKey, SubscriberRecord> subscriberMap = subscribersWrapper->getMap(); - - map<SubscriberRecordKey, SubscriberRecord>::const_iterator p = subscriberMap.begin(); + SubscriberMap subscriberMap(connection, subscriberDbName); + SubscriberMap::const_iterator p = subscriberMap.begin(); while(p != subscriberMap.end()) { // This record has to be a place holder record, otherwise @@ -386,7 +387,7 @@ TopicManagerImpl::create(const string& name) { try { - DatabaseConnectionPtr connection = _connectionPool->newConnection(); + ConnectionPtr connection = Freeze::createConnection(_instance->communicator(), _instance->serviceName()); TransactionHolder txn(connection); SubscriberRecordKey key; @@ -395,13 +396,12 @@ TopicManagerImpl::create(const string& name) rec.link = false; rec.cost = 0; - SubscribersWrapperPtr subscribersWrapper = _connectionPool->getSubscribers(connection); - subscribersWrapper->put(key, rec); + SubscriberMap subscriberMap(connection, subscriberDbName); + subscriberMap.put(SubscriberMap::value_type(key, rec)); - LLUWrapperPtr lluWrapper = _connectionPool->getLLU(connection); - llu = lluWrapper->get(); + llu = getLLU(connection); llu.iteration++; - lluWrapper->put(llu); + putLLU(connection, llu); txn.commit(); break; @@ -417,7 +417,6 @@ TopicManagerImpl::create(const string& name) } _instance->observers()->createTopic(llu, name); - return installTopic(name, id, true); } @@ -491,14 +490,13 @@ TopicManagerImpl::observerInit(const LogUpdate& llu, const TopicContentSeq& cont { try { - DatabaseConnectionPtr connection = _connectionPool->newConnection(); + ConnectionPtr connection = Freeze::createConnection(_instance->communicator(), _instance->serviceName()); TransactionHolder txn(connection); - LLUWrapperPtr lluWrapper = _connectionPool->getLLU(connection); - lluWrapper->put(llu); + putLLU(connection, llu); - SubscribersWrapperPtr subscribersWrapper = _connectionPool->getSubscribers(connection); - subscribersWrapper->clear(); + SubscriberMap subscriberMap(connection, subscriberDbName); + subscriberMap.clear(); for(TopicContentSeq::const_iterator p = content.begin(); p != content.end(); ++p) { @@ -508,7 +506,7 @@ TopicManagerImpl::observerInit(const LogUpdate& llu, const TopicContentSeq& cont rec.link = false; rec.cost = 0; - subscribersWrapper->put(key, rec); + subscriberMap.put(SubscriberMap::value_type(key, rec)); for(SubscriberRecordSeq::const_iterator q = p->records.begin(); q != p->records.end(); ++q) { @@ -516,7 +514,7 @@ TopicManagerImpl::observerInit(const LogUpdate& llu, const TopicContentSeq& cont key.topic = p->id; key.id = q->id; - subscribersWrapper->put(key, *q); + subscriberMap.put(SubscriberMap::value_type(key, *q)); } } txn.commit(); @@ -595,7 +593,7 @@ TopicManagerImpl::observerCreateTopic(const LogUpdate& llu, const string& name) { try { - DatabaseConnectionPtr connection = _connectionPool->newConnection(); + ConnectionPtr connection = Freeze::createConnection(_instance->communicator(), _instance->serviceName()); TransactionHolder txn(connection); SubscriberRecordKey key; @@ -604,19 +602,14 @@ TopicManagerImpl::observerCreateTopic(const LogUpdate& llu, const string& name) rec.link = false; rec.cost = 0; - SubscribersWrapperPtr subscribersWrapper = _connectionPool->getSubscribers(connection); - try + SubscriberMap subscriberMap(connection, subscriberDbName); + if(subscriberMap.find(key) != subscriberMap.end()) { - subscribersWrapper->find(key); throw ObserverInconsistencyException("topic exists: " + name); } - catch(const NotFoundException&) - { - } - subscribersWrapper->put(key, rec); + subscriberMap.put(SubscriberMap::value_type(key, rec)); - LLUWrapperPtr lluWrapper = _connectionPool->getLLU(connection); - lluWrapper->put(llu); + putLLU(connection, llu); txn.commit(); break; @@ -692,7 +685,7 @@ TopicManagerImpl::getContent(LogUpdate& llu, TopicContentSeq& content) reap(); } - DatabaseConnectionPtr connection = _connectionPool->newConnection(); + ConnectionPtr connection = Freeze::createConnection(_instance->communicator(), _instance->serviceName()); for(;;) { @@ -704,9 +697,7 @@ TopicManagerImpl::getContent(LogUpdate& llu, TopicContentSeq& content) TopicContent rec = p->second->getContent(); content.push_back(rec); } - - LLUWrapperPtr lluWrapper = _connectionPool->getLLU(connection); - llu = lluWrapper->get(); + llu = getLLU(connection); break; } catch(const DeadlockException&) @@ -716,21 +707,20 @@ TopicManagerImpl::getContent(LogUpdate& llu, TopicContentSeq& content) catch(const DatabaseException& ex) { halt(_instance->communicator(), ex); - } + } } } LogUpdate TopicManagerImpl::getLastLogUpdate() const { - DatabaseConnectionPtr connection = _connectionPool->newConnection(); + ConnectionPtr connection = Freeze::createConnection(_instance->communicator(), _instance->serviceName()); for(;;) { try { - LLUWrapperPtr lluWrapper = _connectionPool->getLLU(connection); - return lluWrapper->get(); + return getLLU(connection); } catch(const DeadlockException&) { @@ -781,7 +771,7 @@ TopicManagerImpl::initMaster(const set<GroupNodeInfo>& slaves, const LogUpdate& { content.clear(); - DatabaseConnectionPtr connection = _connectionPool->newConnection(); + ConnectionPtr connection = Freeze::createConnection(_instance->communicator(), _instance->serviceName()); TransactionHolder txn(connection); for(map<string, TopicImplPtr>::const_iterator p = _topics.begin(); p != _topics.end(); ++p) @@ -789,10 +779,7 @@ TopicManagerImpl::initMaster(const set<GroupNodeInfo>& slaves, const LogUpdate& TopicContent rec = p->second->getContent(); content.push_back(rec); } - - LLUWrapperPtr lluWrapper = _connectionPool->getLLU(connection); - lluWrapper->put(llu); - + putLLU(connection, llu); txn.commit(); break; } |