diff options
author | Matthew Newhook <matthew@zeroc.com> | 2006-11-09 09:01:11 +0000 |
---|---|---|
committer | Matthew Newhook <matthew@zeroc.com> | 2006-11-09 09:01:11 +0000 |
commit | 382aef369900b75eeaf8e3ec4e0a27cb36c20355 (patch) | |
tree | 90debb0cfcf3473360e6b4e57aaa03bd3607d74a /cpp/src/IceStorm/TopicI.cpp | |
parent | cosmetic change (diff) | |
download | ice-382aef369900b75eeaf8e3ec4e0a27cb36c20355.tar.bz2 ice-382aef369900b75eeaf8e3ec4e0a27cb36c20355.tar.xz ice-382aef369900b75eeaf8e3ec4e0a27cb36c20355.zip |
Fixes as suggested by Bernard (namespace { }, PublisherProxyI ->
PublisherI, etc.
New SubscriberPool reduction algorithm.
gcc compiler bug fix.
Diffstat (limited to 'cpp/src/IceStorm/TopicI.cpp')
-rw-r--r-- | cpp/src/IceStorm/TopicI.cpp | 84 |
1 files changed, 47 insertions, 37 deletions
diff --git a/cpp/src/IceStorm/TopicI.cpp b/cpp/src/IceStorm/TopicI.cpp index 87dfde4f6c7..8ed30f7330a 100644 --- a/cpp/src/IceStorm/TopicI.cpp +++ b/cpp/src/IceStorm/TopicI.cpp @@ -24,18 +24,18 @@ using namespace IceStorm; using namespace std; -namespace IceStorm +namespace { // // The servant has a 1-1 association with a topic. It is used to // receive events from Publishers. // -class PublisherProxyI : public Ice::BlobjectArray +class PublisherI : public Ice::BlobjectArray { public: - PublisherProxyI(const TopicIPtr& topic) : + PublisherI(const TopicIPtr& topic) : _topic(topic) { } @@ -51,7 +51,12 @@ public: Ice::ByteSeq(), current.ctx); - event->data.swap(Ice::ByteSeq(inParams.first, inParams.second)); + // + // COMPILERBUG: gcc 4.0.1 doesn't like this. + // + //event->data.swap(Ice::ByteSeq(inParams.first, inParams.second)); + Ice::ByteSeq data(inParams.first, inParams.second); + event->data.swap(data); EventSeq v; v.push_back(event); @@ -109,7 +114,7 @@ private: const SubscriberPtr _subscriber; }; -} // End namespace IceStorm +} TopicI::TopicI( const InstancePtr& instance, @@ -133,7 +138,7 @@ TopicI::TopicI( Ice::Identity id; id.category = _name; id.name = "publish"; - _publisherPrx = _instance->objectAdapter()->add(new PublisherProxyI(this), id); + _publisherPrx = _instance->objectAdapter()->add(new PublisherI(this), id); // // Create a servant per topic to receive linked event data. The @@ -164,6 +169,7 @@ TopicI::TopicI( _instance->objectAdapter()->add( new TopicUpstreamLinkI(subscriber), p->second.upstream->ice_getIdentity())); _subscribers.push_back(subscriber); + _instance->subscriberPool()->add(subscriber); } PersistentUpstreamMap::const_iterator upI = _upstream.find(_name); @@ -250,10 +256,13 @@ TopicI::subscribe(const QoS& qos, const Ice::ObjectPrx& obj, const Ice::Current& if(p != _subscribers.end()) { (*p)->destroy(); + _instance->subscriberPool()->remove(*p); _subscribers.erase(p); } - _subscribers.push_back(Subscriber::create(_instance, obj, qos)); + SubscriberPtr subscriber = Subscriber::create(_instance, obj, qos); + _subscribers.push_back(subscriber); + _instance->subscriberPool()->add(subscriber); } Ice::ObjectPrx @@ -288,6 +297,7 @@ TopicI::subscribeAndGetPublisher(const QoS& qos, const Ice::ObjectPrx& obj, cons SubscriberPtr subscriber = Subscriber::create(_instance, obj, qos); _subscribers.push_back(subscriber); + _instance->subscriberPool()->add(subscriber); return subscriber->proxy(); } @@ -407,6 +417,7 @@ TopicI::link(const TopicPrx& topic, Ice::Int cost, const Ice::Current&) IceUtil::Mutex::Lock subscriberSync(_subscribersMutex); _subscribers.push_back(subscriber); + _instance->subscriberPool()->add(subscriber); } void @@ -622,11 +633,8 @@ TopicI::reap() // list<SubscriberPtr> error; { - // - // Uses splice for efficiency - // IceUtil::Mutex::Lock errorSync(_errorMutex); - error.splice(error.begin(), _error); + _error.swap(error); } TraceLevelsPtr traceLevels = _instance->traceLevels(); @@ -675,31 +683,6 @@ TopicI::reap() } void -TopicI::removeSubscriber(const Ice::ObjectPrx& obj) -{ - Ice::Identity ident = obj->ice_getIdentity(); - - IceUtil::Mutex::Lock sync(_subscribersMutex); - vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), ident); - if(p != _subscribers.end()) - { - (*p)->destroy(); - _subscribers.erase(p); - return; - } - - // - // If the subscriber was not found then display a diagnostic. - // - TraceLevelsPtr traceLevels = _instance->traceLevels(); - if(traceLevels->topic > 0) - { - Ice::Trace out(traceLevels->logger, traceLevels->topicCat); - out << _instance->communicator()->identityToString(ident) << ": not subscribed."; - } -} - -void TopicI::publish(bool forwarded, const EventSeq& events) { // @@ -740,7 +723,7 @@ TopicI::publish(bool forwarded, const EventSeq& events) // if(!flush.empty()) { - _instance->subscriberPool()->add(flush); + _instance->subscriberPool()->flush(flush); } // @@ -778,6 +761,7 @@ TopicI::publish(bool forwarded, const EventSeq& events) { reap.push_back(*q); } + _instance->subscriberPool()->remove(*q); _subscribers.erase(q); } } @@ -793,3 +777,29 @@ TopicI::publish(bool forwarded, const EventSeq& events) _error.splice(_error.begin(), reap); } } + +void +TopicI::removeSubscriber(const Ice::ObjectPrx& obj) +{ + Ice::Identity ident = obj->ice_getIdentity(); + + IceUtil::Mutex::Lock sync(_subscribersMutex); + vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), ident); + if(p != _subscribers.end()) + { + (*p)->destroy(); + _instance->subscriberPool()->remove(*p); + _subscribers.erase(p); + return; + } + + // + // If the subscriber was not found then display a diagnostic. + // + TraceLevelsPtr traceLevels = _instance->traceLevels(); + if(traceLevels->topic > 0) + { + Ice::Trace out(traceLevels->logger, traceLevels->topicCat); + out << _instance->communicator()->identityToString(ident) << ": not subscribed."; + } +} |