summaryrefslogtreecommitdiff
path: root/cpp/demo/IceStorm/replicated2/Subscriber.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/demo/IceStorm/replicated2/Subscriber.cpp')
-rw-r--r--cpp/demo/IceStorm/replicated2/Subscriber.cpp258
1 files changed, 258 insertions, 0 deletions
diff --git a/cpp/demo/IceStorm/replicated2/Subscriber.cpp b/cpp/demo/IceStorm/replicated2/Subscriber.cpp
new file mode 100644
index 00000000000..1d7faf18673
--- /dev/null
+++ b/cpp/demo/IceStorm/replicated2/Subscriber.cpp
@@ -0,0 +1,258 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2008 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+#include <IceUtil/IceUtil.h>
+#include <Ice/Ice.h>
+#include <IceStorm/IceStorm.h>
+
+#include <Clock.h>
+
+using namespace std;
+using namespace Demo;
+
+class ClockI : public Clock
+{
+public:
+
+ virtual void
+ tick(const string& time, const Ice::Current&)
+ {
+ cout << time << endl;
+ }
+};
+
+class Subscriber : public Ice::Application
+{
+public:
+
+ virtual int run(int, char*[]);
+};
+
+int
+main(int argc, char* argv[])
+{
+ Subscriber app;
+ return app.main(argc, argv, "config.sub");
+}
+
+void
+usage(const string& n)
+{
+ cerr << "Usage: " << n
+ << " [--batch] [--datagram|--twoway|--ordered|--oneway] [--retryCount count] [--id id] [topic]" << endl;
+}
+
+int
+Subscriber::run(int argc, char* argv[])
+{
+ Ice::StringSeq args = Ice::argsToStringSeq(argc, argv);
+ args = communicator()->getProperties()->parseCommandLineOptions("Clock", args);
+ Ice::stringSeqToArgs(args, argc, argv);
+
+ bool batch = false;
+ enum Option { None, Datagram, Twoway, Oneway, Ordered};
+ Option option = None;
+ string topicName = "time";
+ string id;
+ string retryCount;
+ int i;
+
+ for(i = 1; i < argc; ++i)
+ {
+ string optionString = argv[i];
+ Option oldoption = option;
+ if(optionString == "--datagram")
+ {
+ option = Datagram;
+ }
+ else if(optionString == "--twoway")
+ {
+ option = Twoway;
+ }
+ else if(optionString == "--oneway")
+ {
+ option = Oneway;
+ }
+ else if(optionString == "--ordered")
+ {
+ option = Ordered;
+ }
+ else if(optionString == "--batch")
+ {
+ batch = true;
+ }
+ else if(optionString == "--id")
+ {
+ ++i;
+ if(i >= argc)
+ {
+ usage(argv[0]);
+ return EXIT_FAILURE;
+ }
+ id = argv[i];
+ }
+ else if(optionString == "--retryCount")
+ {
+ ++i;
+ if(i >= argc)
+ {
+ usage(argv[0]);
+ return EXIT_FAILURE;
+ }
+ retryCount = argv[i];
+ }
+ else if(optionString.substr(0, 2) == "--")
+ {
+ usage(argv[0]);
+ return EXIT_FAILURE;
+ }
+ else
+ {
+ topicName = argv[i++];
+ break;
+ }
+
+ if(oldoption != option && oldoption != None)
+ {
+ usage(argv[0]);
+ return EXIT_FAILURE;
+ }
+ }
+
+ if(i != argc)
+ {
+ usage(argv[0]);
+ return EXIT_FAILURE;
+ }
+
+ if(!retryCount.empty())
+ {
+ if(option == None)
+ {
+ option = Twoway;
+ }
+ else if(option != Twoway && option != Ordered)
+ {
+ cerr << argv[0] << ": retryCount requires a twoway proxy" << endl;
+ return EXIT_FAILURE;
+ }
+ }
+
+ if(batch && (option == Twoway || option == Ordered))
+ {
+ cerr << argv[0] << ": batch can only be set with oneway or datagram" << endl;
+ return EXIT_FAILURE;
+ }
+
+ IceStorm::TopicManagerPrx manager = IceStorm::TopicManagerPrx::checkedCast(
+ communicator()->propertyToProxy("TopicManager.Proxy"));
+ if(!manager)
+ {
+ cerr << appName() << ": invalid proxy" << endl;
+ return EXIT_FAILURE;
+ }
+
+ IceStorm::TopicPrx topic;
+ try
+ {
+ topic = manager->retrieve(topicName);
+ }
+ catch(const IceStorm::NoSuchTopic&)
+ {
+ try
+ {
+ topic = manager->create(topicName);
+ }
+ catch(const IceStorm::TopicExists&)
+ {
+ cerr << appName() << ": temporary failure. try again." << endl;
+ return EXIT_FAILURE;
+ }
+ }
+
+ Ice::ObjectAdapterPtr adapter = communicator()->createObjectAdapter("Clock.Subscriber");
+
+ //
+ // Add a servant for the Ice object. If --id is used the identity
+ // comes from the command line, otherwise a UUID is used.
+ //
+ // id is not directly altered since it is used below to detect
+ // whether subscribeAndGetPublisher can raise AlreadySubscribed.
+ //
+ Ice::Identity subId;
+ subId.name = id;
+ if(subId.name.empty())
+ {
+ subId.name = IceUtil::generateUUID();
+ }
+ Ice::ObjectPrx subscriber = adapter->add(new ClockI, subId);
+
+ IceStorm::QoS qos;
+ if(!retryCount.empty())
+ {
+ qos["retryCount"] = retryCount;
+ }
+
+ //
+ // Set up the proxy.
+ //
+ if(option == Datagram)
+ {
+ if(batch)
+ {
+ subscriber = subscriber->ice_batchDatagram();
+ }
+ else
+ {
+ subscriber = subscriber->ice_datagram();
+ }
+ }
+ else if(option == Twoway)
+ {
+ // Do nothing to the subscriber proxy. Its already twoway.
+ }
+ else if(option == Ordered)
+ {
+ // Do nothing to the subscriber proxy. Its already twoway.
+ qos["reliability"] = "ordered";
+ }
+ else if(option == Oneway || option == None)
+ {
+ if(batch)
+ {
+ subscriber = subscriber->ice_batchOneway();
+ }
+ else
+ {
+ subscriber = subscriber->ice_oneway();
+ }
+ }
+
+ try
+ {
+ topic->subscribeAndGetPublisher(qos, subscriber);
+ }
+ catch(const IceStorm::AlreadySubscribed&)
+ {
+ // If we're manually setting the subscriber id ignore.
+ if(id.empty())
+ {
+ throw;
+ }
+ cout << "reactivating persistent subscriber" << endl;
+ }
+ adapter->activate();
+
+ shutdownOnInterrupt();
+ communicator()->waitForShutdown();
+
+ topic->unsubscribe(subscriber);
+
+ return EXIT_SUCCESS;
+}