diff options
Diffstat (limited to 'cpp/src/IceStorm/TopicI.cpp')
-rw-r--r-- | cpp/src/IceStorm/TopicI.cpp | 215 |
1 files changed, 9 insertions, 206 deletions
diff --git a/cpp/src/IceStorm/TopicI.cpp b/cpp/src/IceStorm/TopicI.cpp index 422cafc5615..b00bbc2ab45 100644 --- a/cpp/src/IceStorm/TopicI.cpp +++ b/cpp/src/IceStorm/TopicI.cpp @@ -12,7 +12,6 @@ #include <IceStorm/Subscriber.h> #include <IceStorm/TraceLevels.h> #include <IceStorm/Event.h> -#include <IceStorm/KeepAliveThread.h> #include <IceStorm/SubscriberPool.h> #include <Ice/LoggerUtil.h> @@ -94,26 +93,6 @@ private: const TopicIPtr _topic; }; -class TopicUpstreamLinkI : public TopicUpstreamLink -{ -public: - - TopicUpstreamLinkI(const SubscriberPtr& subscriber) : - _subscriber(subscriber) - { - } - - virtual void - keepAlive(const Ice::Current&) - { - _subscriber->reachable(); - } - -private: - - const SubscriberPtr _subscriber; -}; - } TopicI::TopicI( @@ -127,7 +106,6 @@ TopicI::TopicI( _connection(Freeze::createConnection(instance->communicator(), envName)), _topics(_connection, dbName, false), _topicRecord(topicRecord), - _upstream(_connection, "upstream", false), _destroyed(false) { // @@ -161,38 +139,13 @@ TopicI::TopicI( } // - // Create the subscriber object and the upstream servant and - // add it to the set of subscribers. + // Create the subscriber object add it to the set of + // subscribers. // SubscriberPtr subscriber = Subscriber::create(_instance, p->second.obj, p->second.cost); - TopicUpstreamLinkPrx upstream = TopicUpstreamLinkPrx::uncheckedCast( - _instance->objectAdapter()->add( - new TopicUpstreamLinkI(subscriber), p->second.upstream->ice_getIdentity())); _subscribers.push_back(subscriber); _instance->subscriberPool()->add(subscriber); } - - PersistentUpstreamMap::const_iterator upI = _upstream.find(_name); - if(upI != _upstream.end()) - { - // - // This record should really be there, but its possible for it - // not to be in the event of a crash in between the add of the - // topic record and the add of the upstream record. - // - _upstreamRecord = upI->second; - } - - for(TopicUpstreamLinkPrxSeq::const_iterator q = _upstreamRecord.begin(); q != _upstreamRecord.end(); ++q) - { - TraceLevelsPtr traceLevels = _instance->traceLevels(); - if(traceLevels->topic > 0) - { - Ice::Trace out(traceLevels->logger, traceLevels->topicCat); - out << _name << " upstream " << _instance->communicator()->identityToString((*q)->ice_getIdentity()); - } - _instance->keepAlive()->add(*q); - } } string @@ -340,8 +293,7 @@ TopicI::getLinkProxy(const Ice::Current&) void TopicI::link(const TopicPrx& topic, Ice::Int cost, const Ice::Current&) { - string name = topic->getName(); - TopicInternalPrx internal = TopicInternalPrx::checkedCast(topic); + TopicInternalPrx internal = TopicInternalPrx::uncheckedCast(topic); TopicLinkPrx link = internal->getLinkProxy(); IceUtil::RecMutex::Lock topicSync(_topicRecordMutex); @@ -352,6 +304,8 @@ TopicI::link(const TopicPrx& topic, Ice::Int cost, const Ice::Current&) reap(); + string name = topic->ice_getIdentity().name; + if(_topicRecord.find(name) != _topicRecord.end()) { LinkExists ex; @@ -367,47 +321,14 @@ TopicI::link(const TopicPrx& topic, Ice::Int cost, const Ice::Current&) } SubscriberPtr subscriber = Subscriber::create(_instance, link, cost); - TopicUpstreamLinkPrx upstream; - try - { - upstream = TopicUpstreamLinkPrx::uncheckedCast( - _instance->objectAdapter()->addWithUUID(new TopicUpstreamLinkI(subscriber))); - } - catch(const Ice::ObjectAdapterDeactivatedException&) - { - subscriber->destroy(); - throw; - } // - // Notify the downstream topic that it is now linked. For linking - // we use the "notify & save" strategy. This is important because - // if there is a failure after the notify and before the save then - // the downstream service will detect that the topic upstream link - // does not exist and correctly clean up. If we saved and then - // notified we could have a downstream linked client that does not - // know it is in fact linked -- and this is not easily detectable. - // - try - { - internal->linkNotification(_name, upstream); - } - catch(const Ice::Exception&) - { - // Cleanup. - _instance->objectAdapter()->remove(upstream->ice_getIdentity()); - subscriber->destroy(); - throw; - } - - // // Create the LinkRecord // LinkRecord record; record.obj = link; record.cost = cost; record.theTopic = topic; - record.upstream = upstream; // // Save @@ -423,12 +344,6 @@ TopicI::link(const TopicPrx& topic, Ice::Int cost, const Ice::Current&) void TopicI::unlink(const TopicPrx& topic, const Ice::Current& current) { - unlinkByName(topic->getName(), current); -} - -void -TopicI::unlinkByName(const string& name, const Ice::Current&) -{ IceUtil::RecMutex::Lock topicSync(_topicRecordMutex); if(_destroyed) { @@ -436,6 +351,8 @@ TopicI::unlinkByName(const string& name, const Ice::Current&) } reap(); + + string name = topic->ice_getIdentity().name; LinkRecordDict::iterator q = _topicRecord.find(name); if(q == _topicRecord.end()) @@ -452,18 +369,7 @@ TopicI::unlinkByName(const string& name, const Ice::Current&) throw ex; } - // - // First save and then notify. unlinking we use the "save & - // notify" strategy. This is important because if there is a - // failure after the save and before the notify then the - // downstream service will detect that the topic upstream link - // does not exist and correctly clean up. - // - - // - // Copy the record first because we use it after the save. - // - LinkRecord rec = q->second; + Ice::ObjectPrx subscriber = q->second.obj; _topicRecord.erase(q); // @@ -471,40 +377,13 @@ TopicI::unlinkByName(const string& name, const Ice::Current&) // _topics.put(PersistentTopicMap::value_type(_name, _topicRecord)); - // - // Remove the TopicUpstreamLink servant. - // - try - { - _instance->objectAdapter()->remove(rec.upstream->ice_getIdentity()); - } - catch(const Ice::ObjectAdapterDeactivatedException&) - { - // Ignore -- this could occur on shutdown. - } - TraceLevelsPtr traceLevels = _instance->traceLevels(); if(traceLevels->topic > 0) { Ice::Trace out(traceLevels->logger, traceLevels->topicCat); out << _name << " unlink " << name; } - removeSubscriber(rec.obj); - - TopicInternalPrx internal = TopicInternalPrx::checkedCast(rec.theTopic); - try - { - internal->unlinkNotification(_name, rec.upstream); - } - catch(const Ice::Exception& e) - { - if(traceLevels->topic > 0) - { - Ice::Trace out(traceLevels->logger, traceLevels->topicCat); - out << _name << " unlinkNotification failed: " << name << ": " << e; - } - // Ignore. This will be detected upon a restart. - } + removeSubscriber(subscriber); } LinkInfoSeq @@ -531,13 +410,7 @@ TopicI::getLinkInfoSeq(const Ice::Current&) const void TopicI::destroy(const Ice::Current&) { - // - // This method must lock both the record mutexes otherwise we can - // end up with the database record being re-added if - // TopicManagerI::reap() is called concurrently with destroy/link. - // IceUtil::RecMutex::Lock sync(_topicRecordMutex); - IceUtil::RecMutex::Lock sync2(_upstreamRecordMutex); if(_destroyed) { @@ -556,59 +429,6 @@ TopicI::destroy(const Ice::Current&) } } -void -TopicI::linkNotification(const string& name, const TopicUpstreamLinkPrx& upstream, const Ice::Current&) -{ - IceUtil::RecMutex::Lock topicSync(_upstreamRecordMutex); - if(_destroyed) - { - throw Ice::ObjectNotExistException(__FILE__, __LINE__); - } - - TraceLevelsPtr traceLevels = _instance->traceLevels(); - if(traceLevels->topic > 0) - { - Ice::Trace out(traceLevels->logger, traceLevels->topicCat); - out << _name << " linkNotification " << name; - } - - _upstreamRecord.push_back(upstream); - _instance->keepAlive()->add(upstream); - - // - // Save - // - _upstream.put(PersistentUpstreamMap::value_type(_name, _upstreamRecord)); -} - -void -TopicI::unlinkNotification(const string& name, const TopicUpstreamLinkPrx& upstream, const Ice::Current&) -{ - IceUtil::RecMutex::Lock topicSync(_upstreamRecordMutex); - if(_destroyed) - { - throw Ice::ObjectNotExistException(__FILE__, __LINE__); - } - TraceLevelsPtr traceLevels = _instance->traceLevels(); - if(traceLevels->topic > 0) - { - Ice::Trace out(traceLevels->logger, traceLevels->topicCat); - out << _name << " unlinkNotification " << name; - } - - TopicUpstreamLinkPrxSeq::iterator p = find(_upstreamRecord.begin(), _upstreamRecord.end(), upstream); - if(p != _upstreamRecord.end()) - { - _upstreamRecord.erase(p); - } - _instance->keepAlive()->remove(upstream); - - // - // Save - // - _upstream.put(PersistentUpstreamMap::value_type(_name, _upstreamRecord)); -} - bool TopicI::destroyed() const { @@ -663,23 +483,6 @@ TopicI::reap() _topics.put(PersistentTopicMap::value_type(_name, _topicRecord)); } } - - // - // Now reap any dead upstream topics. - // - { - IceUtil::RecMutex::Lock topicSync(_upstreamRecordMutex); - if(_destroyed) - { - return; - } - - if(_instance->keepAlive()->filter(_upstreamRecord)) - { - // Save. - _upstream.put(PersistentUpstreamMap::value_type(_name, _upstreamRecord)); - } - } } void |