diff options
author | Matthew Newhook <matthew@zeroc.com> | 2002-02-28 17:30:43 +0000 |
---|---|---|
committer | Matthew Newhook <matthew@zeroc.com> | 2002-02-28 17:30:43 +0000 |
commit | a0262246c57db3d224f2c6bc60b93b0af9ba091e (patch) | |
tree | e19a2b3ea3bddcd94c03e4fc5012fb4ad6a10328 /cpp | |
parent | fixes (diff) | |
download | ice-a0262246c57db3d224f2c6bc60b93b0af9ba091e.tar.bz2 ice-a0262246c57db3d224f2c6bc60b93b0af9ba091e.tar.xz ice-a0262246c57db3d224f2c6bc60b93b0af9ba091e.zip |
IceStorm changes
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/demo/IceStorm/clock/ClockI.cpp | 1 | ||||
-rw-r--r-- | cpp/demo/IceStorm/clock/Publisher.cpp | 25 | ||||
-rw-r--r-- | cpp/demo/IceStorm/clock/Subscriber.cpp | 126 | ||||
-rw-r--r-- | cpp/slice/IceStorm/IceStorm.ice | 29 | ||||
-rw-r--r-- | cpp/src/IceStorm/Flusher.cpp | 6 | ||||
-rw-r--r-- | cpp/src/IceStorm/LinkSubscriber.cpp | 13 | ||||
-rw-r--r-- | cpp/src/IceStorm/LinkSubscriber.h | 1 | ||||
-rw-r--r-- | cpp/src/IceStorm/OnewayBatchSubscriber.cpp | 19 | ||||
-rw-r--r-- | cpp/src/IceStorm/OnewayBatchSubscriber.h | 3 | ||||
-rw-r--r-- | cpp/src/IceStorm/OnewaySubscriber.cpp | 13 | ||||
-rw-r--r-- | cpp/src/IceStorm/OnewaySubscriber.h | 1 | ||||
-rw-r--r-- | cpp/src/IceStorm/Subscriber.cpp | 6 | ||||
-rw-r--r-- | cpp/src/IceStorm/Subscriber.h | 67 | ||||
-rw-r--r-- | cpp/src/IceStorm/TopicI.cpp | 179 | ||||
-rw-r--r-- | cpp/src/IceStorm/TopicI.h | 4 | ||||
-rw-r--r-- | cpp/src/IceStorm/TopicManagerI.cpp | 105 | ||||
-rw-r--r-- | cpp/src/IceStorm/TopicManagerI.h | 5 | ||||
-rw-r--r-- | cpp/test/IceStorm/federation/Subscriber.cpp | 19 | ||||
-rw-r--r-- | cpp/test/IceStorm/single/Subscriber.cpp | 10 |
19 files changed, 341 insertions, 291 deletions
diff --git a/cpp/demo/IceStorm/clock/ClockI.cpp b/cpp/demo/IceStorm/clock/ClockI.cpp index 2872ddd4954..769f1503e5e 100644 --- a/cpp/demo/IceStorm/clock/ClockI.cpp +++ b/cpp/demo/IceStorm/clock/ClockI.cpp @@ -9,6 +9,7 @@ // ********************************************************************** #include <Ice/Ice.h> + #include <ClockI.h> using namespace std; diff --git a/cpp/demo/IceStorm/clock/Publisher.cpp b/cpp/demo/IceStorm/clock/Publisher.cpp index 426d0c78f77..a932c8aca87 100644 --- a/cpp/demo/IceStorm/clock/Publisher.cpp +++ b/cpp/demo/IceStorm/clock/Publisher.cpp @@ -10,12 +10,12 @@ #include <Ice/Application.h> #include <IceStorm/IceStorm.h> + #include <Clock.h> using namespace std; -using namespace Ice; -class Publisher : public Application +class Publisher : public Ice::Application { public: @@ -32,16 +32,17 @@ main(int argc, char* argv[]) int Publisher::run(int argc, char* argv[]) { - PropertiesPtr properties = communicator()->getProperties(); - const char* endpointsProperty = "IceStorm.TopicManager.Endpoints"; - std::string endpoints = properties->getProperty(endpointsProperty); + Ice::PropertiesPtr properties = communicator()->getProperties(); + + static const string endpointsProperty = "IceStorm.TopicManager.Endpoints"; + string endpoints = properties->getProperty(endpointsProperty); if (endpoints.empty()) { cerr << appName() << ": property `" << endpointsProperty << "' not set" << endl; return EXIT_FAILURE; } - ObjectPrx base = communicator()->stringToProxy("TopicManager:" + endpoints); + Ice::ObjectPrx base = communicator()->stringToProxy("TopicManager:" + endpoints); IceStorm::TopicManagerPrx manager = IceStorm::TopicManagerPrx::checkedCast(base); if (!manager) { @@ -49,6 +50,9 @@ Publisher::run(int argc, char* argv[]) return EXIT_FAILURE; } + // + // Retrieve the topic named "time". + // IceStorm::TopicPrx topic; try { @@ -56,16 +60,17 @@ Publisher::run(int argc, char* argv[]) } catch(const IceStorm::NoSuchTopic& e) { - cerr << appName() << ": NoSuchTopic: " << e.name << endl; + cerr << appName() << ": " << e << " name: " << e.name << endl; return EXIT_FAILURE; } assert(topic); // - // Get a publisher object, create a oneway proxy and then cast to - // a Clock object + // Get a publisher object, create a oneway proxy (for efficiency + // reasons) and then cast to a Clock object. Note that the cast + // must be unchecked. // - ObjectPrx obj = topic->getPublisher(); + Ice::ObjectPrx obj = topic->getPublisher(); obj = obj->ice_oneway(); ClockPrx clock = ClockPrx::uncheckedCast(obj); diff --git a/cpp/demo/IceStorm/clock/Subscriber.cpp b/cpp/demo/IceStorm/clock/Subscriber.cpp index 33a0065c936..78d47f0c73c 100644 --- a/cpp/demo/IceStorm/clock/Subscriber.cpp +++ b/cpp/demo/IceStorm/clock/Subscriber.cpp @@ -10,12 +10,15 @@ #include <Ice/Application.h> #include <IceStorm/IceStorm.h> +#include <IceUtil/UUID.h> + #include <ClockI.h> +#include <list> + using namespace std; -using namespace Ice; -class Subscriber : public Application +class Subscriber : public Ice::Application { public: @@ -32,16 +35,17 @@ main(int argc, char* argv[]) int Subscriber::run(int argc, char* argv[]) { - PropertiesPtr properties = communicator()->getProperties(); - const char* endpointsProperty = "IceStorm.TopicManager.Endpoints"; - std::string endpoints = properties->getProperty(endpointsProperty); + Ice::PropertiesPtr properties = communicator()->getProperties(); + + static const string endpointsProperty = "IceStorm.TopicManager.Endpoints"; + string endpoints = properties->getProperty(endpointsProperty); if (endpoints.empty()) { cerr << appName() << ": property `" << endpointsProperty << "' not set" << endl; return EXIT_FAILURE; } - ObjectPrx base = communicator()->stringToProxy("TopicManager:" + endpoints); + Ice::ObjectPrx base = communicator()->stringToProxy("TopicManager:" + endpoints); IceStorm::TopicManagerPrx manager = IceStorm::TopicManagerPrx::checkedCast(base); if (!manager) { @@ -50,10 +54,10 @@ Subscriber::run(int argc, char* argv[]) } // - // Gather the set of topics to subscribe to. It is either the set - // provided on the command line, or the topic "time". + // Gather the set of topics to which to subscribe. It is either + // the set provided on the command line, or the topic "time". // - IceStorm::StringSeq topics; + Ice::StringSeq topics; if (argc > 1) { for (int i = 1; i < argc; ++i) @@ -63,52 +67,96 @@ Subscriber::run(int argc, char* argv[]) } else { - // - // The set of topics to which to subscribe - // topics.push_back("time"); } // - // Create the servant to receive the events. Then add the servant - // to the adapter for the given topics. Alternatively we could - // have used a ServantLocator for the same purpose. Note that any - // of the activated proxies will do since it the proxy is only a - // template from which the actual proxy is created by IceStorm. + // Set the requested quality of service "reliability" = + // "batch". This tells IceStorm to send events to the subscriber + // in batches at regular intervals. + // + IceStorm::QoS qos; + qos["reliability"] = "batch"; + + // + // Create the servant to receive the events. // - ObjectAdapterPtr adapter = communicator()->createObjectAdapterWithEndpoints("ClockAdapter", "tcp"); - ObjectPtr clock = new ClockI(); - ObjectPrx object; + Ice::ObjectAdapterPtr adapter = communicator()->createObjectAdapterWithEndpoints("ClockAdapter", "tcp"); + Ice::ObjectPtr clock = new ClockI(); - assert(!topics.empty()); - Ice::Identity ident; - ident.category = "events"; + // + // Create a UUID to use in the name field of the the object + // identity. The same UUID can be used for the identity of each + // Subscriber object since the category field will differ. + // + string uuid = IceUtil::generateUUID(); + + // + // List of all subscribers. + // + list<Ice::ObjectPrx> subscribers; - for (IceStorm::StringSeq::iterator p = topics.begin(); p != topics.end(); ++p) + // + // Add the servant to the adapter for the given + // topics. Alternatively a ServantLocator could have been used for + // the same purpose. + // + for (Ice::StringSeq::iterator p = topics.begin(); p != topics.end(); ++p) { - ident.name = *p; - object = adapter->add(clock, ident); + // + // The category is the name of the topic. + // + Ice::Identity ident; + ident.category = *p; + ident.name = uuid; + + // + // Add a Servant for the Ice Object. + // + Ice::ObjectPrx object = adapter->add(clock, ident); + try + { + manager->subscribe(qos, object); + } + catch(const IceStorm::NoSuchTopic& e) + { + cerr << appName() << ": " << e << " name: " << e.name << endl; + break; + } + + // + // Add to the set of subscribers _after_ subscribing. This + // ensures that only subscribed subscribers are unsubscribed + // in the case of an error. + // + subscribers.push_back(object); } // - // The requested quality of service. This requests "reliability" = - // "batch". This asks IceStorm to send events to the subscriber in - // batches at regular intervals. + // Unless there is a subscriber per topic then there was some + // problem. If there was an error the application should terminate + // without accepting any events. // - IceStorm::QoS qos; - qos["reliability"] = "batch"; - try + if (subscribers.size() == topics.size()) { - manager->subscribe("events", qos, topics, object); + adapter->activate(); + communicator()->waitForShutdown(); } - catch(const IceStorm::NoSuchTopic& e) + + // + // Unsubscribe all subscribed objects. + // + for (list<Ice::ObjectPrx>::const_iterator q = subscribers.begin(); q != subscribers.end(); ++q) { - cerr << appName() << ": NoSuchTopic: " << e.name << endl; - return EXIT_FAILURE; + try + { + manager->unsubscribe(*q); + } + catch(const IceStorm::NoSuchTopic& e) + { + cerr << appName() << ": " << e << " name: " << e.name << endl; + } } - adapter->activate(); - - communicator()->waitForShutdown(); return EXIT_SUCCESS; } diff --git a/cpp/slice/IceStorm/IceStorm.ice b/cpp/slice/IceStorm/IceStorm.ice index e83e4760598..dd1a0bde238 100644 --- a/cpp/slice/IceStorm/IceStorm.ice +++ b/cpp/slice/IceStorm/IceStorm.ice @@ -173,14 +173,6 @@ exception NoSuchTopic /** * - * A sequence of strings. TODO: This should be moved to the Ice core - * (Ice/Types.ice). - * - **/ -sequence<string> StringSeq; - -/** - * * This dictionary represents Quality of service parameters. * * @see TopicManager::subscribe @@ -234,30 +226,25 @@ interface TopicManager /** * - * Subscribe the given subscriber to a set of topics. - * - * @param id The identity of the subscriber. Each unique - * subscriber must use a unique identity string. To receive events - * the subscriber must register a servant with the identity - * category=id name=topic. + * Subscribe with the given [qos] to the topic identified in the + * [subscribers] identity category. If a subscriber already exists + * with the given identity, then it will be replaced by this + * subscriber. * * @param qos The quality of service parameters for this * subscription. The only currently supported QoS is * "reliability", which can be either "oneway" or "batch". * - * @param topics The topics to which to subscribe. - * - * @param tmpl An object template which is used to send events to - * the subscriber. + * @param tmpl The proxy to which to send events. * * @see unsubscribe * **/ - void subscribe(string id, QoS qos, StringSeq topics, Object* tmpl) throws NoSuchTopic; + void subscribe(QoS qos, Object* subscriber) throws NoSuchTopic; /** * - * Unsubscribe the given subscriber from a set of topics. + * Unsubscribe the given [subscriber]. * * @param id The identity of the given subscriber. * @@ -266,7 +253,7 @@ interface TopicManager * @see subscribe * **/ - void unsubscribe(string id, StringSeq topics); + void unsubscribe(Object* subscriber); /** * diff --git a/cpp/src/IceStorm/Flusher.cpp b/cpp/src/IceStorm/Flusher.cpp index 913612843c2..70558d76eac 100644 --- a/cpp/src/IceStorm/Flusher.cpp +++ b/cpp/src/IceStorm/Flusher.cpp @@ -120,12 +120,6 @@ private: //IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); // - // Using standard algorithms I don't think there is a way to - // do this in one pass. For instance, I thought about using - // remove_if - but the predicate needs to be a pure function - // (see Meyers for details). If this is fixed then fix TopicI - // also. - // // remove_if doesn't work with handle types. remove_if also // isn't present in the STLport implementation // diff --git a/cpp/src/IceStorm/LinkSubscriber.cpp b/cpp/src/IceStorm/LinkSubscriber.cpp index b04cfd8f9cc..b13ee7ec91e 100644 --- a/cpp/src/IceStorm/LinkSubscriber.cpp +++ b/cpp/src/IceStorm/LinkSubscriber.cpp @@ -54,6 +54,19 @@ LinkSubscriber::unsubscribe() } void +LinkSubscriber::replace() +{ + IceUtil::Mutex::Lock sync(_stateMutex); + _state = StateReplaced; + + if (_traceLevels->subscriber > 0) + { + Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberCat); + out << "Replace " << _obj->ice_getIdentity(); + } +} + +void LinkSubscriber::publish(const Event& event) { // diff --git a/cpp/src/IceStorm/LinkSubscriber.h b/cpp/src/IceStorm/LinkSubscriber.h index 41722cacfc1..00f62683f8f 100644 --- a/cpp/src/IceStorm/LinkSubscriber.h +++ b/cpp/src/IceStorm/LinkSubscriber.h @@ -29,6 +29,7 @@ public: virtual bool persistent() const; virtual bool inactive() const; virtual void unsubscribe(); + virtual void replace(); virtual void publish(const Event&); virtual void flush(); diff --git a/cpp/src/IceStorm/OnewayBatchSubscriber.cpp b/cpp/src/IceStorm/OnewayBatchSubscriber.cpp index 4a9915af023..e42a9728ef1 100644 --- a/cpp/src/IceStorm/OnewayBatchSubscriber.cpp +++ b/cpp/src/IceStorm/OnewayBatchSubscriber.cpp @@ -48,6 +48,25 @@ OnewayBatchSubscriber::unsubscribe() _flusher->remove(this); } +void +OnewayBatchSubscriber::replace() +{ + IceUtil::Mutex::Lock sync(_stateMutex); + _state = StateReplaced; + + if (_traceLevels->subscriber > 0) + { + Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberCat); + out << "Replace " << _obj->ice_getIdentity(); + } + + // + // If this subscriber has been registered with the flusher then + // remove it. + // + _flusher->remove(this); +} + bool OnewayBatchSubscriber::inactive() const { diff --git a/cpp/src/IceStorm/OnewayBatchSubscriber.h b/cpp/src/IceStorm/OnewayBatchSubscriber.h index 3aaea29e47b..4b90d33d1a6 100644 --- a/cpp/src/IceStorm/OnewayBatchSubscriber.h +++ b/cpp/src/IceStorm/OnewayBatchSubscriber.h @@ -27,10 +27,11 @@ class OnewayBatchSubscriber : public OnewaySubscriber, public Flushable { public: - OnewayBatchSubscriber(const TraceLevelsPtr& traceLevels, const FlusherPtr&, const Ice::ObjectPrx&); + OnewayBatchSubscriber(const TraceLevelsPtr&, const FlusherPtr&, const Ice::ObjectPrx&); ~OnewayBatchSubscriber(); virtual void unsubscribe(); + virtual void replace(); virtual bool inactive() const; virtual void flush(); diff --git a/cpp/src/IceStorm/OnewaySubscriber.cpp b/cpp/src/IceStorm/OnewaySubscriber.cpp index b1007792355..5c547ad4f8f 100644 --- a/cpp/src/IceStorm/OnewaySubscriber.cpp +++ b/cpp/src/IceStorm/OnewaySubscriber.cpp @@ -46,6 +46,19 @@ OnewaySubscriber::unsubscribe() } void +OnewaySubscriber::replace() +{ + IceUtil::Mutex::Lock sync(_stateMutex); + _state = StateReplaced; + + if (_traceLevels->subscriber > 0) + { + Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberCat); + out << "Replace " << _obj->ice_getIdentity(); + } +} + +void OnewaySubscriber::publish(const Event& event) { try diff --git a/cpp/src/IceStorm/OnewaySubscriber.h b/cpp/src/IceStorm/OnewaySubscriber.h index 640532b1cab..032821f271a 100644 --- a/cpp/src/IceStorm/OnewaySubscriber.h +++ b/cpp/src/IceStorm/OnewaySubscriber.h @@ -25,6 +25,7 @@ public: virtual bool persistent() const; virtual void unsubscribe(); + virtual void replace(); virtual void publish(const Event&); protected: diff --git a/cpp/src/IceStorm/Subscriber.cpp b/cpp/src/IceStorm/Subscriber.cpp index ac9226f1f85..62bb7eaab8d 100644 --- a/cpp/src/IceStorm/Subscriber.cpp +++ b/cpp/src/IceStorm/Subscriber.cpp @@ -32,11 +32,11 @@ Subscriber::inactive() const return _state != StateActive;; } -Subscriber::State -Subscriber::state() const +bool +Subscriber::error() const { IceUtil::Mutex::Lock sync(_stateMutex); - return _state; + return _state == StateError; } diff --git a/cpp/src/IceStorm/Subscriber.h b/cpp/src/IceStorm/Subscriber.h index b2db80e3695..453c8b125b3 100644 --- a/cpp/src/IceStorm/Subscriber.h +++ b/cpp/src/IceStorm/Subscriber.h @@ -56,35 +56,15 @@ public: virtual bool persistent() const = 0; // - // Subscriber state. - // - enum State - { - // - // The Subscriber is active - // - StateActive, - // - // The Subscriber encountered an error during event - // transmission - // - StateError, - // - // The Subscriber has been unsubscribed - // - StateUnsubscribed - }; - - // - // This method is for ease of use with STL algorithms. - // Return _state != StateActive + // Return true if the Subscriber is not active, false otherwise. // bool inactive() const; // - // Retrieve the state of the Subscriber. + // Retrieve true if the Subscriber is in the error state, false + // otherwise. // - State state() const; + bool error() const; // // Retrieve the identity of the Subscriber. @@ -97,8 +77,13 @@ public: virtual void unsubscribe() = 0; // - // Publish the given event. Mark the state as Error in the event - // of a problem. + // Unsubscribe. Mark the state as Replaced. + // + virtual void replace() = 0; + + // + // Publish the given event. Mark the state as Error in the event of + // a problem. // virtual void publish(const Event&) = 0; @@ -107,16 +92,40 @@ protected: // Immutable TraceLevelsPtr _traceLevels; + // + // Subscriber state. + // + enum State + { + // + // The Subscriber is active. + // + StateActive, + // + // The Subscriber encountered an error during event + // transmission. + // + StateError, + // + // The Subscriber has been unsubscribed. + // + StateUnsubscribed, + // + // The Subscriber has been replaced. + // + StateReplaced + }; + IceUtil::Mutex _stateMutex; State _state; private: // - // This id is the full id of the subscriber for a particular topic - // (that is <prefix>#<topicname> + // This id is the full id of the subscriber for a particular topic. + // + // Immutable. // - // Immutable Ice::Identity _id; }; diff --git a/cpp/src/IceStorm/TopicI.cpp b/cpp/src/IceStorm/TopicI.cpp index 1e35b5d9308..f4d93751563 100644 --- a/cpp/src/IceStorm/TopicI.cpp +++ b/cpp/src/IceStorm/TopicI.cpp @@ -105,7 +105,31 @@ public: void add(const SubscriberPtr& subscriber) { + Ice::Identity id = subscriber->id(); + IceUtil::Mutex::Lock sync(_subscribersMutex); + + // + // If a subscriber with this identity is already subscribed + // then mark the subscriber as replaced. + // + // Note that this doesn't actually remove the Subscribe from + // the list of subscribers - it marks the Subscriber as + // replaced, and it's removed on the next event publish. + // + for (SubscriberList::iterator i = _subscribers.begin() ; i != _subscribers.end(); ++i) + { + if ((*i)->id() == id) + { + // + // This marks the subscriber as invalid. It will be + // removed on the next event publish. + // + (*i)->replace(); + break; + } + } + // // Add to the set of subscribers @@ -116,16 +140,17 @@ public: // // Unsubscribe the Subscriber with the given identity. Note that // this doesn't remove the Subscriber from the list of subscribers - // - it marks the Subscriber as Unsubscribed, and it's removed on + // - it marks the Subscriber as unsubscribed, and it's removed on // the next event publish. // void - unsubscribe(const Ice::Identity& id) + remove(const Ice::ObjectPrx& obj) { + Ice::Identity id = obj->ice_getIdentity(); + IceUtil::Mutex::Lock sync(_subscribersMutex); - SubscriberList::iterator i; - for (i = _subscribers.begin() ; i != _subscribers.end(); ++i) + for (SubscriberList::iterator i = _subscribers.begin() ; i != _subscribers.end(); ++i) { if ((*i)->id() == id) { @@ -134,20 +159,17 @@ public: // removed on the next event publish. // (*i)->unsubscribe(); - break; + return; } } // // If the subscriber was not found then display a diagnostic // - if (i == _subscribers.end()) + if (_traceLevels->topic > 0) { - if (_traceLevels->topic > 0) - { - Ice::Trace out(_traceLevels->logger, _traceLevels->topicCat); - out << id << ": not subscribed."; - } + Ice::Trace out(_traceLevels->logger, _traceLevels->topicCat); + out << id << ": not subscribed."; } } @@ -161,55 +183,61 @@ public: // advantage that publishes can occur in parallel and less // subscriber list iterations. // - void publish(const Event& event) { - IceUtil::Mutex::Lock sync(_subscribersMutex); - // - // Using standard algorithms I don't think there is a way to - // do this in one pass. For instance, I thought about using - // remove_if - but the predicate needs to be a pure function - // (see Meyers for details). If this is fixed then fix Flusher - // also. + // Copy of the subscriber list so that event publishing can + // occur in parallel. // - // remove_if doesn't work with handle types. remove_if also - // isn't present in the STLport implementation + // TODO: Find out whether this is a false optimization - how + // expensive is the cost of copying vs. lack of parallelism? // - // _subscribers.remove_if(IceUtil::constMemFun(&Subscriber::inactive)); - // + SubscriberList copy; - // - // Erase the inactive subscribers from the _subscribers - // list. Copy the subscribers in error to the error list. - // - SubscriberList::iterator p = remove_if(_subscribers.begin(), _subscribers.end(), - IceUtil::constMemFun(&Subscriber::inactive)); - - if (p != _subscribers.end()) { - IceUtil::Mutex::Lock errorSync(_errorMutex); + IceUtil::Mutex::Lock sync(_subscribersMutex); + // - // Copy each of the invalid subscribers that was not - // unsubscribed. Note that there is no copy_if algorithm. + // Copy of the subscribers that are in error. // - // Note that this could also be written in terms of splice - // & remove_if. + SubscriberList e; + + // + // Erase the inactive subscribers from the _subscribers + // list. Copy the subscribers in error to the error list. // + SubscriberList::iterator p = _subscribers.begin(); while (p != _subscribers.end()) { - if ((*p)->state() == Subscriber::StateError) + if ((*p)->inactive()) + { + if ((*p)->error()) + { + e.push_back(*p); + } + + SubscriberList::iterator tmp = p; + ++p; + _subscribers.erase(tmp); + } + else { - _error.push_front(*p); + copy.push_back(*p); + ++p; } - _subscribers.erase(p++); + } + + if (!e.empty()) + { + IceUtil::Mutex::Lock errorSync(_errorMutex); + _error.splice(_error.begin(), e); } } - for (SubscriberList::iterator i = _subscribers.begin(); i != _subscribers.end(); ++i) + for (SubscriberList::iterator p = copy.begin(); p != copy.end(); ++p) { - (*i)->publish(event); + (*p)->publish(event); } } @@ -217,7 +245,7 @@ public: // Clear & return the set of subscribers that are in error. // SubscriberList - clearErrorList() const + clearErrorList() { // // Uses splice for efficiency @@ -229,17 +257,20 @@ public: } private: - + TraceLevelsPtr _traceLevels; + // + // TODO: Should there be a map from identity to subscriber? + // IceUtil::Mutex _subscribersMutex; SubscriberList _subscribers; // - // Set of subscribers that encountered an error. + // Set of subscribers that have encountered an error. // IceUtil::Mutex _errorMutex; - mutable SubscriberList _error; + SubscriberList _error; }; } // End namespace IceStorm @@ -319,7 +350,7 @@ TopicI::TopicI(const Ice::ObjectAdapterPtr& adapter, const TraceLevelsPtr& trace // // Create a servant per Topic to receive linked event data. The // servants Identity is category=<topicname>, name=link. Activate - // the object and save a reference to give to linked topics.. + // the object and save a reference to give to linked topics. // _link = new TopicLinkI(_subscribers); @@ -327,16 +358,20 @@ TopicI::TopicI(const Ice::ObjectAdapterPtr& adapter, const TraceLevelsPtr& trace _linkPrx = TopicLinkPrx::uncheckedCast(_adapter->add(_link, id)); // - // Run through link database - re-establishing linked subscribers + // Run through link database re-establishing linked subscribers. // for (IdentityLinkDict::const_iterator p = _links.begin(); p != _links.end(); ++p) { if (_traceLevels->topic > 0) { Ice::Trace out(_traceLevels->logger, _traceLevels->topicCat); - out << _name << " relink " << p->second.obj->ice_getIdentity(); + out << _name << " relink " << p->first; } + // + // Create the Subscriber object and add to the set of + // Subscribers. + // SubscriberPtr subscriber = _factory->createLinkSubscriber(p->second.obj, p->second.info.cost); _subscribers->add(subscriber); } @@ -411,12 +446,15 @@ TopicI::link(const TopicPrx& topic, Ice::Int cost, const Ice::Current&) } // - // Retrieve the TopicLink + // Retrieve the TopicLink. // TopicInternalPrx internal = TopicInternalPrx::checkedCast(topic); TopicLinkPrx link = internal->getLinkProxy(); Ice::Identity ident = link->ice_getIdentity(); + // + // Create the LinkDB record. + // LinkDB dbInfo; dbInfo.obj = link; dbInfo.info.topic = topic; @@ -424,20 +462,9 @@ TopicI::link(const TopicPrx& topic, Ice::Int cost, const Ice::Current&) dbInfo.info.cost = cost; // - // If the link already exists then remove the original subscriber. - // - // Note: If events arrive before the new subscriber is added then - // they will be lost. An alternative to this strategy would be to - // update the subscriber information. + // Create the Subscriber object and add to the setup of + // subscribers. // - IdentityLinkDict::iterator p = _links.find(ident); - if (p != _links.end()) - { - _subscribers->unsubscribe(ident); - } - - _links.insert(make_pair(ident, dbInfo)); - SubscriberPtr subscriber = _factory->createLinkSubscriber(dbInfo.obj, dbInfo.info.cost); _subscribers->add(subscriber); } @@ -464,7 +491,7 @@ TopicI::unlink(const TopicPrx& topic, const Ice::Current&) Ice::Trace out(_traceLevels->logger, _traceLevels->topicCat); out << _name << " unlink " << topic->getName(); } - _subscribers->unsubscribe(link->ice_getIdentity()); + _subscribers->remove(link); } else { @@ -514,7 +541,7 @@ TopicI::destroyed() const } void -TopicI::subscribe(const Ice::ObjectPrx& tmpl, const string& id, const QoS& qos) +TopicI::subscribe(const Ice::ObjectPrx& obj, const QoS& qos) { IceUtil::RecMutex::Lock sync(*this); @@ -526,23 +553,14 @@ TopicI::subscribe(const Ice::ObjectPrx& tmpl, const string& id, const QoS& qos) reap(); // - // Create the identity category=id, name=<topicname>. - // - Ice::Identity ident; - ident.category = id; - ident.name = _name; - Ice::ObjectPrx obj = tmpl->ice_newIdentity(ident); - - // - // Create the subscriber with this id and add to the set of - // subscribers. + // Add this subscriber to the set of subscribers. // SubscriberPtr subscriber = _factory->createSubscriber(qos, obj); _subscribers->add(subscriber); } void -TopicI::unsubscribe(const string& id) +TopicI::unsubscribe(const Ice::ObjectPrx& obj) { IceUtil::RecMutex::Lock sync(*this); @@ -554,16 +572,9 @@ TopicI::unsubscribe(const string& id) reap(); // - // Create the identity category=id, name=<topicname>. - // - Ice::Identity ident; - ident.category = id; - ident.name = _name; - - // // Unsubscribe the subscriber with this identity. // - _subscribers->unsubscribe(ident); + _subscribers->remove(obj); } void @@ -584,7 +595,7 @@ TopicI::reap() for (SubscriberList::iterator p = error.begin(); p != error.end(); ++p) { SubscriberPtr subscriber = *p; - assert(subscriber->state() == Subscriber::StateError); + assert(subscriber->error()); if (subscriber->persistent()) { if (_links.erase(subscriber->id()) > 0) diff --git a/cpp/src/IceStorm/TopicI.h b/cpp/src/IceStorm/TopicI.h index f920310bc7c..26e670203ea 100644 --- a/cpp/src/IceStorm/TopicI.h +++ b/cpp/src/IceStorm/TopicI.h @@ -55,8 +55,8 @@ public: // Internal methods bool destroyed() const; - void subscribe(const Ice::ObjectPrx&, const std::string&, const QoS&); - void unsubscribe(const std::string&); + void subscribe(const Ice::ObjectPrx&, const QoS&); + void unsubscribe(const Ice::ObjectPrx&); void reap(); diff --git a/cpp/src/IceStorm/TopicManagerI.cpp b/cpp/src/IceStorm/TopicManagerI.cpp index 9cf3616b867..8bac763e306 100644 --- a/cpp/src/IceStorm/TopicManagerI.cpp +++ b/cpp/src/IceStorm/TopicManagerI.cpp @@ -51,9 +51,8 @@ TopicManagerI::TopicManagerI(const Ice::CommunicatorPtr& communicator, const Ice { if (_traceLevels->topicMgr > 0) { - ostringstream s; - s << ex; - _communicator->getLogger()->trace(_traceLevels->topicMgrCat, s.str()); + Ice::Trace out(_traceLevels->logger, _traceLevels->topicMgrCat); + out << ex; } StringBoolDict::iterator tmp = p; ++p; @@ -137,104 +136,66 @@ TopicManagerI::retrieveAll(const Ice::Current&) } void -TopicManagerI::subscribe(const string& id, const QoS& qos, const StringSeq& topics, const Ice::ObjectPrx& tmpl, - const Ice::Current&) +TopicManagerI::subscribe(const QoS& qos, const Ice::ObjectPrx& subscriber, const Ice::Current&) { IceUtil::Mutex::Lock sync(*this); + Ice::Identity ident = subscriber->ice_getIdentity(); if (_traceLevels->topicMgr > 0) { - ostringstream s; - s << "Subscribe: " << id; + Ice::Trace out(_traceLevels->logger, _traceLevels->topicMgrCat); + out << "Subscribe: " << Ice::identityToString(ident); if (_traceLevels->topicMgr > 1) { - s << " QoS: "; + out << " QoS: "; for (QoS::const_iterator qi = qos.begin(); qi != qos.end() ; ++qi) { if (qi != qos.begin()) { - s << ','; + out << ','; } - s << '[' << qi->first << "," << qi->second << ']'; - } - s << " Topics: "; - for (StringSeq::const_iterator ti = topics.begin(); ti != topics.end() ; ++ti) - { - if (ti != topics.begin()) - { - s << ","; - } - s << *ti; + out << '[' << qi->first << "," << qi->second << ']'; } } - _communicator->getLogger()->trace(_traceLevels->topicMgrCat, s.str()); } // - // First scan the set of topics to ensure that each exists. + // Ensure that the identities category refers to an existing + // channel. // - // TODO: This could be slightly optimized by remembering the - // TopicIPtr's so that the list doesn't need to scanned again. - // - StringSeq::const_iterator i; - for (i = topics.begin() ; i != topics.end() ; ++i) + TopicIMap::iterator elem = _topicIMap.find(ident.category); + if (elem == _topicIMap.end()) { - TopicIMap::iterator elem = _topicIMap.find(*i); - if (elem == _topicIMap.end()) - { - NoSuchTopic ex; - ex.name = *i; - throw ex; - } + NoSuchTopic ex; + ex.name = ident.category; + throw ex; } // - // Subscribe to each Topic. + // Subscribe to the topic. // - for (i = topics.begin() ; i != topics.end() ; ++i) - { - TopicIMap::iterator elem = _topicIMap.find(*i); - if (elem != _topicIMap.end()) - { - elem->second->subscribe(tmpl, id, qos); - } - } + elem->second->subscribe(subscriber, qos); } void -TopicManagerI::unsubscribe(const string& id, const StringSeq& topics, const Ice::Current&) +TopicManagerI::unsubscribe(const Ice::ObjectPrx& subscriber, const Ice::Current&) { IceUtil::Mutex::Lock sync(*this); + Ice::Identity ident = subscriber->ice_getIdentity(); + if (_traceLevels->topicMgr > 0) { - ostringstream s; - s << "Unsubscribe: " << id; - if (_traceLevels->topicMgr > 1) - { - s << " Topics: "; - for (StringSeq::const_iterator ti = topics.begin(); ti != topics.end() ; ++ti) - { - if (ti != topics.begin()) - { - s << ","; - } - s << *ti; - } - } - _communicator->getLogger()->trace(_traceLevels->topicMgrCat, s.str()); + Ice::Trace out(_traceLevels->logger, _traceLevels->topicMgrCat); + + out << "Unsubscribe: " << Ice::identityToString(ident); } - // - // Unsubscribe to each Topic. - // - for (StringSeq::const_iterator i = topics.begin() ; i != topics.end() ; ++i) + + TopicIMap::iterator elem = _topicIMap.find(ident.category); + if (elem != _topicIMap.end()) { - TopicIMap::iterator elem = _topicIMap.find(*i); - if (elem != _topicIMap.end()) - { - elem->second->unsubscribe(id); - } + elem->second->unsubscribe(subscriber); } } @@ -260,9 +221,8 @@ TopicManagerI::reap() { if (_traceLevels->topicMgr > 0) { - ostringstream s; - s << "Reaping " << i->first; - _communicator->getLogger()->trace(_traceLevels->topicMgrCat, s.str()); + Ice::Trace out(_traceLevels->logger, _traceLevels->topicMgrCat); + out << "Reaping " << i->first; } _topics.erase(i->first); _topicIMap.erase(i++); @@ -279,9 +239,8 @@ TopicManagerI::installTopic(const std::string& message, const std::string& name, { if (_traceLevels->topicMgr > 0) { - ostringstream s; - s << message << ' ' << name; - _communicator->getLogger()->trace(_traceLevels->topicMgrCat, s.str()); + Ice::Trace out(_traceLevels->logger, _traceLevels->topicMgrCat); + out << message << ' ' << name; } // TODO: instance diff --git a/cpp/src/IceStorm/TopicManagerI.h b/cpp/src/IceStorm/TopicManagerI.h index 4df698d4743..0aa5b91f784 100644 --- a/cpp/src/IceStorm/TopicManagerI.h +++ b/cpp/src/IceStorm/TopicManagerI.h @@ -51,9 +51,8 @@ public: virtual TopicPrx create(const std::string&, const Ice::Current&); virtual TopicPrx retrieve(const std::string&, const Ice::Current&); virtual TopicDict retrieveAll(const Ice::Current&); - virtual void subscribe(const std::string&, const QoS&, const StringSeq&, const Ice::ObjectPrx&, - const Ice::Current&); - virtual void unsubscribe(const std::string&, const StringSeq&, const Ice::Current&); + virtual void subscribe(const QoS&, const Ice::ObjectPrx&, const Ice::Current&); + virtual void unsubscribe(const Ice::ObjectPrx&, const Ice::Current&); virtual void shutdown(const Ice::Current&); void reap(); diff --git a/cpp/test/IceStorm/federation/Subscriber.cpp b/cpp/test/IceStorm/federation/Subscriber.cpp index 051ced9605c..6f4cc277b78 100644 --- a/cpp/test/IceStorm/federation/Subscriber.cpp +++ b/cpp/test/IceStorm/federation/Subscriber.cpp @@ -101,26 +101,21 @@ run(int argc, char* argv[], const CommunicatorPtr& communicator) EventIPtr eventFed1 = new EventI(communicator); EventIPtr eventFed2 = new EventI(communicator); EventIPtr eventFed3 = new EventI(communicator); - // - // Any of the objects will do as long as they are all activated - // - ObjectPrx object = adapter->add(eventFed1, stringToIdentity("events#fed1")); - adapter->add(eventFed2, stringToIdentity("events#fed2")); - adapter->add(eventFed3, stringToIdentity("events#fed3")); // - // The set of topics to which to subscribe + // Activate the servants. // - IceStorm::StringSeq topics; - topics.push_back("fed1"); - topics.push_back("fed2"); - topics.push_back("fed3"); + ObjectPrx objFed1 = adapter->add(eventFed1, stringToIdentity("fed1#events")); + ObjectPrx objFed2 = adapter->add(eventFed2, stringToIdentity("fed2#events")); + ObjectPrx objFed3 = adapter->add(eventFed3, stringToIdentity("fed3#events")); IceStorm::QoS qos; //TODO: qos["reliability"] = "batch"; try { - manager->subscribe("events", qos, topics, object); + manager->subscribe(qos, objFed1); + manager->subscribe(qos, objFed2); + manager->subscribe(qos, objFed3); } catch(const IceStorm::NoSuchTopic& e) { diff --git a/cpp/test/IceStorm/single/Subscriber.cpp b/cpp/test/IceStorm/single/Subscriber.cpp index aae8fc240fe..fc91cb09767 100644 --- a/cpp/test/IceStorm/single/Subscriber.cpp +++ b/cpp/test/IceStorm/single/Subscriber.cpp @@ -92,19 +92,13 @@ run(int argc, char* argv[], const CommunicatorPtr& communicator) ObjectAdapterPtr adapter = communicator->createObjectAdapterWithEndpoints("SingleAdapter", "default"); ObjectPtr single = new SingleI(communicator); - ObjectPrx object = adapter->add(single, stringToIdentity("events#single")); - - // - // The set of topics to which to subscribe - // - IceStorm::StringSeq topics; - topics.push_back("single"); + ObjectPrx object = adapter->add(single, stringToIdentity("single#events")); IceStorm::QoS qos; //TODO: qos["reliability"] = "batch"; try { - manager->subscribe("events", qos, topics, object); + manager->subscribe(qos, object); } catch(const IceStorm::NoSuchTopic& e) { |