diff options
Diffstat (limited to 'cpp/src/IceStorm/TopicI.cpp')
-rw-r--r-- | cpp/src/IceStorm/TopicI.cpp | 104 |
1 files changed, 53 insertions, 51 deletions
diff --git a/cpp/src/IceStorm/TopicI.cpp b/cpp/src/IceStorm/TopicI.cpp index b3847acf56e..81f404c1b0b 100644 --- a/cpp/src/IceStorm/TopicI.cpp +++ b/cpp/src/IceStorm/TopicI.cpp @@ -8,13 +8,15 @@ // ********************************************************************** #include <IceUtil/DisableWarnings.h> +#include <Freeze/Freeze.h> #include <IceStorm/TopicI.h> #include <IceStorm/Instance.h> #include <IceStorm/Subscriber.h> #include <IceStorm/TraceLevels.h> #include <IceStorm/NodeI.h> #include <IceStorm/Observers.h> -#include <IceStorm/DB.h> +#include <IceStorm/SubscriberMap.h> +#include <IceStorm/LLUMap.h> #include <IceStorm/Util.h> #include <Ice/LoggerUtil.h> #include <algorithm> @@ -22,12 +24,15 @@ using namespace std; using namespace IceStorm; using namespace IceStormElection; +using namespace IceStormInternal; -using namespace IceDB; +using namespace Freeze; namespace { +const string subscriberDbName = "subscribers"; + void halt(const Ice::CommunicatorPtr& com, const DatabaseException& ex) { @@ -405,7 +410,6 @@ TopicImpl::TopicImpl( _instance(instance), _name(name), _id(id), - _connectionPool(instance->connectionPool()), _destroyed(false) { try @@ -650,20 +654,20 @@ TopicImpl::subscribe(const QoS& origQoS, const Ice::ObjectPrx& obj) { try { - DatabaseConnectionPtr connection = _connectionPool->newConnection(); + ConnectionPtr connection = Freeze::createConnection( + _instance->communicator(), _instance->serviceName()); TransactionHolder txn(connection); SubscriberRecordKey key; key.topic = _id; key.id = record.id; - SubscribersWrapperPtr subscribersWrapper = _connectionPool->getSubscribers(connection); - subscribersWrapper->erase(key); + SubscriberMap subscriberMap(connection, subscriberDbName); + subscriberMap.erase(key); - LLUWrapperPtr lluWrapper = _connectionPool->getLLU(connection); - llu = lluWrapper->get(); + llu = getLLU(connection); llu.iteration++; - lluWrapper->put(llu); + putLLU(connection, llu); txn.commit(); break; @@ -687,21 +691,20 @@ TopicImpl::subscribe(const QoS& origQoS, const Ice::ObjectPrx& obj) { try { - DatabaseConnectionPtr connection = _connectionPool->newConnection(); + ConnectionPtr connection = Freeze::createConnection(_instance->communicator(), _instance->serviceName()); TransactionHolder txn(connection); SubscriberRecordKey key; key.topic = _id; key.id = subscriber->id(); - SubscribersWrapperPtr subscribersWrapper = _connectionPool->getSubscribers(connection); - subscribersWrapper->put(key, record); + SubscriberMap subscriberMap(connection, subscriberDbName); + subscriberMap.put(SubscriberMap::value_type(key, record)); // Update the LLU. - LLUWrapperPtr lluWrapper = _connectionPool->getLLU(connection); - llu = lluWrapper->get(); + llu = getLLU(connection); llu.iteration++; - lluWrapper->put(llu); + putLLU(connection, llu); txn.commit(); break; @@ -772,20 +775,19 @@ TopicImpl::subscribeAndGetPublisher(const QoS& qos, const Ice::ObjectPrx& obj) { try { - DatabaseConnectionPtr connection = _connectionPool->newConnection(); + ConnectionPtr connection = Freeze::createConnection(_instance->communicator(), _instance->serviceName()); TransactionHolder txn(connection); SubscriberRecordKey key; key.topic = _id; key.id = subscriber->id(); - SubscribersWrapperPtr subscribersWrapper = _connectionPool->getSubscribers(connection); - subscribersWrapper->put(key, record); + SubscriberMap subscriberMap(connection, subscriberDbName); + subscriberMap.put(SubscriberMap::value_type(key, record)); - LLUWrapperPtr lluWrapper = _connectionPool->getLLU(connection); - llu = lluWrapper->get(); + llu = getLLU(connection); llu.iteration++; - lluWrapper->put(llu); + putLLU(connection, llu); txn.commit(); break; @@ -896,20 +898,19 @@ TopicImpl::link(const TopicPrx& topic, Ice::Int cost) { try { - DatabaseConnectionPtr connection = _connectionPool->newConnection(); + ConnectionPtr connection = Freeze::createConnection(_instance->communicator(), _instance->serviceName()); TransactionHolder txn(connection); SubscriberRecordKey key; key.topic = _id; key.id = id; - SubscribersWrapperPtr subscribersWrapper = _connectionPool->getSubscribers(connection); - subscribersWrapper->put(key, record); + SubscriberMap subscriberMap(connection, subscriberDbName); + subscriberMap.put(SubscriberMap::value_type(key, record)); - LLUWrapperPtr lluWrapper = _connectionPool->getLLU(connection); - llu = lluWrapper->get(); + llu = getLLU(connection); llu.iteration++; - lluWrapper->put(llu); + putLLU(connection, llu); txn.commit(); break; @@ -1320,19 +1321,18 @@ TopicImpl::observerAddSubscriber(const LogUpdate& llu, const SubscriberRecord& r { try { - DatabaseConnectionPtr connection = _connectionPool->newConnection(); + ConnectionPtr connection = Freeze::createConnection(_instance->communicator(), _instance->serviceName()); TransactionHolder txn(connection); SubscriberRecordKey key; key.topic = _id; key.id = subscriber->id(); - SubscribersWrapperPtr subscribersWrapper = _connectionPool->getSubscribers(connection); - subscribersWrapper->put(key, record); + SubscriberMap subscriberMap(connection, subscriberDbName); + subscriberMap.put(SubscriberMap::value_type(key, record)); // Update the LLU. - LLUWrapperPtr lluWrapper = _connectionPool->getLLU(connection); - lluWrapper->put(llu); + putLLU(connection, llu); txn.commit(); break; @@ -1389,7 +1389,7 @@ TopicImpl::observerRemoveSubscriber(const LogUpdate& llu, const Ice::IdentitySeq { try { - DatabaseConnectionPtr connection = _connectionPool->newConnection(); + ConnectionPtr connection = Freeze::createConnection(_instance->communicator(), _instance->serviceName()); TransactionHolder txn(connection); for(Ice::IdentitySeq::const_iterator id = ids.begin(); id != ids.end(); ++id) @@ -1398,13 +1398,10 @@ TopicImpl::observerRemoveSubscriber(const LogUpdate& llu, const Ice::IdentitySeq key.topic = _id; key.id = *id; - SubscribersWrapperPtr subscribersWrapper = _connectionPool->getSubscribers(connection); - subscribersWrapper->erase(key); + SubscriberMap subscriberMap(connection, subscriberDbName); + subscriberMap.erase(key); } - - LLUWrapperPtr lluWrapper = _connectionPool->getLLU(connection); - lluWrapper->put(llu); - + putLLU(connection, llu); txn.commit(); break; } @@ -1485,25 +1482,31 @@ TopicImpl::destroyInternal(const LogUpdate& origLLU, bool master) { try { - DatabaseConnectionPtr connection = _connectionPool->newConnection(); + ConnectionPtr connection = Freeze::createConnection(_instance->communicator(), _instance->serviceName()); TransactionHolder txn(connection); // Erase all subscriber records and the topic record. - SubscribersWrapperPtr subscribersWrapper = _connectionPool->getSubscribers(connection); - subscribersWrapper->eraseTopic(_id); + SubscriberMap subscriberMap(connection, subscriberDbName); + + IceStorm::SubscriberRecordKey key; + key.topic = _id; + SubscriberMap::iterator p = subscriberMap.find(key); + while(p != subscriberMap.end() && p->first.topic == key.topic) + { + subscriberMap.erase(p++); + } // Update the LLU. - LLUWrapperPtr lluWrapper = _connectionPool->getLLU(connection); if(master) { - llu = lluWrapper->get(); + llu = getLLU(connection); llu.iteration++; } else { llu = origLLU; } - lluWrapper->put(llu); + putLLU(connection, llu); txn.commit(); break; @@ -1560,7 +1563,7 @@ TopicImpl::removeSubscribers(const Ice::IdentitySeq& ids) { try { - DatabaseConnectionPtr connection = _connectionPool->newConnection(); + ConnectionPtr connection = Freeze::createConnection(_instance->communicator(), _instance->serviceName()); TransactionHolder txn(connection); for(Ice::IdentitySeq::const_iterator id = ids.begin(); id != ids.end(); ++id) @@ -1569,14 +1572,13 @@ TopicImpl::removeSubscribers(const Ice::IdentitySeq& ids) key.topic = _id; key.id = *id; - SubscribersWrapperPtr subscribersWrapper = _connectionPool->getSubscribers(connection); - subscribersWrapper->erase(key); + SubscriberMap subscriberMap(connection, subscriberDbName); + subscriberMap.erase(key); } - LLUWrapperPtr lluWrapper = _connectionPool->getLLU(connection); - llu = lluWrapper->get(); + llu = getLLU(connection); llu.iteration++; - lluWrapper->put(llu); + putLLU(connection, llu); txn.commit(); break; |