diff options
Diffstat (limited to 'cpp/src/IceStorm/TopicI.cpp')
-rw-r--r-- | cpp/src/IceStorm/TopicI.cpp | 387 |
1 files changed, 164 insertions, 223 deletions
diff --git a/cpp/src/IceStorm/TopicI.cpp b/cpp/src/IceStorm/TopicI.cpp index 88dc8af7acd..55f52489158 100644 --- a/cpp/src/IceStorm/TopicI.cpp +++ b/cpp/src/IceStorm/TopicI.cpp @@ -13,8 +13,6 @@ #include <IceStorm/TraceLevels.h> #include <IceStorm/NodeI.h> #include <IceStorm/Observers.h> -#include <IceStorm/SubscriberMap.h> -#include <IceStorm/LLUMap.h> #include <IceStorm/Util.h> #include <Ice/LoggerUtil.h> #include <algorithm> @@ -24,22 +22,14 @@ using namespace IceStorm; using namespace IceStormElection; using namespace IceStormInternal; -using namespace Freeze; - namespace { -const string subscriberDbName = "subscribers"; - void -halt(const Ice::CommunicatorPtr& com, const DatabaseException& ex) +logError(const Ice::CommunicatorPtr& com, const IceDB::LMDBException& ex) { - { - Ice::Error error(com->getLogger()); - error << "fatal exception: " << ex << "\n*** Aborting application ***"; - } - - abort(); + Ice::Error error(com->getLogger()); + error << "LMDB error: " << ex; } // @@ -50,7 +40,7 @@ class PublisherI : public Ice::BlobjectArray { public: - PublisherI(const TopicImplPtr& topic, const InstancePtr& instance) : + PublisherI(const TopicImplPtr& topic, const PersistentInstancePtr& instance) : _topic(topic), _instance(instance) { } @@ -80,7 +70,7 @@ public: private: const TopicImplPtr _topic; - const InstancePtr _instance; + const PersistentInstancePtr _instance; }; // @@ -91,7 +81,7 @@ class TopicLinkI : public TopicLink { public: - TopicLinkI(const TopicImplPtr& impl, const InstancePtr& instance) : + TopicLinkI(const TopicImplPtr& impl, const PersistentInstancePtr& instance) : _impl(impl), _instance(instance) { } @@ -106,14 +96,14 @@ public: private: const TopicImplPtr _impl; - const InstancePtr _instance; + const PersistentInstancePtr _instance; }; class TopicI : public TopicInternal { public: - TopicI(const TopicImplPtr& impl, const InstancePtr& instance) : + TopicI(const TopicImplPtr& impl, const PersistentInstancePtr& instance) : _impl(impl), _instance(instance) { } @@ -343,26 +333,22 @@ private: } const TopicImplPtr _impl; - const InstancePtr _instance; + const PersistentInstancePtr _instance; }; } -namespace IceStorm -{ -extern string identityToTopicName(const Ice::Identity& id); -} - TopicImpl::TopicImpl( - const InstancePtr& instance, + const PersistentInstancePtr& instance, const string& name, const Ice::Identity& id, const SubscriberRecordSeq& subscribers) : _instance(instance), - _connection(Freeze::createConnection(instance->communicator(), instance->serviceName())), _name(name), _id(id), - _destroyed(false) + _destroyed(false), + _lluMap(_instance->lluMap()), + _subscriberMap(_instance->subscriberMap()) { try { @@ -455,11 +441,6 @@ TopicImpl::TopicImpl( __setNoDelete(false); } -TopicImpl::~TopicImpl() -{ - //cout << "~TopicImpl" << endl; -} - string TopicImpl::getName() const { @@ -496,7 +477,7 @@ TopicImpl::getNonReplicatedPublisher() const namespace { void -trace(Ice::Trace& out, const InstancePtr& instance, const vector<SubscriberPtr>& s) +trace(Ice::Trace& out, const PersistentInstancePtr& instance, const vector<SubscriberPtr>& s) { out << '['; for(vector<SubscriberPtr>::const_iterator p = s.begin(); p != s.end(); ++p) @@ -568,34 +549,24 @@ TopicImpl::subscribeAndGetPublisher(const QoS& qos, const Ice::ObjectPrx& obj) LogUpdate llu; SubscriberPtr subscriber = Subscriber::create(_instance, record); - for(;;) + try { - try - { - TransactionHolder txn(_connection); + IceDB::ReadWriteTxn txn(_instance->dbEnv()); - SubscriberRecordKey key; - key.topic = _id; - key.id = subscriber->id(); + SubscriberRecordKey key; + key.topic = _id; + key.id = subscriber->id(); - SubscriberMap subscriberMap(_connection, subscriberDbName); - subscriberMap.put(SubscriberMap::value_type(key, record)); + _subscriberMap.put(txn, key, record); - llu = getLLU(_connection); - llu.iteration++; - putLLU(_connection, llu); + llu = getIncrementedLLU(txn, _lluMap); - txn.commit(); - break; - } - catch(const DeadlockException&) - { - continue; - } - catch(const DatabaseException& ex) - { - halt(_instance->communicator(), ex); - } + txn.commit(); + } + catch(const IceDB::LMDBException& ex) + { + logError(_instance->communicator(), ex); + throw; // will become UnknownException in caller } _subscribers.push_back(subscriber); @@ -680,7 +651,7 @@ TopicImpl::link(const TopicPrx& topic, Ice::Int cost) vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), record.id); if(p != _subscribers.end()) { - string name = identityToTopicName(id); + string name = IceStormInternal::identityToTopicName(id); LinkExists ex; ex.name = name; throw ex; @@ -690,34 +661,24 @@ TopicImpl::link(const TopicPrx& topic, Ice::Int cost) SubscriberPtr subscriber = Subscriber::create(_instance, record); - for(;;) + try { - try - { - TransactionHolder txn(_connection); + IceDB::ReadWriteTxn txn(_instance->dbEnv()); - SubscriberRecordKey key; - key.topic = _id; - key.id = id; + SubscriberRecordKey key; + key.topic = _id; + key.id = id; - SubscriberMap subscriberMap(_connection, subscriberDbName); - subscriberMap.put(SubscriberMap::value_type(key, record)); + _subscriberMap.put(txn, key, record); - llu = getLLU(_connection); - llu.iteration++; - putLLU(_connection, llu); + llu = getIncrementedLLU(txn, _lluMap); - txn.commit(); - break; - } - catch(const DeadlockException&) - { - continue; - } - catch(const DatabaseException& ex) - { - halt(_instance->communicator(), ex); - } + txn.commit(); + } + catch(const IceDB::LMDBException& ex) + { + logError(_instance->communicator(), ex); + throw; // will become UnknownException in caller } _subscribers.push_back(subscriber); @@ -739,7 +700,7 @@ TopicImpl::unlink(const TopicPrx& topic) vector<SubscriberPtr>::const_iterator p = find(_subscribers.begin(), _subscribers.end(), id); if(p == _subscribers.end()) { - string name = identityToTopicName(id); + string name = IceStormInternal::identityToTopicName(id); TraceLevelsPtr traceLevels = _instance->traceLevels(); if(traceLevels->topic > 0) { @@ -814,7 +775,7 @@ TopicImpl::getLinkInfoSeq() const if(record.link && !(*p)->errored()) { LinkInfo info; - info.name = identityToTopicName(record.theTopic->ice_getIdentity()); + info.name = IceStormInternal::identityToTopicName(record.theTopic->ice_getIdentity()); info.cost = record.cost; info.theTopic = record.theTopic; seq.push_back(info); @@ -974,7 +935,7 @@ class TopicInternalReapCB : public IceUtil::Shared { public: - TopicInternalReapCB(const InstancePtr& instance, Ice::Long generation) : + TopicInternalReapCB(const PersistentInstancePtr& instance, Ice::Long generation) : _instance(instance), _generation(generation) { } @@ -992,7 +953,7 @@ public: private: - const InstancePtr _instance; + const PersistentInstancePtr _instance; const Ice::Long _generation; }; @@ -1112,33 +1073,25 @@ TopicImpl::observerAddSubscriber(const LogUpdate& llu, const SubscriberRecord& r } SubscriberPtr subscriber = Subscriber::create(_instance, record); - for(;;) + try { - try - { - TransactionHolder txn(_connection); + IceDB::ReadWriteTxn txn(_instance->dbEnv()); - SubscriberRecordKey key; - key.topic = _id; - key.id = subscriber->id(); + SubscriberRecordKey key; + key.topic = _id; + key.id = subscriber->id(); - SubscriberMap subscriberMap(_connection, subscriberDbName); - subscriberMap.put(SubscriberMap::value_type(key, record)); + _subscriberMap.put(txn, key, record); - // Update the LLU. - putLLU(_connection, llu); + // Update the LLU. + _lluMap.put(txn, lluDbKey, llu); - txn.commit(); - break; - } - catch(const DeadlockException&) - { - continue; - } - catch(const DatabaseException& ex) - { - halt(_instance->communicator(), ex); - } + txn.commit(); + } + catch(const IceDB::LMDBException& ex) + { + logError(_instance->communicator(), ex); + throw; // will become UnknownException in caller } _subscribers.push_back(subscriber); @@ -1165,7 +1118,32 @@ TopicImpl::observerRemoveSubscriber(const LogUpdate& llu, const Ice::IdentitySeq IceUtil::Mutex::Lock sync(_subscribersMutex); - // Remove the subscriber from the subscribers list. If the + + // First remove from the database. + try + { + IceDB::ReadWriteTxn txn(_instance->dbEnv()); + + for(Ice::IdentitySeq::const_iterator id = ids.begin(); id != ids.end(); ++id) + { + SubscriberRecordKey key; + key.topic = _id; + key.id = *id; + + _subscriberMap.del(txn, key); + } + + _lluMap.put(txn, lluDbKey, llu); + + txn.commit(); + } + catch(const IceDB::LMDBException& ex) + { + logError(_instance->communicator(), ex); + throw; // will become UnknownException in caller + } + + // Then remove the subscriber from the subscribers list. If the // subscriber had a local failure and was removed from the // subscriber list it could already be gone. That's not a problem. for(Ice::IdentitySeq::const_iterator id = ids.begin(); id != ids.end(); ++id) @@ -1177,36 +1155,6 @@ TopicImpl::observerRemoveSubscriber(const LogUpdate& llu, const Ice::IdentitySeq _subscribers.erase(p); } } - - // Next remove from the database. - for(;;) - { - try - { - TransactionHolder txn(_connection); - - for(Ice::IdentitySeq::const_iterator id = ids.begin(); id != ids.end(); ++id) - { - SubscriberRecordKey key; - key.topic = _id; - key.id = *id; - - SubscriberMap subscriberMap(_connection, subscriberDbName); - subscriberMap.erase(key); - } - putLLU(_connection, llu); - txn.commit(); - break; - } - catch(const DeadlockException&) - { - continue; - } - catch(const DatabaseException& ex) - { - halt(_instance->communicator(), ex); - } - } } void @@ -1259,60 +1207,59 @@ TopicImpl::updateSubscriberObservers() LogUpdate TopicImpl::destroyInternal(const LogUpdate& origLLU, bool master) { - _instance->publishAdapter()->remove(_linkPrx->ice_getIdentity()); - _instance->publishAdapter()->remove(_publisherPrx->ice_getIdentity()); - _instance->topicReaper()->add(_name); - - // Destroy each of the subscribers. - for(vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end(); ++p) - { - (*p)->destroy(); - } - _subscribers.clear(); // Clear out the database records related to this topic. LogUpdate llu; - for(;;) + try { - try - { - TransactionHolder txn(_connection); + IceDB::ReadWriteTxn txn(_instance->dbEnv()); - // Erase all subscriber records and the topic record. - SubscriberMap subscriberMap(_connection, subscriberDbName); + // Erase all subscriber records and the topic record. + SubscriberRecordKey key; + key.topic = _id; - IceStorm::SubscriberRecordKey key; - key.topic = _id; - SubscriberMap::iterator p = subscriberMap.find(key); - while(p != subscriberMap.end() && p->first.topic == key.topic) - { - subscriberMap.erase(p++); - } + SubscriberMapRWCursor cursor(_subscriberMap, txn); + if(cursor.find(key)) + { + _subscriberMap.del(txn, key); - // Update the LLU. - if(master) + SubscriberRecordKey k; + SubscriberRecord v; + while(cursor.get(k, v, MDB_NEXT) && k.topic == key.topic) { - llu = getLLU(_connection); - llu.iteration++; + _subscriberMap.del(txn, k); } - else - { - llu = origLLU; - } - putLLU(_connection, llu); - - txn.commit(); - break; } - catch(const DeadlockException&) + + // Update the LLU. + if(master) { - continue; + llu = getIncrementedLLU(txn, _lluMap); } - catch(const DatabaseException& ex) + else { - halt(_instance->communicator(), ex); + llu = origLLU; + _lluMap.put(txn, lluDbKey, llu); } + + txn.commit(); } + catch(const IceDB::LMDBException& ex) + { + logError(_instance->communicator(), ex); + throw; // will become UnknownException in caller + } + + _instance->publishAdapter()->remove(_linkPrx->ice_getIdentity()); + _instance->publishAdapter()->remove(_publisherPrx->ice_getIdentity()); + _instance->topicReaper()->add(_name); + + // Destroy each of the subscribers. + for(vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end(); ++p) + { + (*p)->destroy(); + } + _subscribers.clear(); _instance->topicAdapter()->remove(_id); @@ -1324,66 +1271,60 @@ TopicImpl::destroyInternal(const LogUpdate& origLLU, bool master) void TopicImpl::removeSubscribers(const Ice::IdentitySeq& ids) { - Ice::IdentitySeq removed; - - // First remove the subscriber from the subscribers list. Its - // possible that some of these subscribers have already been - // removed (consider, for example, a concurrent reap call from two - // replicas on the same subscriber). To avoid sending unnecessary - // observer updates keep track of the observers that are actually - // removed. - for(Ice::IdentitySeq::const_iterator id = ids.begin(); id != ids.end(); ++id) - { - vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), *id); - if(p != _subscribers.end()) - { - (*p)->destroy(); - _subscribers.erase(p); - removed.push_back(*id); - } - } + // First update the database - // If there is no further work to do we are done. - if(removed.empty()) - { - return; - } - - // Next update the database and send the notification to any - // slaves. LogUpdate llu; - for(;;) + bool found = false; + try { - try + IceDB::ReadWriteTxn txn(_instance->dbEnv()); + + for(Ice::IdentitySeq::const_iterator id = ids.begin(); id != ids.end(); ++id) { - TransactionHolder txn(_connection); + SubscriberRecordKey key; + key.topic = _id; + key.id = *id; - for(Ice::IdentitySeq::const_iterator id = ids.begin(); id != ids.end(); ++id) + if(_subscriberMap.del(txn, key)) { - SubscriberRecordKey key; - key.topic = _id; - key.id = *id; - - SubscriberMap subscriberMap(_connection, subscriberDbName); - subscriberMap.erase(key); + found = true; } + } - llu = getLLU(_connection); - llu.iteration++; - putLLU(_connection, llu); - + if(found) + { + llu = getIncrementedLLU(txn, _lluMap); txn.commit(); - break; } - catch(const DeadlockException&) + else { - continue; + txn.rollback(); } - catch(const DatabaseException& ex) + } + catch(const IceDB::LMDBException& ex) + { + logError(_instance->communicator(), ex); + throw; // will become UnknownException in caller + } + + if(found) + { + // Then remove the subscriber from the subscribers list. Its + // possible that some of these subscribers have already been + // removed (consider, for example, a concurrent reap call from two + // replicas on the same subscriber). To avoid sending unnecessary + // observer updates keep track of the observers that are actually + // removed. + for(Ice::IdentitySeq::const_iterator id = ids.begin(); id != ids.end(); ++id) { - halt(_instance->communicator(), ex); + vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), *id); + if(p != _subscribers.end()) + { + (*p)->destroy(); + _subscribers.erase(p); + } } - } - _instance->observers()->removeSubscriber(llu, _name, ids); + _instance->observers()->removeSubscriber(llu, _name, ids); + } } |