summaryrefslogtreecommitdiff
path: root/cpp/src/IceStorm/TopicI.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceStorm/TopicI.cpp')
-rw-r--r--cpp/src/IceStorm/TopicI.cpp252
1 files changed, 121 insertions, 131 deletions
diff --git a/cpp/src/IceStorm/TopicI.cpp b/cpp/src/IceStorm/TopicI.cpp
index 9b883c48798..7545393fa70 100644
--- a/cpp/src/IceStorm/TopicI.cpp
+++ b/cpp/src/IceStorm/TopicI.cpp
@@ -19,7 +19,6 @@
#include <IceStorm/Subscriber.h>
#include <IceStorm/TraceLevels.h>
#include <algorithm>
-#include <list>
using namespace IceStorm;
using namespace std;
@@ -29,11 +28,6 @@ namespace IceStorm
{
//
-// A list of Subscribers.
-//
-typedef std::list<SubscriberPtr> SubscriberList;
-
-//
// The servant has a 1-1 association with a topic. It is used to
// receive events from Publishers.
//
@@ -92,25 +86,66 @@ private:
IceStorm::TopicSubscribersPtr _subscribers;
};
+} // End namespace IceStorm
-//
-// Holder for the set of subscribers.
-//
-class TopicSubscribers : public IceUtil::Shared
+IceStorm::TopicSubscribers::TopicSubscribers(const TraceLevelsPtr& traceLevels) :
+ _traceLevels(traceLevels)
{
-public:
+}
- TopicSubscribers(const TraceLevelsPtr& traceLevels) :
- _traceLevels(traceLevels)
- {
- }
+IceStorm::TopicSubscribers::~TopicSubscribers()
+{
+}
- ~TopicSubscribers()
+void
+IceStorm::TopicSubscribers::add(const SubscriberPtr& subscriber)
+{
+ Ice::Identity id = subscriber->id();
+
+ IceUtil::Mutex::Lock sync(_subscribersMutex);
+
+ //
+ // If a subscriber with this identity is already subscribed
+ // then mark the subscriber as replaced.
+ //
+ // Note that this doesn't actually remove the Subscribe from
+ // the list of subscribers - it marks the Subscriber as
+ // replaced, and it's removed on the next event publish.
+ //
+ for(SubscriberList::iterator i = _subscribers.begin() ; i != _subscribers.end(); ++i)
{
+ if((*i)->id() == id)
+ {
+ //
+ // This marks the subscriber as invalid. It will be
+ // removed on the next event publish.
+ //
+ (*i)->replace();
+ break;
+ }
}
+
+
+ //
+ // Add to the set of subscribers
+ //
+ _subscribers.push_back(subscriber);
+}
- void
- add(const SubscriberPtr& subscriber)
+//
+// Unsubscribe the Subscriber with the given identity. Note that
+// this doesn't remove the Subscriber from the list of subscribers
+// - it marks the Subscriber as unsubscribed, and it's removed on
+// the next event publish.
+//
+void
+IceStorm::TopicSubscribers::remove(const Ice::ObjectPrx& obj)
+{
+ Ice::Identity id = obj->ice_getIdentity();
+
+ IceUtil::Mutex::Lock sync(_subscribersMutex);
+
+ for(SubscriberList::iterator i = _subscribers.begin() ; i != _subscribers.end(); ++i)
{
Ice::Identity id = subscriber->id();
@@ -126,15 +161,12 @@ public:
//
for(SubscriberList::iterator i = _subscribers.begin() ; i != _subscribers.end(); ++i)
{
- if((*i)->id() == id)
- {
- //
- // This marks the subscriber as invalid. It will be
- // removed on the next event publish.
- //
- (*i)->replace();
- break;
- }
+ //
+ // This marks the subscriber as invalid. It will be
+ // removed on the next event publish.
+ //
+ (*i)->unsubscribe();
+ return;
}
//
@@ -142,144 +174,102 @@ public:
//
_subscribers.push_back(subscriber);
}
-
+
//
// Unsubscribe the subscriber with the given identity. Note that
// this doesn't remove the subscriber from the list of subscribers
// - it marks the subscriber as unsubscribed, and it's removed on
// the next event publish.
//
- void
- remove(const Ice::ObjectPrx& obj)
+ if(_traceLevels->topic > 0)
{
- Ice::Identity id = obj->ice_getIdentity();
-
- IceUtil::Mutex::Lock sync(_subscribersMutex);
-
- for(SubscriberList::iterator i = _subscribers.begin() ; i != _subscribers.end(); ++i)
- {
- if((*i)->id() == id)
- {
- //
- // This marks the subscriber as invalid. It will be
- // removed on the next event publish.
- //
- (*i)->unsubscribe();
- return;
- }
- }
-
- //
- // If the subscriber was not found then display a diagnostic
- //
- if(_traceLevels->topic > 0)
- {
- Ice::Trace out(_traceLevels->logger, _traceLevels->topicCat);
- out << id << ": not subscribed.";
- }
+ Ice::Trace out(_traceLevels->logger, _traceLevels->topicCat);
+ out << id << ": not subscribed.";
}
+}
+//
+// TODO: Optimize
+//
+// It's not strictly necessary to clear the error'd subscribers on
+// every publish iteration (if the subscriber validates the state
+// before attempting publishing the event). This means more mutex
+// locks (due to the state check in the subscriber) - but with the
+// advantage that publishes can occur in parallel and less
+// subscriber list iterations.
+//
+void
+IceStorm::TopicSubscribers::publish(const Event& event)
+{
//
- // TODO: Optimize
+ // Copy of the subscriber list so that event publishing can
+ // occur in parallel.
//
- // It's not strictly necessary to clear the error'd subscribers on
- // every publish iteration (if the subscriber validates the state
- // before attempting publishing the event). This means more mutex
- // locks (due to the state check in the subscriber) - but with the
- // advantage that publishes can occur in parallel and less
- // subscriber list iterations.
+ // TODO: Find out whether this is a false optimization - how
+ // expensive is the cost of copying vs. lack of parallelism?
//
- void
- publish(const Event& event)
+ SubscriberList copy;
+
{
+ IceUtil::Mutex::Lock sync(_subscribersMutex);
+
+ //
+ // Copy of the subscribers that are in error.
//
// Copy the subscriber list so that event publishing can
// occur in parallel.
//
- // TODO: Find out whether this is a false optimization - how
- // expensive is the cost of copying vs. lack of parallelism?
+ // Erase the inactive subscribers from the _subscribers
+ // list. Copy the subscribers in error to the error list.
//
- SubscriberList copy;
-
+ SubscriberList::iterator p = _subscribers.begin();
+ while(p != _subscribers.end())
{
- IceUtil::Mutex::Lock sync(_subscribersMutex);
-
- //
- // Copy of the subscribers that are in error.
- //
- SubscriberList e;
-
- //
- // Erase the inactive subscribers from the _subscribers
- // list. Copy the subscribers in error to the error list.
- //
- SubscriberList::iterator p = _subscribers.begin();
- while(p != _subscribers.end())
+ if((*p)->inactive())
{
- if((*p)->inactive())
- {
- if((*p)->error())
- {
- e.push_back(*p);
- }
-
- SubscriberList::iterator tmp = p;
- ++p;
- _subscribers.erase(tmp);
- }
- else
+ if((*p)->error())
{
- copy.push_back(*p);
- ++p;
+ e.push_back(*p);
}
+
+ SubscriberList::iterator tmp = p;
+ ++p;
+ _subscribers.erase(tmp);
}
-
- if(!e.empty())
+ else
{
- IceUtil::Mutex::Lock errorSync(_errorMutex);
- _error.splice(_error.begin(), e);
+ copy.push_back(*p);
+ ++p;
}
}
-
- for(SubscriberList::iterator p = copy.begin(); p != copy.end(); ++p)
+
+ if(!e.empty())
{
- (*p)->publish(event);
+ IceUtil::Mutex::Lock errorSync(_errorMutex);
+ _error.splice(_error.begin(), e);
}
}
-
- //
- // Clear & return the set of subscribers that are in error.
- //
- SubscriberList
- clearErrorList()
+
+ for(SubscriberList::iterator p = copy.begin(); p != copy.end(); ++p)
{
- //
- // Uses splice for efficiency
- //
- IceUtil::Mutex::Lock errorSync(_errorMutex);
- SubscriberList c;
- c.splice(c.begin(), _error);
- return c;
+ (*p)->publish(event);
}
+}
-private:
-
- TraceLevelsPtr _traceLevels;
-
- //
- // TODO: Should there be a map from identity to subscriber?
- //
- IceUtil::Mutex _subscribersMutex;
- SubscriberList _subscribers;
-
+//
+// Clear & return the set of subscribers that are in error.
+//
+SubscriberList
+IceStorm::TopicSubscribers::clearErrorList()
+{
//
- // Set of subscribers that have encountered an error.
+ // Uses splice for efficiency
//
- IceUtil::Mutex _errorMutex;
- SubscriberList _error;
-};
-
-} // End namespace IceStorm
+ IceUtil::Mutex::Lock errorSync(_errorMutex);
+ SubscriberList c;
+ c.splice(c.begin(), _error);
+ return c;
+}
//
// Incoming events from publishers.