diff options
Diffstat (limited to 'cpp/demo/IceStorm/replicated2/Subscriber.cpp')
-rw-r--r-- | cpp/demo/IceStorm/replicated2/Subscriber.cpp | 258 |
1 files changed, 258 insertions, 0 deletions
diff --git a/cpp/demo/IceStorm/replicated2/Subscriber.cpp b/cpp/demo/IceStorm/replicated2/Subscriber.cpp new file mode 100644 index 00000000000..1d7faf18673 --- /dev/null +++ b/cpp/demo/IceStorm/replicated2/Subscriber.cpp @@ -0,0 +1,258 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2008 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +#include <IceUtil/IceUtil.h> +#include <Ice/Ice.h> +#include <IceStorm/IceStorm.h> + +#include <Clock.h> + +using namespace std; +using namespace Demo; + +class ClockI : public Clock +{ +public: + + virtual void + tick(const string& time, const Ice::Current&) + { + cout << time << endl; + } +}; + +class Subscriber : public Ice::Application +{ +public: + + virtual int run(int, char*[]); +}; + +int +main(int argc, char* argv[]) +{ + Subscriber app; + return app.main(argc, argv, "config.sub"); +} + +void +usage(const string& n) +{ + cerr << "Usage: " << n + << " [--batch] [--datagram|--twoway|--ordered|--oneway] [--retryCount count] [--id id] [topic]" << endl; +} + +int +Subscriber::run(int argc, char* argv[]) +{ + Ice::StringSeq args = Ice::argsToStringSeq(argc, argv); + args = communicator()->getProperties()->parseCommandLineOptions("Clock", args); + Ice::stringSeqToArgs(args, argc, argv); + + bool batch = false; + enum Option { None, Datagram, Twoway, Oneway, Ordered}; + Option option = None; + string topicName = "time"; + string id; + string retryCount; + int i; + + for(i = 1; i < argc; ++i) + { + string optionString = argv[i]; + Option oldoption = option; + if(optionString == "--datagram") + { + option = Datagram; + } + else if(optionString == "--twoway") + { + option = Twoway; + } + else if(optionString == "--oneway") + { + option = Oneway; + } + else if(optionString == "--ordered") + { + option = Ordered; + } + else if(optionString == "--batch") + { + batch = true; + } + else if(optionString == "--id") + { + ++i; + if(i >= argc) + { + usage(argv[0]); + return EXIT_FAILURE; + } + id = argv[i]; + } + else if(optionString == "--retryCount") + { + ++i; + if(i >= argc) + { + usage(argv[0]); + return EXIT_FAILURE; + } + retryCount = argv[i]; + } + else if(optionString.substr(0, 2) == "--") + { + usage(argv[0]); + return EXIT_FAILURE; + } + else + { + topicName = argv[i++]; + break; + } + + if(oldoption != option && oldoption != None) + { + usage(argv[0]); + return EXIT_FAILURE; + } + } + + if(i != argc) + { + usage(argv[0]); + return EXIT_FAILURE; + } + + if(!retryCount.empty()) + { + if(option == None) + { + option = Twoway; + } + else if(option != Twoway && option != Ordered) + { + cerr << argv[0] << ": retryCount requires a twoway proxy" << endl; + return EXIT_FAILURE; + } + } + + if(batch && (option == Twoway || option == Ordered)) + { + cerr << argv[0] << ": batch can only be set with oneway or datagram" << endl; + return EXIT_FAILURE; + } + + IceStorm::TopicManagerPrx manager = IceStorm::TopicManagerPrx::checkedCast( + communicator()->propertyToProxy("TopicManager.Proxy")); + if(!manager) + { + cerr << appName() << ": invalid proxy" << endl; + return EXIT_FAILURE; + } + + IceStorm::TopicPrx topic; + try + { + topic = manager->retrieve(topicName); + } + catch(const IceStorm::NoSuchTopic&) + { + try + { + topic = manager->create(topicName); + } + catch(const IceStorm::TopicExists&) + { + cerr << appName() << ": temporary failure. try again." << endl; + return EXIT_FAILURE; + } + } + + Ice::ObjectAdapterPtr adapter = communicator()->createObjectAdapter("Clock.Subscriber"); + + // + // Add a servant for the Ice object. If --id is used the identity + // comes from the command line, otherwise a UUID is used. + // + // id is not directly altered since it is used below to detect + // whether subscribeAndGetPublisher can raise AlreadySubscribed. + // + Ice::Identity subId; + subId.name = id; + if(subId.name.empty()) + { + subId.name = IceUtil::generateUUID(); + } + Ice::ObjectPrx subscriber = adapter->add(new ClockI, subId); + + IceStorm::QoS qos; + if(!retryCount.empty()) + { + qos["retryCount"] = retryCount; + } + + // + // Set up the proxy. + // + if(option == Datagram) + { + if(batch) + { + subscriber = subscriber->ice_batchDatagram(); + } + else + { + subscriber = subscriber->ice_datagram(); + } + } + else if(option == Twoway) + { + // Do nothing to the subscriber proxy. Its already twoway. + } + else if(option == Ordered) + { + // Do nothing to the subscriber proxy. Its already twoway. + qos["reliability"] = "ordered"; + } + else if(option == Oneway || option == None) + { + if(batch) + { + subscriber = subscriber->ice_batchOneway(); + } + else + { + subscriber = subscriber->ice_oneway(); + } + } + + try + { + topic->subscribeAndGetPublisher(qos, subscriber); + } + catch(const IceStorm::AlreadySubscribed&) + { + // If we're manually setting the subscriber id ignore. + if(id.empty()) + { + throw; + } + cout << "reactivating persistent subscriber" << endl; + } + adapter->activate(); + + shutdownOnInterrupt(); + communicator()->waitForShutdown(); + + topic->unsubscribe(subscriber); + + return EXIT_SUCCESS; +} |