diff options
Diffstat (limited to 'cpp/demo/IceStorm/replicated/Subscriber.cpp')
-rw-r--r-- | cpp/demo/IceStorm/replicated/Subscriber.cpp | 369 |
1 files changed, 185 insertions, 184 deletions
diff --git a/cpp/demo/IceStorm/replicated/Subscriber.cpp b/cpp/demo/IceStorm/replicated/Subscriber.cpp index 7a0848c76a4..9754af70bcd 100644 --- a/cpp/demo/IceStorm/replicated/Subscriber.cpp +++ b/cpp/demo/IceStorm/replicated/Subscriber.cpp @@ -10,128 +10,12 @@ #include <IceUtil/IceUtil.h> #include <Ice/Ice.h> #include <IceStorm/IceStorm.h> -#include <IceGrid/Query.h> #include <Clock.h> -#include <set> - using namespace std; using namespace Demo; -class InstanceCheckThread : public IceUtil::Thread, public IceUtil::Monitor<IceUtil::Mutex> -{ -public: - - InstanceCheckThread(const IceGrid::QueryPrx& query, const string& topicName, const Ice::ObjectPrx& clock, - const Ice::ObjectPrx& managerReplica, const set<IceStorm::TopicManagerPrx>& managers, - const Ice::ObjectPrx& topicReplica, const set<IceStorm::TopicPrx>& topics) : - _query(query), - _topicName(topicName), - _clock(clock), - _managerReplica(managerReplica), - _managers(managers), - _topicReplica(topicReplica), - _topics(topics), - _timeout(IceUtil::Time::seconds(10)), - _terminated(false) - { - } - - virtual void - run() - { - Lock sync(*this); - while(!_terminated) - { - timedWait(_timeout); - if(!_terminated) - { - try - { - // - // Check if there are any new topic managers in the replicas list. - // - Ice::ObjectProxySeq managers = _query->findAllReplicas(_managerReplica); - Ice::ObjectProxySeq::const_iterator p; - for(p = managers.begin(); p != managers.end(); ++p) - { - IceStorm::TopicManagerPrx manager = IceStorm::TopicManagerPrx::uncheckedCast(*p); - if(_managers.find(manager) == _managers.end()) - { - // - // Create the topic on the new topic manager if it does - // not already exist. - // - try - { - manager->retrieve(_topicName); - } - catch(const IceStorm::NoSuchTopic&) - { - manager->create(_topicName); - } - - // - // Since the topic proxy returned by the retrieval/creation - // is a replicated proxy we cannot use it to subscribe to the - // new instance. Instead we have to retrive all the topics and - // find the new one. - // - Ice::ObjectProxySeq topics = _query->findAllReplicas(_topicReplica); - Ice::ObjectProxySeq::const_iterator q; - for(q = topics.begin(); q != topics.end(); ++q) - { - IceStorm::TopicPrx topic = IceStorm::TopicPrx::uncheckedCast(*q);; - if(_topics.find(topic) == _topics.end()) - { - try - { - topic->subscribeAndGetPublisher(IceStorm::QoS(), _clock); - } - catch(const IceStorm::AlreadySubscribed&) - { - // Ignore - } - _topics.insert(topic); - _managers.insert(manager); - break; - } - } - } - } - } - catch(const Ice::Exception& ex) - { - cerr << "warning: exception while checking for new IceStorm instances: " << ex << endl; - } - } - } - } - - void - terminate() - { - Lock sync(*this); - _terminated = true; - notify(); - } - -private: - - const IceGrid::QueryPrx _query; - const string _topicName; - const Ice::ObjectPrx _clock; - const Ice::ObjectPrx _managerReplica; - set<IceStorm::TopicManagerPrx> _managers; - const Ice::ObjectPrx _topicReplica; - set<IceStorm::TopicPrx> _topics; - const IceUtil::Time _timeout; - bool _terminated; -}; - -typedef IceUtil::Handle<InstanceCheckThread> InstanceCheckThreadPtr; - class ClockI : public Clock { public: @@ -157,109 +41,226 @@ main(int argc, char* argv[]) return app.main(argc, argv, "config.sub"); } +void +usage(const string& n) +{ + cerr << "Usage: " << n + << " [--batch] [--datagram|--twoway|--ordered|--oneway] [--retryCount count] [--id id] [topic]" << endl; +} + int Subscriber::run(int argc, char* argv[]) { - if(argc > 2) + Ice::StringSeq args = Ice::argsToStringSeq(argc, argv); + args = communicator()->getProperties()->parseCommandLineOptions("Clock", args); + Ice::stringSeqToArgs(args, argc, argv); + + bool batch = false; + enum Option { None, Datagram, Twoway, Oneway, Ordered}; + Option option = None; + string topicName = "time"; + string id; + string retryCount; + int i; + + for(i = 1; i < argc; ++i) { - cerr << appName() << ": too many arguments" << endl; - return EXIT_FAILURE; - } + string optionString = argv[i]; + Option oldoption = option; + if(optionString == "--datagram") + { + option = Datagram; + } + else if(optionString == "--twoway") + { + option = Twoway; + } + else if(optionString == "--oneway") + { + option = Oneway; + } + else if(optionString == "--ordered") + { + option = Ordered; + } + else if(optionString == "--batch") + { + batch = true; + } + else if(optionString == "--id") + { + ++i; + if(i >= argc) + { + usage(argv[0]); + return EXIT_FAILURE; + } + id = argv[i]; + } + else if(optionString == "--retryCount") + { + ++i; + if(i >= argc) + { + usage(argv[0]); + return EXIT_FAILURE; + } + retryCount = argv[i]; + } + else if(optionString.substr(0, 2) == "--") + { + usage(argv[0]); + return EXIT_FAILURE; + } + else + { + topicName = argv[i++]; + break; + } - Ice::PropertiesPtr properties = communicator()->getProperties(); + if(oldoption != option && oldoption != None) + { + usage(argv[0]); + return EXIT_FAILURE; + } + } - IceGrid::QueryPrx query = IceGrid::QueryPrx::uncheckedCast(communicator()->stringToProxy("DemoIceGrid/Query")); - Ice::ObjectPrx managerReplica = communicator()->stringToProxy("DemoIceStorm/TopicManager"); - Ice::ObjectProxySeq objSeq = query->findAllReplicas(managerReplica); - if(objSeq.size() == 0) + if(i != argc) { - cerr << appName() << ": no topic managers found, make sure application was deployed." << endl; + usage(argv[0]); return EXIT_FAILURE; } - string topicName = "time"; - if(argc != 1) + if(!retryCount.empty()) { - topicName = argv[1]; + if(option == None) + { + option = Twoway; + } + else if(option != Twoway && option != Ordered) + { + cerr << argv[0] << ": retryCount requires a twoway proxy" << endl; + return EXIT_FAILURE; + } } - // - // Create the servant to receive the events. - // - Ice::ObjectAdapterPtr adapter = communicator()->createObjectAdapter("Clock.Subscriber"); - - // - // We want to use oneway batch messages. - // - Ice::ObjectPrx clock = adapter->addWithUUID(new ClockI)->ice_batchOneway(); + if(batch && (option == Twoway || option == Ordered)) + { + cerr << argv[0] << ": batch can only be set with oneway or datagram" << endl; + return EXIT_FAILURE; + } - // - // Get all the topic managers and create the topic if necessary. - // - IceStorm::TopicPrx topicReplica; - set<IceStorm::TopicManagerPrx> managers; - Ice::ObjectProxySeq::const_iterator p; - for(p = objSeq.begin(); p != objSeq.end(); ++p) + IceStorm::TopicManagerPrx manager; + try + { + manager = IceStorm::TopicManagerPrx::checkedCast(communicator()->stringToProxy("DemoIceStorm/TopicManager")); + if(!manager) + { + cerr << appName() << ": invalid proxy" << endl; + return EXIT_FAILURE; + } + } + catch(const Ice::NotRegisteredException&) { - IceStorm::TopicManagerPrx manager = IceStorm::TopicManagerPrx::checkedCast(*p); - managers.insert(manager); + cerr << appName() << ": no topic manager found, make sure application was deployed." << endl; + return EXIT_FAILURE; + } + IceStorm::TopicPrx topic; + try + { + topic = manager->retrieve(topicName); + } + catch(const IceStorm::NoSuchTopic&) + { try { - topicReplica = manager->retrieve(topicName); + topic = manager->create(topicName); } - catch(const IceStorm::NoSuchTopic&) + catch(const IceStorm::TopicExists&) { - try - { - topicReplica = manager->create(topicName); - } - catch(const IceStorm::TopicExists&) - { - cerr << appName() << ": temporary failure. try again." << endl; - return EXIT_FAILURE; - } + cerr << appName() << ": temporary failure. try again." << endl; + return EXIT_FAILURE; } } + Ice::ObjectAdapterPtr adapter = communicator()->createObjectAdapter("Clock.Subscriber"); + // - // Get all the topics and subscribe. We can't use the proxies returned by - // the topic creation above because they are repicated proxies, and not - // specific proxies for each individual topic. + // Add a servant for the Ice object. If --id is used the identity + // comes from the command line, otherwise a UUID is used. // - IceStorm::TopicPrx topic; - objSeq = query->findAllReplicas(topicReplica); - set<IceStorm::TopicPrx> topics; - for(p = objSeq.begin(); p != objSeq.end(); ++p) + // id is not directly altered since it is used below to detect + // whether subscribeAndGetPublisher can raise AlreadySubscribed. + // + Ice::Identity subId; + subId.name = id; + if(subId.name.empty()) { - topic = IceStorm::TopicPrx::uncheckedCast(*p); - topic->subscribeAndGetPublisher(IceStorm::QoS(), clock); - topics.insert(topic); + subId.name = IceUtil::generateUUID(); + } + Ice::ObjectPrx subscriber = adapter->add(new ClockI, subId); + + IceStorm::QoS qos; + if(!retryCount.empty()) + { + qos["retryCount"] = retryCount; } // - // Create and start thread to check for new IceStorm instances coming online. + // Set up the proxy. // - InstanceCheckThreadPtr instanceCheck = - new InstanceCheckThread(query, topicName, clock, managerReplica, managers, topicReplica, topics); - instanceCheck->start(); + if(option == Datagram) + { + if(batch) + { + subscriber = subscriber->ice_batchDatagram(); + } + else + { + subscriber = subscriber->ice_datagram(); + } + } + else if(option == Twoway) + { + // Do nothing to the subscriber proxy. Its already twoway. + } + else if(option == Ordered) + { + // Do nothing to the subscriber proxy. Its already twoway. + qos["reliability"] = "ordered"; + } + else if(option == Oneway || option == None) + { + if(batch) + { + subscriber = subscriber->ice_batchOneway(); + } + else + { + subscriber = subscriber->ice_oneway(); + } + } + try + { + topic->subscribeAndGetPublisher(qos, subscriber); + } + catch(const IceStorm::AlreadySubscribed&) + { + // If we're manually setting the subscriber id ignore. + if(id.empty()) + { + throw; + } + cout << "reactivating persistent subscriber" << endl; + } adapter->activate(); + shutdownOnInterrupt(); communicator()->waitForShutdown(); - instanceCheck->terminate(); - instanceCheck->getThreadControl().join(); - - // - // Unsubscribe from all topics. - // - objSeq = query->findAllReplicas(topicReplica); - for(p = objSeq.begin(); p != objSeq.end(); ++p) - { - topic = IceStorm::TopicPrx::uncheckedCast(*p); - topic->unsubscribe(clock); - } + topic->unsubscribe(subscriber); return EXIT_SUCCESS; } |