diff options
Diffstat (limited to 'cpp/src/IceStorm/TopicI.cpp')
-rw-r--r-- | cpp/src/IceStorm/TopicI.cpp | 99 |
1 files changed, 45 insertions, 54 deletions
diff --git a/cpp/src/IceStorm/TopicI.cpp b/cpp/src/IceStorm/TopicI.cpp index 77b6ae00542..8c0dd7f602d 100644 --- a/cpp/src/IceStorm/TopicI.cpp +++ b/cpp/src/IceStorm/TopicI.cpp @@ -14,19 +14,15 @@ #include <IceStorm/TraceLevels.h> #include <IceStorm/NodeI.h> #include <IceStorm/Observers.h> +#include <IceStorm/DB.h> #include <Ice/LoggerUtil.h> - #include <algorithm> using namespace std; using namespace IceStorm; using namespace IceStormElection; -#ifdef QTSQL -using namespace IceSQL; -#else -using namespace Freeze; -#endif +using namespace IceDB; namespace { @@ -37,9 +33,6 @@ halt(const Ice::CommunicatorPtr& com, const DatabaseException& ex) { Ice::Error error(com->getLogger()); error << "fatal exception: " << ex << "\n*** Aborting application ***"; -#ifdef __GNUC__ - error << "\n" << ex.ice_stackTrace(); -#endif } abort(); @@ -406,13 +399,11 @@ TopicImpl::TopicImpl( const InstancePtr& instance, const string& name, const Ice::Identity& id, - const SubscriberRecordSeq& subscribers, - const IceStorm::DatabaseCachePtr& databaseCache - ) : + const SubscriberRecordSeq& subscribers) : _instance(instance), _name(name), _id(id), - _databaseCache(databaseCache), + _databaseCache(instance->databaseCache()), _destroyed(false) { try @@ -651,8 +642,6 @@ TopicImpl::subscribe(const QoS& origQoS, const Ice::ObjectPrx& obj) LogUpdate llu; - DatabaseConnectionPtr connection = _databaseCache->getConnection(); - vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), record.id); if(p != _subscribers.end()) { @@ -665,19 +654,20 @@ TopicImpl::subscribe(const QoS& origQoS, const Ice::ObjectPrx& obj) { try { + DatabaseConnectionPtr connection = _databaseCache->getConnection(); TransactionHolder txn(connection); SubscriberRecordKey key; key.topic = _id; key.id = record.id; - SubscriberMapWrapper subscriberMap(_databaseCache, connection); - subscriberMap.erase(key); + SubscribersWrapperPtr subscribersWrapper = _databaseCache->getSubscribers(connection); + subscribersWrapper->erase(key); - LLUWrapper llumap(_databaseCache, connection); - llu = llumap.get(); + LLUWrapperPtr lluWrapper = _databaseCache->getLLU(connection); + llu = lluWrapper->get(); llu.iteration++; - llumap.put(llu); + lluWrapper->put(llu); txn.commit(); break; @@ -701,20 +691,21 @@ TopicImpl::subscribe(const QoS& origQoS, const Ice::ObjectPrx& obj) { try { + DatabaseConnectionPtr connection = _databaseCache->getConnection(); TransactionHolder txn(connection); SubscriberRecordKey key; key.topic = _id; key.id = subscriber->id(); - SubscriberMapWrapper subscriberMap(_databaseCache, connection); - subscriberMap.put(key, record); + SubscribersWrapperPtr subscribersWrapper = _databaseCache->getSubscribers(connection); + subscribersWrapper->put(key, record); // Update the LLU. - LLUWrapper llumap(_databaseCache, connection); - llu = llumap.get(); + LLUWrapperPtr lluWrapper = _databaseCache->getLLU(connection); + llu = lluWrapper->get(); llu.iteration++; - llumap.put(llu); + lluWrapper->put(llu); txn.commit(); break; @@ -790,13 +781,13 @@ TopicImpl::subscribeAndGetPublisher(const QoS& qos, const Ice::ObjectPrx& obj) key.topic = _id; key.id = subscriber->id(); - SubscriberMapWrapper subscriberMap(_databaseCache, connection); - subscriberMap.put(key, record); + SubscribersWrapperPtr subscribersWrapper = _databaseCache->getSubscribers(connection); + subscribersWrapper->put(key, record); - LLUWrapper llumap(_databaseCache, connection); - llu = llumap.get(); + LLUWrapperPtr lluWrapper = _databaseCache->getLLU(connection); + llu = lluWrapper->get(); llu.iteration++; - llumap.put(llu); + lluWrapper->put(llu); txn.commit(); break; @@ -912,13 +903,13 @@ TopicImpl::link(const TopicPrx& topic, Ice::Int cost) key.topic = _id; key.id = id; - SubscriberMapWrapper subscriberMap(_databaseCache, connection); - subscriberMap.put(key, record); + SubscribersWrapperPtr subscribersWrapper = _databaseCache->getSubscribers(connection); + subscribersWrapper->put(key, record); - LLUWrapper llumap(_databaseCache, connection); - llu = llumap.get(); + LLUWrapperPtr lluWrapper = _databaseCache->getLLU(connection); + llu = lluWrapper->get(); llu.iteration++; - llumap.put(llu); + lluWrapper->put(llu); txn.commit(); break; @@ -1309,12 +1300,12 @@ TopicImpl::observerAddSubscriber(const LogUpdate& llu, const SubscriberRecord& r key.topic = _id; key.id = subscriber->id(); - SubscriberMapWrapper subscriberMap(_databaseCache, connection); - subscriberMap.put(key, record); + SubscribersWrapperPtr subscribersWrapper = _databaseCache->getSubscribers(connection); + subscribersWrapper->put(key, record); // Update the LLU. - LLUWrapper llumap(_databaseCache, connection); - llumap.put(llu); + LLUWrapperPtr lluWrapper = _databaseCache->getLLU(connection); + lluWrapper->put(llu); txn.commit(); break; @@ -1380,12 +1371,12 @@ TopicImpl::observerRemoveSubscriber(const LogUpdate& llu, const Ice::IdentitySeq key.topic = _id; key.id = *id; - SubscriberMapWrapper subscriberMap(_databaseCache, connection); - subscriberMap.erase(key); + SubscribersWrapperPtr subscribersWrapper = _databaseCache->getSubscribers(connection); + subscribersWrapper->erase(key); } - LLUWrapper llumap(_databaseCache, connection); - llumap.put(llu); + LLUWrapperPtr lluWrapper = _databaseCache->getLLU(connection); + lluWrapper->put(llu); txn.commit(); break; @@ -1451,21 +1442,21 @@ TopicImpl::destroyInternal(const LogUpdate& origLLU, bool master) TransactionHolder txn(connection); // Erase all subscriber records and the topic record. - SubscriberMapWrapper subscriberMap(_databaseCache, connection); - subscriberMap.eraseTopic(_id); + SubscribersWrapperPtr subscribersWrapper = _databaseCache->getSubscribers(connection); + subscribersWrapper->eraseTopic(_id); // Update the LLU. - LLUWrapper llumap(_databaseCache, connection); + LLUWrapperPtr lluWrapper = _databaseCache->getLLU(connection); if(master) { - llu = llumap.get(); + llu = lluWrapper->get(); llu.iteration++; } else { llu = origLLU; } - llumap.put(llu); + lluWrapper->put(llu); txn.commit(); break; @@ -1530,15 +1521,15 @@ TopicImpl::removeSubscribers(const Ice::IdentitySeq& ids) SubscriberRecordKey key; key.topic = _id; key.id = *id; - - SubscriberMapWrapper subscriberMap(_databaseCache, connection); - subscriberMap.erase(key); + + SubscribersWrapperPtr subscribersWrapper = _databaseCache->getSubscribers(connection); + subscribersWrapper->erase(key); } - LLUWrapper llumap(_databaseCache, connection); - llu = llumap.get(); + LLUWrapperPtr lluWrapper = _databaseCache->getLLU(connection); + llu = lluWrapper->get(); llu.iteration++; - llumap.put(llu); + lluWrapper->put(llu); txn.commit(); break; |