summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorMatthew Newhook <matthew@zeroc.com>2002-02-28 17:30:43 +0000
committerMatthew Newhook <matthew@zeroc.com>2002-02-28 17:30:43 +0000
commita0262246c57db3d224f2c6bc60b93b0af9ba091e (patch)
treee19a2b3ea3bddcd94c03e4fc5012fb4ad6a10328 /cpp
parentfixes (diff)
downloadice-a0262246c57db3d224f2c6bc60b93b0af9ba091e.tar.bz2
ice-a0262246c57db3d224f2c6bc60b93b0af9ba091e.tar.xz
ice-a0262246c57db3d224f2c6bc60b93b0af9ba091e.zip
IceStorm changes
Diffstat (limited to 'cpp')
-rw-r--r--cpp/demo/IceStorm/clock/ClockI.cpp1
-rw-r--r--cpp/demo/IceStorm/clock/Publisher.cpp25
-rw-r--r--cpp/demo/IceStorm/clock/Subscriber.cpp126
-rw-r--r--cpp/slice/IceStorm/IceStorm.ice29
-rw-r--r--cpp/src/IceStorm/Flusher.cpp6
-rw-r--r--cpp/src/IceStorm/LinkSubscriber.cpp13
-rw-r--r--cpp/src/IceStorm/LinkSubscriber.h1
-rw-r--r--cpp/src/IceStorm/OnewayBatchSubscriber.cpp19
-rw-r--r--cpp/src/IceStorm/OnewayBatchSubscriber.h3
-rw-r--r--cpp/src/IceStorm/OnewaySubscriber.cpp13
-rw-r--r--cpp/src/IceStorm/OnewaySubscriber.h1
-rw-r--r--cpp/src/IceStorm/Subscriber.cpp6
-rw-r--r--cpp/src/IceStorm/Subscriber.h67
-rw-r--r--cpp/src/IceStorm/TopicI.cpp179
-rw-r--r--cpp/src/IceStorm/TopicI.h4
-rw-r--r--cpp/src/IceStorm/TopicManagerI.cpp105
-rw-r--r--cpp/src/IceStorm/TopicManagerI.h5
-rw-r--r--cpp/test/IceStorm/federation/Subscriber.cpp19
-rw-r--r--cpp/test/IceStorm/single/Subscriber.cpp10
19 files changed, 341 insertions, 291 deletions
diff --git a/cpp/demo/IceStorm/clock/ClockI.cpp b/cpp/demo/IceStorm/clock/ClockI.cpp
index 2872ddd4954..769f1503e5e 100644
--- a/cpp/demo/IceStorm/clock/ClockI.cpp
+++ b/cpp/demo/IceStorm/clock/ClockI.cpp
@@ -9,6 +9,7 @@
// **********************************************************************
#include <Ice/Ice.h>
+
#include <ClockI.h>
using namespace std;
diff --git a/cpp/demo/IceStorm/clock/Publisher.cpp b/cpp/demo/IceStorm/clock/Publisher.cpp
index 426d0c78f77..a932c8aca87 100644
--- a/cpp/demo/IceStorm/clock/Publisher.cpp
+++ b/cpp/demo/IceStorm/clock/Publisher.cpp
@@ -10,12 +10,12 @@
#include <Ice/Application.h>
#include <IceStorm/IceStorm.h>
+
#include <Clock.h>
using namespace std;
-using namespace Ice;
-class Publisher : public Application
+class Publisher : public Ice::Application
{
public:
@@ -32,16 +32,17 @@ main(int argc, char* argv[])
int
Publisher::run(int argc, char* argv[])
{
- PropertiesPtr properties = communicator()->getProperties();
- const char* endpointsProperty = "IceStorm.TopicManager.Endpoints";
- std::string endpoints = properties->getProperty(endpointsProperty);
+ Ice::PropertiesPtr properties = communicator()->getProperties();
+
+ static const string endpointsProperty = "IceStorm.TopicManager.Endpoints";
+ string endpoints = properties->getProperty(endpointsProperty);
if (endpoints.empty())
{
cerr << appName() << ": property `" << endpointsProperty << "' not set" << endl;
return EXIT_FAILURE;
}
- ObjectPrx base = communicator()->stringToProxy("TopicManager:" + endpoints);
+ Ice::ObjectPrx base = communicator()->stringToProxy("TopicManager:" + endpoints);
IceStorm::TopicManagerPrx manager = IceStorm::TopicManagerPrx::checkedCast(base);
if (!manager)
{
@@ -49,6 +50,9 @@ Publisher::run(int argc, char* argv[])
return EXIT_FAILURE;
}
+ //
+ // Retrieve the topic named "time".
+ //
IceStorm::TopicPrx topic;
try
{
@@ -56,16 +60,17 @@ Publisher::run(int argc, char* argv[])
}
catch(const IceStorm::NoSuchTopic& e)
{
- cerr << appName() << ": NoSuchTopic: " << e.name << endl;
+ cerr << appName() << ": " << e << " name: " << e.name << endl;
return EXIT_FAILURE;
}
assert(topic);
//
- // Get a publisher object, create a oneway proxy and then cast to
- // a Clock object
+ // Get a publisher object, create a oneway proxy (for efficiency
+ // reasons) and then cast to a Clock object. Note that the cast
+ // must be unchecked.
//
- ObjectPrx obj = topic->getPublisher();
+ Ice::ObjectPrx obj = topic->getPublisher();
obj = obj->ice_oneway();
ClockPrx clock = ClockPrx::uncheckedCast(obj);
diff --git a/cpp/demo/IceStorm/clock/Subscriber.cpp b/cpp/demo/IceStorm/clock/Subscriber.cpp
index 33a0065c936..78d47f0c73c 100644
--- a/cpp/demo/IceStorm/clock/Subscriber.cpp
+++ b/cpp/demo/IceStorm/clock/Subscriber.cpp
@@ -10,12 +10,15 @@
#include <Ice/Application.h>
#include <IceStorm/IceStorm.h>
+#include <IceUtil/UUID.h>
+
#include <ClockI.h>
+#include <list>
+
using namespace std;
-using namespace Ice;
-class Subscriber : public Application
+class Subscriber : public Ice::Application
{
public:
@@ -32,16 +35,17 @@ main(int argc, char* argv[])
int
Subscriber::run(int argc, char* argv[])
{
- PropertiesPtr properties = communicator()->getProperties();
- const char* endpointsProperty = "IceStorm.TopicManager.Endpoints";
- std::string endpoints = properties->getProperty(endpointsProperty);
+ Ice::PropertiesPtr properties = communicator()->getProperties();
+
+ static const string endpointsProperty = "IceStorm.TopicManager.Endpoints";
+ string endpoints = properties->getProperty(endpointsProperty);
if (endpoints.empty())
{
cerr << appName() << ": property `" << endpointsProperty << "' not set" << endl;
return EXIT_FAILURE;
}
- ObjectPrx base = communicator()->stringToProxy("TopicManager:" + endpoints);
+ Ice::ObjectPrx base = communicator()->stringToProxy("TopicManager:" + endpoints);
IceStorm::TopicManagerPrx manager = IceStorm::TopicManagerPrx::checkedCast(base);
if (!manager)
{
@@ -50,10 +54,10 @@ Subscriber::run(int argc, char* argv[])
}
//
- // Gather the set of topics to subscribe to. It is either the set
- // provided on the command line, or the topic "time".
+ // Gather the set of topics to which to subscribe. It is either
+ // the set provided on the command line, or the topic "time".
//
- IceStorm::StringSeq topics;
+ Ice::StringSeq topics;
if (argc > 1)
{
for (int i = 1; i < argc; ++i)
@@ -63,52 +67,96 @@ Subscriber::run(int argc, char* argv[])
}
else
{
- //
- // The set of topics to which to subscribe
- //
topics.push_back("time");
}
//
- // Create the servant to receive the events. Then add the servant
- // to the adapter for the given topics. Alternatively we could
- // have used a ServantLocator for the same purpose. Note that any
- // of the activated proxies will do since it the proxy is only a
- // template from which the actual proxy is created by IceStorm.
+ // Set the requested quality of service "reliability" =
+ // "batch". This tells IceStorm to send events to the subscriber
+ // in batches at regular intervals.
+ //
+ IceStorm::QoS qos;
+ qos["reliability"] = "batch";
+
+ //
+ // Create the servant to receive the events.
//
- ObjectAdapterPtr adapter = communicator()->createObjectAdapterWithEndpoints("ClockAdapter", "tcp");
- ObjectPtr clock = new ClockI();
- ObjectPrx object;
+ Ice::ObjectAdapterPtr adapter = communicator()->createObjectAdapterWithEndpoints("ClockAdapter", "tcp");
+ Ice::ObjectPtr clock = new ClockI();
- assert(!topics.empty());
- Ice::Identity ident;
- ident.category = "events";
+ //
+ // Create a UUID to use in the name field of the the object
+ // identity. The same UUID can be used for the identity of each
+ // Subscriber object since the category field will differ.
+ //
+ string uuid = IceUtil::generateUUID();
+
+ //
+ // List of all subscribers.
+ //
+ list<Ice::ObjectPrx> subscribers;
- for (IceStorm::StringSeq::iterator p = topics.begin(); p != topics.end(); ++p)
+ //
+ // Add the servant to the adapter for the given
+ // topics. Alternatively a ServantLocator could have been used for
+ // the same purpose.
+ //
+ for (Ice::StringSeq::iterator p = topics.begin(); p != topics.end(); ++p)
{
- ident.name = *p;
- object = adapter->add(clock, ident);
+ //
+ // The category is the name of the topic.
+ //
+ Ice::Identity ident;
+ ident.category = *p;
+ ident.name = uuid;
+
+ //
+ // Add a Servant for the Ice Object.
+ //
+ Ice::ObjectPrx object = adapter->add(clock, ident);
+ try
+ {
+ manager->subscribe(qos, object);
+ }
+ catch(const IceStorm::NoSuchTopic& e)
+ {
+ cerr << appName() << ": " << e << " name: " << e.name << endl;
+ break;
+ }
+
+ //
+ // Add to the set of subscribers _after_ subscribing. This
+ // ensures that only subscribed subscribers are unsubscribed
+ // in the case of an error.
+ //
+ subscribers.push_back(object);
}
//
- // The requested quality of service. This requests "reliability" =
- // "batch". This asks IceStorm to send events to the subscriber in
- // batches at regular intervals.
+ // Unless there is a subscriber per topic then there was some
+ // problem. If there was an error the application should terminate
+ // without accepting any events.
//
- IceStorm::QoS qos;
- qos["reliability"] = "batch";
- try
+ if (subscribers.size() == topics.size())
{
- manager->subscribe("events", qos, topics, object);
+ adapter->activate();
+ communicator()->waitForShutdown();
}
- catch(const IceStorm::NoSuchTopic& e)
+
+ //
+ // Unsubscribe all subscribed objects.
+ //
+ for (list<Ice::ObjectPrx>::const_iterator q = subscribers.begin(); q != subscribers.end(); ++q)
{
- cerr << appName() << ": NoSuchTopic: " << e.name << endl;
- return EXIT_FAILURE;
+ try
+ {
+ manager->unsubscribe(*q);
+ }
+ catch(const IceStorm::NoSuchTopic& e)
+ {
+ cerr << appName() << ": " << e << " name: " << e.name << endl;
+ }
}
- adapter->activate();
-
- communicator()->waitForShutdown();
return EXIT_SUCCESS;
}
diff --git a/cpp/slice/IceStorm/IceStorm.ice b/cpp/slice/IceStorm/IceStorm.ice
index e83e4760598..dd1a0bde238 100644
--- a/cpp/slice/IceStorm/IceStorm.ice
+++ b/cpp/slice/IceStorm/IceStorm.ice
@@ -173,14 +173,6 @@ exception NoSuchTopic
/**
*
- * A sequence of strings. TODO: This should be moved to the Ice core
- * (Ice/Types.ice).
- *
- **/
-sequence<string> StringSeq;
-
-/**
- *
* This dictionary represents Quality of service parameters.
*
* @see TopicManager::subscribe
@@ -234,30 +226,25 @@ interface TopicManager
/**
*
- * Subscribe the given subscriber to a set of topics.
- *
- * @param id The identity of the subscriber. Each unique
- * subscriber must use a unique identity string. To receive events
- * the subscriber must register a servant with the identity
- * category=id name=topic.
+ * Subscribe with the given [qos] to the topic identified in the
+ * [subscribers] identity category. If a subscriber already exists
+ * with the given identity, then it will be replaced by this
+ * subscriber.
*
* @param qos The quality of service parameters for this
* subscription. The only currently supported QoS is
* "reliability", which can be either "oneway" or "batch".
*
- * @param topics The topics to which to subscribe.
- *
- * @param tmpl An object template which is used to send events to
- * the subscriber.
+ * @param tmpl The proxy to which to send events.
*
* @see unsubscribe
*
**/
- void subscribe(string id, QoS qos, StringSeq topics, Object* tmpl) throws NoSuchTopic;
+ void subscribe(QoS qos, Object* subscriber) throws NoSuchTopic;
/**
*
- * Unsubscribe the given subscriber from a set of topics.
+ * Unsubscribe the given [subscriber].
*
* @param id The identity of the given subscriber.
*
@@ -266,7 +253,7 @@ interface TopicManager
* @see subscribe
*
**/
- void unsubscribe(string id, StringSeq topics);
+ void unsubscribe(Object* subscriber);
/**
*
diff --git a/cpp/src/IceStorm/Flusher.cpp b/cpp/src/IceStorm/Flusher.cpp
index 913612843c2..70558d76eac 100644
--- a/cpp/src/IceStorm/Flusher.cpp
+++ b/cpp/src/IceStorm/Flusher.cpp
@@ -120,12 +120,6 @@ private:
//IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
//
- // Using standard algorithms I don't think there is a way to
- // do this in one pass. For instance, I thought about using
- // remove_if - but the predicate needs to be a pure function
- // (see Meyers for details). If this is fixed then fix TopicI
- // also.
- //
// remove_if doesn't work with handle types. remove_if also
// isn't present in the STLport implementation
//
diff --git a/cpp/src/IceStorm/LinkSubscriber.cpp b/cpp/src/IceStorm/LinkSubscriber.cpp
index b04cfd8f9cc..b13ee7ec91e 100644
--- a/cpp/src/IceStorm/LinkSubscriber.cpp
+++ b/cpp/src/IceStorm/LinkSubscriber.cpp
@@ -54,6 +54,19 @@ LinkSubscriber::unsubscribe()
}
void
+LinkSubscriber::replace()
+{
+ IceUtil::Mutex::Lock sync(_stateMutex);
+ _state = StateReplaced;
+
+ if (_traceLevels->subscriber > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberCat);
+ out << "Replace " << _obj->ice_getIdentity();
+ }
+}
+
+void
LinkSubscriber::publish(const Event& event)
{
//
diff --git a/cpp/src/IceStorm/LinkSubscriber.h b/cpp/src/IceStorm/LinkSubscriber.h
index 41722cacfc1..00f62683f8f 100644
--- a/cpp/src/IceStorm/LinkSubscriber.h
+++ b/cpp/src/IceStorm/LinkSubscriber.h
@@ -29,6 +29,7 @@ public:
virtual bool persistent() const;
virtual bool inactive() const;
virtual void unsubscribe();
+ virtual void replace();
virtual void publish(const Event&);
virtual void flush();
diff --git a/cpp/src/IceStorm/OnewayBatchSubscriber.cpp b/cpp/src/IceStorm/OnewayBatchSubscriber.cpp
index 4a9915af023..e42a9728ef1 100644
--- a/cpp/src/IceStorm/OnewayBatchSubscriber.cpp
+++ b/cpp/src/IceStorm/OnewayBatchSubscriber.cpp
@@ -48,6 +48,25 @@ OnewayBatchSubscriber::unsubscribe()
_flusher->remove(this);
}
+void
+OnewayBatchSubscriber::replace()
+{
+ IceUtil::Mutex::Lock sync(_stateMutex);
+ _state = StateReplaced;
+
+ if (_traceLevels->subscriber > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberCat);
+ out << "Replace " << _obj->ice_getIdentity();
+ }
+
+ //
+ // If this subscriber has been registered with the flusher then
+ // remove it.
+ //
+ _flusher->remove(this);
+}
+
bool
OnewayBatchSubscriber::inactive() const
{
diff --git a/cpp/src/IceStorm/OnewayBatchSubscriber.h b/cpp/src/IceStorm/OnewayBatchSubscriber.h
index 3aaea29e47b..4b90d33d1a6 100644
--- a/cpp/src/IceStorm/OnewayBatchSubscriber.h
+++ b/cpp/src/IceStorm/OnewayBatchSubscriber.h
@@ -27,10 +27,11 @@ class OnewayBatchSubscriber : public OnewaySubscriber, public Flushable
{
public:
- OnewayBatchSubscriber(const TraceLevelsPtr& traceLevels, const FlusherPtr&, const Ice::ObjectPrx&);
+ OnewayBatchSubscriber(const TraceLevelsPtr&, const FlusherPtr&, const Ice::ObjectPrx&);
~OnewayBatchSubscriber();
virtual void unsubscribe();
+ virtual void replace();
virtual bool inactive() const;
virtual void flush();
diff --git a/cpp/src/IceStorm/OnewaySubscriber.cpp b/cpp/src/IceStorm/OnewaySubscriber.cpp
index b1007792355..5c547ad4f8f 100644
--- a/cpp/src/IceStorm/OnewaySubscriber.cpp
+++ b/cpp/src/IceStorm/OnewaySubscriber.cpp
@@ -46,6 +46,19 @@ OnewaySubscriber::unsubscribe()
}
void
+OnewaySubscriber::replace()
+{
+ IceUtil::Mutex::Lock sync(_stateMutex);
+ _state = StateReplaced;
+
+ if (_traceLevels->subscriber > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberCat);
+ out << "Replace " << _obj->ice_getIdentity();
+ }
+}
+
+void
OnewaySubscriber::publish(const Event& event)
{
try
diff --git a/cpp/src/IceStorm/OnewaySubscriber.h b/cpp/src/IceStorm/OnewaySubscriber.h
index 640532b1cab..032821f271a 100644
--- a/cpp/src/IceStorm/OnewaySubscriber.h
+++ b/cpp/src/IceStorm/OnewaySubscriber.h
@@ -25,6 +25,7 @@ public:
virtual bool persistent() const;
virtual void unsubscribe();
+ virtual void replace();
virtual void publish(const Event&);
protected:
diff --git a/cpp/src/IceStorm/Subscriber.cpp b/cpp/src/IceStorm/Subscriber.cpp
index ac9226f1f85..62bb7eaab8d 100644
--- a/cpp/src/IceStorm/Subscriber.cpp
+++ b/cpp/src/IceStorm/Subscriber.cpp
@@ -32,11 +32,11 @@ Subscriber::inactive() const
return _state != StateActive;;
}
-Subscriber::State
-Subscriber::state() const
+bool
+Subscriber::error() const
{
IceUtil::Mutex::Lock sync(_stateMutex);
- return _state;
+ return _state == StateError;
}
diff --git a/cpp/src/IceStorm/Subscriber.h b/cpp/src/IceStorm/Subscriber.h
index b2db80e3695..453c8b125b3 100644
--- a/cpp/src/IceStorm/Subscriber.h
+++ b/cpp/src/IceStorm/Subscriber.h
@@ -56,35 +56,15 @@ public:
virtual bool persistent() const = 0;
//
- // Subscriber state.
- //
- enum State
- {
- //
- // The Subscriber is active
- //
- StateActive,
- //
- // The Subscriber encountered an error during event
- // transmission
- //
- StateError,
- //
- // The Subscriber has been unsubscribed
- //
- StateUnsubscribed
- };
-
- //
- // This method is for ease of use with STL algorithms.
- // Return _state != StateActive
+ // Return true if the Subscriber is not active, false otherwise.
//
bool inactive() const;
//
- // Retrieve the state of the Subscriber.
+ // Retrieve true if the Subscriber is in the error state, false
+ // otherwise.
//
- State state() const;
+ bool error() const;
//
// Retrieve the identity of the Subscriber.
@@ -97,8 +77,13 @@ public:
virtual void unsubscribe() = 0;
//
- // Publish the given event. Mark the state as Error in the event
- // of a problem.
+ // Unsubscribe. Mark the state as Replaced.
+ //
+ virtual void replace() = 0;
+
+ //
+ // Publish the given event. Mark the state as Error in the event of
+ // a problem.
//
virtual void publish(const Event&) = 0;
@@ -107,16 +92,40 @@ protected:
// Immutable
TraceLevelsPtr _traceLevels;
+ //
+ // Subscriber state.
+ //
+ enum State
+ {
+ //
+ // The Subscriber is active.
+ //
+ StateActive,
+ //
+ // The Subscriber encountered an error during event
+ // transmission.
+ //
+ StateError,
+ //
+ // The Subscriber has been unsubscribed.
+ //
+ StateUnsubscribed,
+ //
+ // The Subscriber has been replaced.
+ //
+ StateReplaced
+ };
+
IceUtil::Mutex _stateMutex;
State _state;
private:
//
- // This id is the full id of the subscriber for a particular topic
- // (that is <prefix>#<topicname>
+ // This id is the full id of the subscriber for a particular topic.
+ //
+ // Immutable.
//
- // Immutable
Ice::Identity _id;
};
diff --git a/cpp/src/IceStorm/TopicI.cpp b/cpp/src/IceStorm/TopicI.cpp
index 1e35b5d9308..f4d93751563 100644
--- a/cpp/src/IceStorm/TopicI.cpp
+++ b/cpp/src/IceStorm/TopicI.cpp
@@ -105,7 +105,31 @@ public:
void
add(const SubscriberPtr& subscriber)
{
+ Ice::Identity id = subscriber->id();
+
IceUtil::Mutex::Lock sync(_subscribersMutex);
+
+ //
+ // If a subscriber with this identity is already subscribed
+ // then mark the subscriber as replaced.
+ //
+ // Note that this doesn't actually remove the Subscribe from
+ // the list of subscribers - it marks the Subscriber as
+ // replaced, and it's removed on the next event publish.
+ //
+ for (SubscriberList::iterator i = _subscribers.begin() ; i != _subscribers.end(); ++i)
+ {
+ if ((*i)->id() == id)
+ {
+ //
+ // This marks the subscriber as invalid. It will be
+ // removed on the next event publish.
+ //
+ (*i)->replace();
+ break;
+ }
+ }
+
//
// Add to the set of subscribers
@@ -116,16 +140,17 @@ public:
//
// Unsubscribe the Subscriber with the given identity. Note that
// this doesn't remove the Subscriber from the list of subscribers
- // - it marks the Subscriber as Unsubscribed, and it's removed on
+ // - it marks the Subscriber as unsubscribed, and it's removed on
// the next event publish.
//
void
- unsubscribe(const Ice::Identity& id)
+ remove(const Ice::ObjectPrx& obj)
{
+ Ice::Identity id = obj->ice_getIdentity();
+
IceUtil::Mutex::Lock sync(_subscribersMutex);
- SubscriberList::iterator i;
- for (i = _subscribers.begin() ; i != _subscribers.end(); ++i)
+ for (SubscriberList::iterator i = _subscribers.begin() ; i != _subscribers.end(); ++i)
{
if ((*i)->id() == id)
{
@@ -134,20 +159,17 @@ public:
// removed on the next event publish.
//
(*i)->unsubscribe();
- break;
+ return;
}
}
//
// If the subscriber was not found then display a diagnostic
//
- if (i == _subscribers.end())
+ if (_traceLevels->topic > 0)
{
- if (_traceLevels->topic > 0)
- {
- Ice::Trace out(_traceLevels->logger, _traceLevels->topicCat);
- out << id << ": not subscribed.";
- }
+ Ice::Trace out(_traceLevels->logger, _traceLevels->topicCat);
+ out << id << ": not subscribed.";
}
}
@@ -161,55 +183,61 @@ public:
// advantage that publishes can occur in parallel and less
// subscriber list iterations.
//
-
void
publish(const Event& event)
{
- IceUtil::Mutex::Lock sync(_subscribersMutex);
-
//
- // Using standard algorithms I don't think there is a way to
- // do this in one pass. For instance, I thought about using
- // remove_if - but the predicate needs to be a pure function
- // (see Meyers for details). If this is fixed then fix Flusher
- // also.
+ // Copy of the subscriber list so that event publishing can
+ // occur in parallel.
//
- // remove_if doesn't work with handle types. remove_if also
- // isn't present in the STLport implementation
+ // TODO: Find out whether this is a false optimization - how
+ // expensive is the cost of copying vs. lack of parallelism?
//
- // _subscribers.remove_if(IceUtil::constMemFun(&Subscriber::inactive));
- //
+ SubscriberList copy;
- //
- // Erase the inactive subscribers from the _subscribers
- // list. Copy the subscribers in error to the error list.
- //
- SubscriberList::iterator p = remove_if(_subscribers.begin(), _subscribers.end(),
- IceUtil::constMemFun(&Subscriber::inactive));
-
- if (p != _subscribers.end())
{
- IceUtil::Mutex::Lock errorSync(_errorMutex);
+ IceUtil::Mutex::Lock sync(_subscribersMutex);
+
//
- // Copy each of the invalid subscribers that was not
- // unsubscribed. Note that there is no copy_if algorithm.
+ // Copy of the subscribers that are in error.
//
- // Note that this could also be written in terms of splice
- // & remove_if.
+ SubscriberList e;
+
+ //
+ // Erase the inactive subscribers from the _subscribers
+ // list. Copy the subscribers in error to the error list.
//
+ SubscriberList::iterator p = _subscribers.begin();
while (p != _subscribers.end())
{
- if ((*p)->state() == Subscriber::StateError)
+ if ((*p)->inactive())
+ {
+ if ((*p)->error())
+ {
+ e.push_back(*p);
+ }
+
+ SubscriberList::iterator tmp = p;
+ ++p;
+ _subscribers.erase(tmp);
+ }
+ else
{
- _error.push_front(*p);
+ copy.push_back(*p);
+ ++p;
}
- _subscribers.erase(p++);
+ }
+
+ if (!e.empty())
+ {
+ IceUtil::Mutex::Lock errorSync(_errorMutex);
+ _error.splice(_error.begin(), e);
}
}
- for (SubscriberList::iterator i = _subscribers.begin(); i != _subscribers.end(); ++i)
+ for (SubscriberList::iterator p = copy.begin(); p != copy.end(); ++p)
{
- (*i)->publish(event);
+ (*p)->publish(event);
}
}
@@ -217,7 +245,7 @@ public:
// Clear & return the set of subscribers that are in error.
//
SubscriberList
- clearErrorList() const
+ clearErrorList()
{
//
// Uses splice for efficiency
@@ -229,17 +257,20 @@ public:
}
private:
-
+
TraceLevelsPtr _traceLevels;
+ //
+ // TODO: Should there be a map from identity to subscriber?
+ //
IceUtil::Mutex _subscribersMutex;
SubscriberList _subscribers;
//
- // Set of subscribers that encountered an error.
+ // Set of subscribers that have encountered an error.
//
IceUtil::Mutex _errorMutex;
- mutable SubscriberList _error;
+ SubscriberList _error;
};
} // End namespace IceStorm
@@ -319,7 +350,7 @@ TopicI::TopicI(const Ice::ObjectAdapterPtr& adapter, const TraceLevelsPtr& trace
//
// Create a servant per Topic to receive linked event data. The
// servants Identity is category=<topicname>, name=link. Activate
- // the object and save a reference to give to linked topics..
+ // the object and save a reference to give to linked topics.
//
_link = new TopicLinkI(_subscribers);
@@ -327,16 +358,20 @@ TopicI::TopicI(const Ice::ObjectAdapterPtr& adapter, const TraceLevelsPtr& trace
_linkPrx = TopicLinkPrx::uncheckedCast(_adapter->add(_link, id));
//
- // Run through link database - re-establishing linked subscribers
+ // Run through link database re-establishing linked subscribers.
//
for (IdentityLinkDict::const_iterator p = _links.begin(); p != _links.end(); ++p)
{
if (_traceLevels->topic > 0)
{
Ice::Trace out(_traceLevels->logger, _traceLevels->topicCat);
- out << _name << " relink " << p->second.obj->ice_getIdentity();
+ out << _name << " relink " << p->first;
}
+ //
+ // Create the Subscriber object and add to the set of
+ // Subscribers.
+ //
SubscriberPtr subscriber = _factory->createLinkSubscriber(p->second.obj, p->second.info.cost);
_subscribers->add(subscriber);
}
@@ -411,12 +446,15 @@ TopicI::link(const TopicPrx& topic, Ice::Int cost, const Ice::Current&)
}
//
- // Retrieve the TopicLink
+ // Retrieve the TopicLink.
//
TopicInternalPrx internal = TopicInternalPrx::checkedCast(topic);
TopicLinkPrx link = internal->getLinkProxy();
Ice::Identity ident = link->ice_getIdentity();
+ //
+ // Create the LinkDB record.
+ //
LinkDB dbInfo;
dbInfo.obj = link;
dbInfo.info.topic = topic;
@@ -424,20 +462,9 @@ TopicI::link(const TopicPrx& topic, Ice::Int cost, const Ice::Current&)
dbInfo.info.cost = cost;
//
- // If the link already exists then remove the original subscriber.
- //
- // Note: If events arrive before the new subscriber is added then
- // they will be lost. An alternative to this strategy would be to
- // update the subscriber information.
+ // Create the Subscriber object and add to the setup of
+ // subscribers.
//
- IdentityLinkDict::iterator p = _links.find(ident);
- if (p != _links.end())
- {
- _subscribers->unsubscribe(ident);
- }
-
- _links.insert(make_pair(ident, dbInfo));
-
SubscriberPtr subscriber = _factory->createLinkSubscriber(dbInfo.obj, dbInfo.info.cost);
_subscribers->add(subscriber);
}
@@ -464,7 +491,7 @@ TopicI::unlink(const TopicPrx& topic, const Ice::Current&)
Ice::Trace out(_traceLevels->logger, _traceLevels->topicCat);
out << _name << " unlink " << topic->getName();
}
- _subscribers->unsubscribe(link->ice_getIdentity());
+ _subscribers->remove(link);
}
else
{
@@ -514,7 +541,7 @@ TopicI::destroyed() const
}
void
-TopicI::subscribe(const Ice::ObjectPrx& tmpl, const string& id, const QoS& qos)
+TopicI::subscribe(const Ice::ObjectPrx& obj, const QoS& qos)
{
IceUtil::RecMutex::Lock sync(*this);
@@ -526,23 +553,14 @@ TopicI::subscribe(const Ice::ObjectPrx& tmpl, const string& id, const QoS& qos)
reap();
//
- // Create the identity category=id, name=<topicname>.
- //
- Ice::Identity ident;
- ident.category = id;
- ident.name = _name;
- Ice::ObjectPrx obj = tmpl->ice_newIdentity(ident);
-
- //
- // Create the subscriber with this id and add to the set of
- // subscribers.
+ // Add this subscriber to the set of subscribers.
//
SubscriberPtr subscriber = _factory->createSubscriber(qos, obj);
_subscribers->add(subscriber);
}
void
-TopicI::unsubscribe(const string& id)
+TopicI::unsubscribe(const Ice::ObjectPrx& obj)
{
IceUtil::RecMutex::Lock sync(*this);
@@ -554,16 +572,9 @@ TopicI::unsubscribe(const string& id)
reap();
//
- // Create the identity category=id, name=<topicname>.
- //
- Ice::Identity ident;
- ident.category = id;
- ident.name = _name;
-
- //
// Unsubscribe the subscriber with this identity.
//
- _subscribers->unsubscribe(ident);
+ _subscribers->remove(obj);
}
void
@@ -584,7 +595,7 @@ TopicI::reap()
for (SubscriberList::iterator p = error.begin(); p != error.end(); ++p)
{
SubscriberPtr subscriber = *p;
- assert(subscriber->state() == Subscriber::StateError);
+ assert(subscriber->error());
if (subscriber->persistent())
{
if (_links.erase(subscriber->id()) > 0)
diff --git a/cpp/src/IceStorm/TopicI.h b/cpp/src/IceStorm/TopicI.h
index f920310bc7c..26e670203ea 100644
--- a/cpp/src/IceStorm/TopicI.h
+++ b/cpp/src/IceStorm/TopicI.h
@@ -55,8 +55,8 @@ public:
// Internal methods
bool destroyed() const;
- void subscribe(const Ice::ObjectPrx&, const std::string&, const QoS&);
- void unsubscribe(const std::string&);
+ void subscribe(const Ice::ObjectPrx&, const QoS&);
+ void unsubscribe(const Ice::ObjectPrx&);
void reap();
diff --git a/cpp/src/IceStorm/TopicManagerI.cpp b/cpp/src/IceStorm/TopicManagerI.cpp
index 9cf3616b867..8bac763e306 100644
--- a/cpp/src/IceStorm/TopicManagerI.cpp
+++ b/cpp/src/IceStorm/TopicManagerI.cpp
@@ -51,9 +51,8 @@ TopicManagerI::TopicManagerI(const Ice::CommunicatorPtr& communicator, const Ice
{
if (_traceLevels->topicMgr > 0)
{
- ostringstream s;
- s << ex;
- _communicator->getLogger()->trace(_traceLevels->topicMgrCat, s.str());
+ Ice::Trace out(_traceLevels->logger, _traceLevels->topicMgrCat);
+ out << ex;
}
StringBoolDict::iterator tmp = p;
++p;
@@ -137,104 +136,66 @@ TopicManagerI::retrieveAll(const Ice::Current&)
}
void
-TopicManagerI::subscribe(const string& id, const QoS& qos, const StringSeq& topics, const Ice::ObjectPrx& tmpl,
- const Ice::Current&)
+TopicManagerI::subscribe(const QoS& qos, const Ice::ObjectPrx& subscriber, const Ice::Current&)
{
IceUtil::Mutex::Lock sync(*this);
+ Ice::Identity ident = subscriber->ice_getIdentity();
if (_traceLevels->topicMgr > 0)
{
- ostringstream s;
- s << "Subscribe: " << id;
+ Ice::Trace out(_traceLevels->logger, _traceLevels->topicMgrCat);
+ out << "Subscribe: " << Ice::identityToString(ident);
if (_traceLevels->topicMgr > 1)
{
- s << " QoS: ";
+ out << " QoS: ";
for (QoS::const_iterator qi = qos.begin(); qi != qos.end() ; ++qi)
{
if (qi != qos.begin())
{
- s << ',';
+ out << ',';
}
- s << '[' << qi->first << "," << qi->second << ']';
- }
- s << " Topics: ";
- for (StringSeq::const_iterator ti = topics.begin(); ti != topics.end() ; ++ti)
- {
- if (ti != topics.begin())
- {
- s << ",";
- }
- s << *ti;
+ out << '[' << qi->first << "," << qi->second << ']';
}
}
- _communicator->getLogger()->trace(_traceLevels->topicMgrCat, s.str());
}
//
- // First scan the set of topics to ensure that each exists.
+ // Ensure that the identities category refers to an existing
+ // channel.
//
- // TODO: This could be slightly optimized by remembering the
- // TopicIPtr's so that the list doesn't need to scanned again.
- //
- StringSeq::const_iterator i;
- for (i = topics.begin() ; i != topics.end() ; ++i)
+ TopicIMap::iterator elem = _topicIMap.find(ident.category);
+ if (elem == _topicIMap.end())
{
- TopicIMap::iterator elem = _topicIMap.find(*i);
- if (elem == _topicIMap.end())
- {
- NoSuchTopic ex;
- ex.name = *i;
- throw ex;
- }
+ NoSuchTopic ex;
+ ex.name = ident.category;
+ throw ex;
}
//
- // Subscribe to each Topic.
+ // Subscribe to the topic.
//
- for (i = topics.begin() ; i != topics.end() ; ++i)
- {
- TopicIMap::iterator elem = _topicIMap.find(*i);
- if (elem != _topicIMap.end())
- {
- elem->second->subscribe(tmpl, id, qos);
- }
- }
+ elem->second->subscribe(subscriber, qos);
}
void
-TopicManagerI::unsubscribe(const string& id, const StringSeq& topics, const Ice::Current&)
+TopicManagerI::unsubscribe(const Ice::ObjectPrx& subscriber, const Ice::Current&)
{
IceUtil::Mutex::Lock sync(*this);
+ Ice::Identity ident = subscriber->ice_getIdentity();
+
if (_traceLevels->topicMgr > 0)
{
- ostringstream s;
- s << "Unsubscribe: " << id;
- if (_traceLevels->topicMgr > 1)
- {
- s << " Topics: ";
- for (StringSeq::const_iterator ti = topics.begin(); ti != topics.end() ; ++ti)
- {
- if (ti != topics.begin())
- {
- s << ",";
- }
- s << *ti;
- }
- }
- _communicator->getLogger()->trace(_traceLevels->topicMgrCat, s.str());
+ Ice::Trace out(_traceLevels->logger, _traceLevels->topicMgrCat);
+
+ out << "Unsubscribe: " << Ice::identityToString(ident);
}
- //
- // Unsubscribe to each Topic.
- //
- for (StringSeq::const_iterator i = topics.begin() ; i != topics.end() ; ++i)
+
+ TopicIMap::iterator elem = _topicIMap.find(ident.category);
+ if (elem != _topicIMap.end())
{
- TopicIMap::iterator elem = _topicIMap.find(*i);
- if (elem != _topicIMap.end())
- {
- elem->second->unsubscribe(id);
- }
+ elem->second->unsubscribe(subscriber);
}
}
@@ -260,9 +221,8 @@ TopicManagerI::reap()
{
if (_traceLevels->topicMgr > 0)
{
- ostringstream s;
- s << "Reaping " << i->first;
- _communicator->getLogger()->trace(_traceLevels->topicMgrCat, s.str());
+ Ice::Trace out(_traceLevels->logger, _traceLevels->topicMgrCat);
+ out << "Reaping " << i->first;
}
_topics.erase(i->first);
_topicIMap.erase(i++);
@@ -279,9 +239,8 @@ TopicManagerI::installTopic(const std::string& message, const std::string& name,
{
if (_traceLevels->topicMgr > 0)
{
- ostringstream s;
- s << message << ' ' << name;
- _communicator->getLogger()->trace(_traceLevels->topicMgrCat, s.str());
+ Ice::Trace out(_traceLevels->logger, _traceLevels->topicMgrCat);
+ out << message << ' ' << name;
}
// TODO: instance
diff --git a/cpp/src/IceStorm/TopicManagerI.h b/cpp/src/IceStorm/TopicManagerI.h
index 4df698d4743..0aa5b91f784 100644
--- a/cpp/src/IceStorm/TopicManagerI.h
+++ b/cpp/src/IceStorm/TopicManagerI.h
@@ -51,9 +51,8 @@ public:
virtual TopicPrx create(const std::string&, const Ice::Current&);
virtual TopicPrx retrieve(const std::string&, const Ice::Current&);
virtual TopicDict retrieveAll(const Ice::Current&);
- virtual void subscribe(const std::string&, const QoS&, const StringSeq&, const Ice::ObjectPrx&,
- const Ice::Current&);
- virtual void unsubscribe(const std::string&, const StringSeq&, const Ice::Current&);
+ virtual void subscribe(const QoS&, const Ice::ObjectPrx&, const Ice::Current&);
+ virtual void unsubscribe(const Ice::ObjectPrx&, const Ice::Current&);
virtual void shutdown(const Ice::Current&);
void reap();
diff --git a/cpp/test/IceStorm/federation/Subscriber.cpp b/cpp/test/IceStorm/federation/Subscriber.cpp
index 051ced9605c..6f4cc277b78 100644
--- a/cpp/test/IceStorm/federation/Subscriber.cpp
+++ b/cpp/test/IceStorm/federation/Subscriber.cpp
@@ -101,26 +101,21 @@ run(int argc, char* argv[], const CommunicatorPtr& communicator)
EventIPtr eventFed1 = new EventI(communicator);
EventIPtr eventFed2 = new EventI(communicator);
EventIPtr eventFed3 = new EventI(communicator);
- //
- // Any of the objects will do as long as they are all activated
- //
- ObjectPrx object = adapter->add(eventFed1, stringToIdentity("events#fed1"));
- adapter->add(eventFed2, stringToIdentity("events#fed2"));
- adapter->add(eventFed3, stringToIdentity("events#fed3"));
//
- // The set of topics to which to subscribe
+ // Activate the servants.
//
- IceStorm::StringSeq topics;
- topics.push_back("fed1");
- topics.push_back("fed2");
- topics.push_back("fed3");
+ ObjectPrx objFed1 = adapter->add(eventFed1, stringToIdentity("fed1#events"));
+ ObjectPrx objFed2 = adapter->add(eventFed2, stringToIdentity("fed2#events"));
+ ObjectPrx objFed3 = adapter->add(eventFed3, stringToIdentity("fed3#events"));
IceStorm::QoS qos;
//TODO: qos["reliability"] = "batch";
try
{
- manager->subscribe("events", qos, topics, object);
+ manager->subscribe(qos, objFed1);
+ manager->subscribe(qos, objFed2);
+ manager->subscribe(qos, objFed3);
}
catch(const IceStorm::NoSuchTopic& e)
{
diff --git a/cpp/test/IceStorm/single/Subscriber.cpp b/cpp/test/IceStorm/single/Subscriber.cpp
index aae8fc240fe..fc91cb09767 100644
--- a/cpp/test/IceStorm/single/Subscriber.cpp
+++ b/cpp/test/IceStorm/single/Subscriber.cpp
@@ -92,19 +92,13 @@ run(int argc, char* argv[], const CommunicatorPtr& communicator)
ObjectAdapterPtr adapter = communicator->createObjectAdapterWithEndpoints("SingleAdapter", "default");
ObjectPtr single = new SingleI(communicator);
- ObjectPrx object = adapter->add(single, stringToIdentity("events#single"));
-
- //
- // The set of topics to which to subscribe
- //
- IceStorm::StringSeq topics;
- topics.push_back("single");
+ ObjectPrx object = adapter->add(single, stringToIdentity("single#events"));
IceStorm::QoS qos;
//TODO: qos["reliability"] = "batch";
try
{
- manager->subscribe("events", qos, topics, object);
+ manager->subscribe(qos, object);
}
catch(const IceStorm::NoSuchTopic& e)
{