diff options
Diffstat (limited to 'cpp/demo/IceStorm/clock/Subscriber.cpp')
-rw-r--r-- | cpp/demo/IceStorm/clock/Subscriber.cpp | 126 |
1 files changed, 87 insertions, 39 deletions
diff --git a/cpp/demo/IceStorm/clock/Subscriber.cpp b/cpp/demo/IceStorm/clock/Subscriber.cpp index 33a0065c936..78d47f0c73c 100644 --- a/cpp/demo/IceStorm/clock/Subscriber.cpp +++ b/cpp/demo/IceStorm/clock/Subscriber.cpp @@ -10,12 +10,15 @@ #include <Ice/Application.h> #include <IceStorm/IceStorm.h> +#include <IceUtil/UUID.h> + #include <ClockI.h> +#include <list> + using namespace std; -using namespace Ice; -class Subscriber : public Application +class Subscriber : public Ice::Application { public: @@ -32,16 +35,17 @@ main(int argc, char* argv[]) int Subscriber::run(int argc, char* argv[]) { - PropertiesPtr properties = communicator()->getProperties(); - const char* endpointsProperty = "IceStorm.TopicManager.Endpoints"; - std::string endpoints = properties->getProperty(endpointsProperty); + Ice::PropertiesPtr properties = communicator()->getProperties(); + + static const string endpointsProperty = "IceStorm.TopicManager.Endpoints"; + string endpoints = properties->getProperty(endpointsProperty); if (endpoints.empty()) { cerr << appName() << ": property `" << endpointsProperty << "' not set" << endl; return EXIT_FAILURE; } - ObjectPrx base = communicator()->stringToProxy("TopicManager:" + endpoints); + Ice::ObjectPrx base = communicator()->stringToProxy("TopicManager:" + endpoints); IceStorm::TopicManagerPrx manager = IceStorm::TopicManagerPrx::checkedCast(base); if (!manager) { @@ -50,10 +54,10 @@ Subscriber::run(int argc, char* argv[]) } // - // Gather the set of topics to subscribe to. It is either the set - // provided on the command line, or the topic "time". + // Gather the set of topics to which to subscribe. It is either + // the set provided on the command line, or the topic "time". // - IceStorm::StringSeq topics; + Ice::StringSeq topics; if (argc > 1) { for (int i = 1; i < argc; ++i) @@ -63,52 +67,96 @@ Subscriber::run(int argc, char* argv[]) } else { - // - // The set of topics to which to subscribe - // topics.push_back("time"); } // - // Create the servant to receive the events. Then add the servant - // to the adapter for the given topics. Alternatively we could - // have used a ServantLocator for the same purpose. Note that any - // of the activated proxies will do since it the proxy is only a - // template from which the actual proxy is created by IceStorm. + // Set the requested quality of service "reliability" = + // "batch". This tells IceStorm to send events to the subscriber + // in batches at regular intervals. + // + IceStorm::QoS qos; + qos["reliability"] = "batch"; + + // + // Create the servant to receive the events. // - ObjectAdapterPtr adapter = communicator()->createObjectAdapterWithEndpoints("ClockAdapter", "tcp"); - ObjectPtr clock = new ClockI(); - ObjectPrx object; + Ice::ObjectAdapterPtr adapter = communicator()->createObjectAdapterWithEndpoints("ClockAdapter", "tcp"); + Ice::ObjectPtr clock = new ClockI(); - assert(!topics.empty()); - Ice::Identity ident; - ident.category = "events"; + // + // Create a UUID to use in the name field of the the object + // identity. The same UUID can be used for the identity of each + // Subscriber object since the category field will differ. + // + string uuid = IceUtil::generateUUID(); + + // + // List of all subscribers. + // + list<Ice::ObjectPrx> subscribers; - for (IceStorm::StringSeq::iterator p = topics.begin(); p != topics.end(); ++p) + // + // Add the servant to the adapter for the given + // topics. Alternatively a ServantLocator could have been used for + // the same purpose. + // + for (Ice::StringSeq::iterator p = topics.begin(); p != topics.end(); ++p) { - ident.name = *p; - object = adapter->add(clock, ident); + // + // The category is the name of the topic. + // + Ice::Identity ident; + ident.category = *p; + ident.name = uuid; + + // + // Add a Servant for the Ice Object. + // + Ice::ObjectPrx object = adapter->add(clock, ident); + try + { + manager->subscribe(qos, object); + } + catch(const IceStorm::NoSuchTopic& e) + { + cerr << appName() << ": " << e << " name: " << e.name << endl; + break; + } + + // + // Add to the set of subscribers _after_ subscribing. This + // ensures that only subscribed subscribers are unsubscribed + // in the case of an error. + // + subscribers.push_back(object); } // - // The requested quality of service. This requests "reliability" = - // "batch". This asks IceStorm to send events to the subscriber in - // batches at regular intervals. + // Unless there is a subscriber per topic then there was some + // problem. If there was an error the application should terminate + // without accepting any events. // - IceStorm::QoS qos; - qos["reliability"] = "batch"; - try + if (subscribers.size() == topics.size()) { - manager->subscribe("events", qos, topics, object); + adapter->activate(); + communicator()->waitForShutdown(); } - catch(const IceStorm::NoSuchTopic& e) + + // + // Unsubscribe all subscribed objects. + // + for (list<Ice::ObjectPrx>::const_iterator q = subscribers.begin(); q != subscribers.end(); ++q) { - cerr << appName() << ": NoSuchTopic: " << e.name << endl; - return EXIT_FAILURE; + try + { + manager->unsubscribe(*q); + } + catch(const IceStorm::NoSuchTopic& e) + { + cerr << appName() << ": " << e << " name: " << e.name << endl; + } } - adapter->activate(); - - communicator()->waitForShutdown(); return EXIT_SUCCESS; } |