diff options
Diffstat (limited to 'cpp/src/IceStorm/TopicI.cpp')
-rw-r--r-- | cpp/src/IceStorm/TopicI.cpp | 84 |
1 files changed, 47 insertions, 37 deletions
diff --git a/cpp/src/IceStorm/TopicI.cpp b/cpp/src/IceStorm/TopicI.cpp index 87dfde4f6c7..8ed30f7330a 100644 --- a/cpp/src/IceStorm/TopicI.cpp +++ b/cpp/src/IceStorm/TopicI.cpp @@ -24,18 +24,18 @@ using namespace IceStorm; using namespace std; -namespace IceStorm +namespace { // // The servant has a 1-1 association with a topic. It is used to // receive events from Publishers. // -class PublisherProxyI : public Ice::BlobjectArray +class PublisherI : public Ice::BlobjectArray { public: - PublisherProxyI(const TopicIPtr& topic) : + PublisherI(const TopicIPtr& topic) : _topic(topic) { } @@ -51,7 +51,12 @@ public: Ice::ByteSeq(), current.ctx); - event->data.swap(Ice::ByteSeq(inParams.first, inParams.second)); + // + // COMPILERBUG: gcc 4.0.1 doesn't like this. + // + //event->data.swap(Ice::ByteSeq(inParams.first, inParams.second)); + Ice::ByteSeq data(inParams.first, inParams.second); + event->data.swap(data); EventSeq v; v.push_back(event); @@ -109,7 +114,7 @@ private: const SubscriberPtr _subscriber; }; -} // End namespace IceStorm +} TopicI::TopicI( const InstancePtr& instance, @@ -133,7 +138,7 @@ TopicI::TopicI( Ice::Identity id; id.category = _name; id.name = "publish"; - _publisherPrx = _instance->objectAdapter()->add(new PublisherProxyI(this), id); + _publisherPrx = _instance->objectAdapter()->add(new PublisherI(this), id); // // Create a servant per topic to receive linked event data. The @@ -164,6 +169,7 @@ TopicI::TopicI( _instance->objectAdapter()->add( new TopicUpstreamLinkI(subscriber), p->second.upstream->ice_getIdentity())); _subscribers.push_back(subscriber); + _instance->subscriberPool()->add(subscriber); } PersistentUpstreamMap::const_iterator upI = _upstream.find(_name); @@ -250,10 +256,13 @@ TopicI::subscribe(const QoS& qos, const Ice::ObjectPrx& obj, const Ice::Current& if(p != _subscribers.end()) { (*p)->destroy(); + _instance->subscriberPool()->remove(*p); _subscribers.erase(p); } - _subscribers.push_back(Subscriber::create(_instance, obj, qos)); + SubscriberPtr subscriber = Subscriber::create(_instance, obj, qos); + _subscribers.push_back(subscriber); + _instance->subscriberPool()->add(subscriber); } Ice::ObjectPrx @@ -288,6 +297,7 @@ TopicI::subscribeAndGetPublisher(const QoS& qos, const Ice::ObjectPrx& obj, cons SubscriberPtr subscriber = Subscriber::create(_instance, obj, qos); _subscribers.push_back(subscriber); + _instance->subscriberPool()->add(subscriber); return subscriber->proxy(); } @@ -407,6 +417,7 @@ TopicI::link(const TopicPrx& topic, Ice::Int cost, const Ice::Current&) IceUtil::Mutex::Lock subscriberSync(_subscribersMutex); _subscribers.push_back(subscriber); + _instance->subscriberPool()->add(subscriber); } void @@ -622,11 +633,8 @@ TopicI::reap() // list<SubscriberPtr> error; { - // - // Uses splice for efficiency - // IceUtil::Mutex::Lock errorSync(_errorMutex); - error.splice(error.begin(), _error); + _error.swap(error); } TraceLevelsPtr traceLevels = _instance->traceLevels(); @@ -675,31 +683,6 @@ TopicI::reap() } void -TopicI::removeSubscriber(const Ice::ObjectPrx& obj) -{ - Ice::Identity ident = obj->ice_getIdentity(); - - IceUtil::Mutex::Lock sync(_subscribersMutex); - vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), ident); - if(p != _subscribers.end()) - { - (*p)->destroy(); - _subscribers.erase(p); - return; - } - - // - // If the subscriber was not found then display a diagnostic. - // - TraceLevelsPtr traceLevels = _instance->traceLevels(); - if(traceLevels->topic > 0) - { - Ice::Trace out(traceLevels->logger, traceLevels->topicCat); - out << _instance->communicator()->identityToString(ident) << ": not subscribed."; - } -} - -void TopicI::publish(bool forwarded, const EventSeq& events) { // @@ -740,7 +723,7 @@ TopicI::publish(bool forwarded, const EventSeq& events) // if(!flush.empty()) { - _instance->subscriberPool()->add(flush); + _instance->subscriberPool()->flush(flush); } // @@ -778,6 +761,7 @@ TopicI::publish(bool forwarded, const EventSeq& events) { reap.push_back(*q); } + _instance->subscriberPool()->remove(*q); _subscribers.erase(q); } } @@ -793,3 +777,29 @@ TopicI::publish(bool forwarded, const EventSeq& events) _error.splice(_error.begin(), reap); } } + +void +TopicI::removeSubscriber(const Ice::ObjectPrx& obj) +{ + Ice::Identity ident = obj->ice_getIdentity(); + + IceUtil::Mutex::Lock sync(_subscribersMutex); + vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), ident); + if(p != _subscribers.end()) + { + (*p)->destroy(); + _instance->subscriberPool()->remove(*p); + _subscribers.erase(p); + return; + } + + // + // If the subscriber was not found then display a diagnostic. + // + TraceLevelsPtr traceLevels = _instance->traceLevels(); + if(traceLevels->topic > 0) + { + Ice::Trace out(traceLevels->logger, traceLevels->topicCat); + out << _instance->communicator()->identityToString(ident) << ": not subscribed."; + } +} |