diff options
Diffstat (limited to 'cpp/src/IceStorm/TopicI.cpp')
-rw-r--r-- | cpp/src/IceStorm/TopicI.cpp | 252 |
1 files changed, 121 insertions, 131 deletions
diff --git a/cpp/src/IceStorm/TopicI.cpp b/cpp/src/IceStorm/TopicI.cpp index 9b883c48798..7545393fa70 100644 --- a/cpp/src/IceStorm/TopicI.cpp +++ b/cpp/src/IceStorm/TopicI.cpp @@ -19,7 +19,6 @@ #include <IceStorm/Subscriber.h> #include <IceStorm/TraceLevels.h> #include <algorithm> -#include <list> using namespace IceStorm; using namespace std; @@ -29,11 +28,6 @@ namespace IceStorm { // -// A list of Subscribers. -// -typedef std::list<SubscriberPtr> SubscriberList; - -// // The servant has a 1-1 association with a topic. It is used to // receive events from Publishers. // @@ -92,25 +86,66 @@ private: IceStorm::TopicSubscribersPtr _subscribers; }; +} // End namespace IceStorm -// -// Holder for the set of subscribers. -// -class TopicSubscribers : public IceUtil::Shared +IceStorm::TopicSubscribers::TopicSubscribers(const TraceLevelsPtr& traceLevels) : + _traceLevels(traceLevels) { -public: +} - TopicSubscribers(const TraceLevelsPtr& traceLevels) : - _traceLevels(traceLevels) - { - } +IceStorm::TopicSubscribers::~TopicSubscribers() +{ +} - ~TopicSubscribers() +void +IceStorm::TopicSubscribers::add(const SubscriberPtr& subscriber) +{ + Ice::Identity id = subscriber->id(); + + IceUtil::Mutex::Lock sync(_subscribersMutex); + + // + // If a subscriber with this identity is already subscribed + // then mark the subscriber as replaced. + // + // Note that this doesn't actually remove the Subscribe from + // the list of subscribers - it marks the Subscriber as + // replaced, and it's removed on the next event publish. + // + for(SubscriberList::iterator i = _subscribers.begin() ; i != _subscribers.end(); ++i) { + if((*i)->id() == id) + { + // + // This marks the subscriber as invalid. It will be + // removed on the next event publish. + // + (*i)->replace(); + break; + } } + + + // + // Add to the set of subscribers + // + _subscribers.push_back(subscriber); +} - void - add(const SubscriberPtr& subscriber) +// +// Unsubscribe the Subscriber with the given identity. Note that +// this doesn't remove the Subscriber from the list of subscribers +// - it marks the Subscriber as unsubscribed, and it's removed on +// the next event publish. +// +void +IceStorm::TopicSubscribers::remove(const Ice::ObjectPrx& obj) +{ + Ice::Identity id = obj->ice_getIdentity(); + + IceUtil::Mutex::Lock sync(_subscribersMutex); + + for(SubscriberList::iterator i = _subscribers.begin() ; i != _subscribers.end(); ++i) { Ice::Identity id = subscriber->id(); @@ -126,15 +161,12 @@ public: // for(SubscriberList::iterator i = _subscribers.begin() ; i != _subscribers.end(); ++i) { - if((*i)->id() == id) - { - // - // This marks the subscriber as invalid. It will be - // removed on the next event publish. - // - (*i)->replace(); - break; - } + // + // This marks the subscriber as invalid. It will be + // removed on the next event publish. + // + (*i)->unsubscribe(); + return; } // @@ -142,144 +174,102 @@ public: // _subscribers.push_back(subscriber); } - + // // Unsubscribe the subscriber with the given identity. Note that // this doesn't remove the subscriber from the list of subscribers // - it marks the subscriber as unsubscribed, and it's removed on // the next event publish. // - void - remove(const Ice::ObjectPrx& obj) + if(_traceLevels->topic > 0) { - Ice::Identity id = obj->ice_getIdentity(); - - IceUtil::Mutex::Lock sync(_subscribersMutex); - - for(SubscriberList::iterator i = _subscribers.begin() ; i != _subscribers.end(); ++i) - { - if((*i)->id() == id) - { - // - // This marks the subscriber as invalid. It will be - // removed on the next event publish. - // - (*i)->unsubscribe(); - return; - } - } - - // - // If the subscriber was not found then display a diagnostic - // - if(_traceLevels->topic > 0) - { - Ice::Trace out(_traceLevels->logger, _traceLevels->topicCat); - out << id << ": not subscribed."; - } + Ice::Trace out(_traceLevels->logger, _traceLevels->topicCat); + out << id << ": not subscribed."; } +} +// +// TODO: Optimize +// +// It's not strictly necessary to clear the error'd subscribers on +// every publish iteration (if the subscriber validates the state +// before attempting publishing the event). This means more mutex +// locks (due to the state check in the subscriber) - but with the +// advantage that publishes can occur in parallel and less +// subscriber list iterations. +// +void +IceStorm::TopicSubscribers::publish(const Event& event) +{ // - // TODO: Optimize + // Copy of the subscriber list so that event publishing can + // occur in parallel. // - // It's not strictly necessary to clear the error'd subscribers on - // every publish iteration (if the subscriber validates the state - // before attempting publishing the event). This means more mutex - // locks (due to the state check in the subscriber) - but with the - // advantage that publishes can occur in parallel and less - // subscriber list iterations. + // TODO: Find out whether this is a false optimization - how + // expensive is the cost of copying vs. lack of parallelism? // - void - publish(const Event& event) + SubscriberList copy; + { + IceUtil::Mutex::Lock sync(_subscribersMutex); + + // + // Copy of the subscribers that are in error. // // Copy the subscriber list so that event publishing can // occur in parallel. // - // TODO: Find out whether this is a false optimization - how - // expensive is the cost of copying vs. lack of parallelism? + // Erase the inactive subscribers from the _subscribers + // list. Copy the subscribers in error to the error list. // - SubscriberList copy; - + SubscriberList::iterator p = _subscribers.begin(); + while(p != _subscribers.end()) { - IceUtil::Mutex::Lock sync(_subscribersMutex); - - // - // Copy of the subscribers that are in error. - // - SubscriberList e; - - // - // Erase the inactive subscribers from the _subscribers - // list. Copy the subscribers in error to the error list. - // - SubscriberList::iterator p = _subscribers.begin(); - while(p != _subscribers.end()) + if((*p)->inactive()) { - if((*p)->inactive()) - { - if((*p)->error()) - { - e.push_back(*p); - } - - SubscriberList::iterator tmp = p; - ++p; - _subscribers.erase(tmp); - } - else + if((*p)->error()) { - copy.push_back(*p); - ++p; + e.push_back(*p); } + + SubscriberList::iterator tmp = p; + ++p; + _subscribers.erase(tmp); } - - if(!e.empty()) + else { - IceUtil::Mutex::Lock errorSync(_errorMutex); - _error.splice(_error.begin(), e); + copy.push_back(*p); + ++p; } } - - for(SubscriberList::iterator p = copy.begin(); p != copy.end(); ++p) + + if(!e.empty()) { - (*p)->publish(event); + IceUtil::Mutex::Lock errorSync(_errorMutex); + _error.splice(_error.begin(), e); } } - - // - // Clear & return the set of subscribers that are in error. - // - SubscriberList - clearErrorList() + + for(SubscriberList::iterator p = copy.begin(); p != copy.end(); ++p) { - // - // Uses splice for efficiency - // - IceUtil::Mutex::Lock errorSync(_errorMutex); - SubscriberList c; - c.splice(c.begin(), _error); - return c; + (*p)->publish(event); } +} -private: - - TraceLevelsPtr _traceLevels; - - // - // TODO: Should there be a map from identity to subscriber? - // - IceUtil::Mutex _subscribersMutex; - SubscriberList _subscribers; - +// +// Clear & return the set of subscribers that are in error. +// +SubscriberList +IceStorm::TopicSubscribers::clearErrorList() +{ // - // Set of subscribers that have encountered an error. + // Uses splice for efficiency // - IceUtil::Mutex _errorMutex; - SubscriberList _error; -}; - -} // End namespace IceStorm + IceUtil::Mutex::Lock errorSync(_errorMutex); + SubscriberList c; + c.splice(c.begin(), _error); + return c; +} // // Incoming events from publishers. |