summaryrefslogtreecommitdiff
path: root/cpp/demo/IceStorm/replicated/Subscriber.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/demo/IceStorm/replicated/Subscriber.cpp')
-rw-r--r--cpp/demo/IceStorm/replicated/Subscriber.cpp369
1 files changed, 185 insertions, 184 deletions
diff --git a/cpp/demo/IceStorm/replicated/Subscriber.cpp b/cpp/demo/IceStorm/replicated/Subscriber.cpp
index 7a0848c76a4..9754af70bcd 100644
--- a/cpp/demo/IceStorm/replicated/Subscriber.cpp
+++ b/cpp/demo/IceStorm/replicated/Subscriber.cpp
@@ -10,128 +10,12 @@
#include <IceUtil/IceUtil.h>
#include <Ice/Ice.h>
#include <IceStorm/IceStorm.h>
-#include <IceGrid/Query.h>
#include <Clock.h>
-#include <set>
-
using namespace std;
using namespace Demo;
-class InstanceCheckThread : public IceUtil::Thread, public IceUtil::Monitor<IceUtil::Mutex>
-{
-public:
-
- InstanceCheckThread(const IceGrid::QueryPrx& query, const string& topicName, const Ice::ObjectPrx& clock,
- const Ice::ObjectPrx& managerReplica, const set<IceStorm::TopicManagerPrx>& managers,
- const Ice::ObjectPrx& topicReplica, const set<IceStorm::TopicPrx>& topics) :
- _query(query),
- _topicName(topicName),
- _clock(clock),
- _managerReplica(managerReplica),
- _managers(managers),
- _topicReplica(topicReplica),
- _topics(topics),
- _timeout(IceUtil::Time::seconds(10)),
- _terminated(false)
- {
- }
-
- virtual void
- run()
- {
- Lock sync(*this);
- while(!_terminated)
- {
- timedWait(_timeout);
- if(!_terminated)
- {
- try
- {
- //
- // Check if there are any new topic managers in the replicas list.
- //
- Ice::ObjectProxySeq managers = _query->findAllReplicas(_managerReplica);
- Ice::ObjectProxySeq::const_iterator p;
- for(p = managers.begin(); p != managers.end(); ++p)
- {
- IceStorm::TopicManagerPrx manager = IceStorm::TopicManagerPrx::uncheckedCast(*p);
- if(_managers.find(manager) == _managers.end())
- {
- //
- // Create the topic on the new topic manager if it does
- // not already exist.
- //
- try
- {
- manager->retrieve(_topicName);
- }
- catch(const IceStorm::NoSuchTopic&)
- {
- manager->create(_topicName);
- }
-
- //
- // Since the topic proxy returned by the retrieval/creation
- // is a replicated proxy we cannot use it to subscribe to the
- // new instance. Instead we have to retrive all the topics and
- // find the new one.
- //
- Ice::ObjectProxySeq topics = _query->findAllReplicas(_topicReplica);
- Ice::ObjectProxySeq::const_iterator q;
- for(q = topics.begin(); q != topics.end(); ++q)
- {
- IceStorm::TopicPrx topic = IceStorm::TopicPrx::uncheckedCast(*q);;
- if(_topics.find(topic) == _topics.end())
- {
- try
- {
- topic->subscribeAndGetPublisher(IceStorm::QoS(), _clock);
- }
- catch(const IceStorm::AlreadySubscribed&)
- {
- // Ignore
- }
- _topics.insert(topic);
- _managers.insert(manager);
- break;
- }
- }
- }
- }
- }
- catch(const Ice::Exception& ex)
- {
- cerr << "warning: exception while checking for new IceStorm instances: " << ex << endl;
- }
- }
- }
- }
-
- void
- terminate()
- {
- Lock sync(*this);
- _terminated = true;
- notify();
- }
-
-private:
-
- const IceGrid::QueryPrx _query;
- const string _topicName;
- const Ice::ObjectPrx _clock;
- const Ice::ObjectPrx _managerReplica;
- set<IceStorm::TopicManagerPrx> _managers;
- const Ice::ObjectPrx _topicReplica;
- set<IceStorm::TopicPrx> _topics;
- const IceUtil::Time _timeout;
- bool _terminated;
-};
-
-typedef IceUtil::Handle<InstanceCheckThread> InstanceCheckThreadPtr;
-
class ClockI : public Clock
{
public:
@@ -157,109 +41,226 @@ main(int argc, char* argv[])
return app.main(argc, argv, "config.sub");
}
+void
+usage(const string& n)
+{
+ cerr << "Usage: " << n
+ << " [--batch] [--datagram|--twoway|--ordered|--oneway] [--retryCount count] [--id id] [topic]" << endl;
+}
+
int
Subscriber::run(int argc, char* argv[])
{
- if(argc > 2)
+ Ice::StringSeq args = Ice::argsToStringSeq(argc, argv);
+ args = communicator()->getProperties()->parseCommandLineOptions("Clock", args);
+ Ice::stringSeqToArgs(args, argc, argv);
+
+ bool batch = false;
+ enum Option { None, Datagram, Twoway, Oneway, Ordered};
+ Option option = None;
+ string topicName = "time";
+ string id;
+ string retryCount;
+ int i;
+
+ for(i = 1; i < argc; ++i)
{
- cerr << appName() << ": too many arguments" << endl;
- return EXIT_FAILURE;
- }
+ string optionString = argv[i];
+ Option oldoption = option;
+ if(optionString == "--datagram")
+ {
+ option = Datagram;
+ }
+ else if(optionString == "--twoway")
+ {
+ option = Twoway;
+ }
+ else if(optionString == "--oneway")
+ {
+ option = Oneway;
+ }
+ else if(optionString == "--ordered")
+ {
+ option = Ordered;
+ }
+ else if(optionString == "--batch")
+ {
+ batch = true;
+ }
+ else if(optionString == "--id")
+ {
+ ++i;
+ if(i >= argc)
+ {
+ usage(argv[0]);
+ return EXIT_FAILURE;
+ }
+ id = argv[i];
+ }
+ else if(optionString == "--retryCount")
+ {
+ ++i;
+ if(i >= argc)
+ {
+ usage(argv[0]);
+ return EXIT_FAILURE;
+ }
+ retryCount = argv[i];
+ }
+ else if(optionString.substr(0, 2) == "--")
+ {
+ usage(argv[0]);
+ return EXIT_FAILURE;
+ }
+ else
+ {
+ topicName = argv[i++];
+ break;
+ }
- Ice::PropertiesPtr properties = communicator()->getProperties();
+ if(oldoption != option && oldoption != None)
+ {
+ usage(argv[0]);
+ return EXIT_FAILURE;
+ }
+ }
- IceGrid::QueryPrx query = IceGrid::QueryPrx::uncheckedCast(communicator()->stringToProxy("DemoIceGrid/Query"));
- Ice::ObjectPrx managerReplica = communicator()->stringToProxy("DemoIceStorm/TopicManager");
- Ice::ObjectProxySeq objSeq = query->findAllReplicas(managerReplica);
- if(objSeq.size() == 0)
+ if(i != argc)
{
- cerr << appName() << ": no topic managers found, make sure application was deployed." << endl;
+ usage(argv[0]);
return EXIT_FAILURE;
}
- string topicName = "time";
- if(argc != 1)
+ if(!retryCount.empty())
{
- topicName = argv[1];
+ if(option == None)
+ {
+ option = Twoway;
+ }
+ else if(option != Twoway && option != Ordered)
+ {
+ cerr << argv[0] << ": retryCount requires a twoway proxy" << endl;
+ return EXIT_FAILURE;
+ }
}
- //
- // Create the servant to receive the events.
- //
- Ice::ObjectAdapterPtr adapter = communicator()->createObjectAdapter("Clock.Subscriber");
-
- //
- // We want to use oneway batch messages.
- //
- Ice::ObjectPrx clock = adapter->addWithUUID(new ClockI)->ice_batchOneway();
+ if(batch && (option == Twoway || option == Ordered))
+ {
+ cerr << argv[0] << ": batch can only be set with oneway or datagram" << endl;
+ return EXIT_FAILURE;
+ }
- //
- // Get all the topic managers and create the topic if necessary.
- //
- IceStorm::TopicPrx topicReplica;
- set<IceStorm::TopicManagerPrx> managers;
- Ice::ObjectProxySeq::const_iterator p;
- for(p = objSeq.begin(); p != objSeq.end(); ++p)
+ IceStorm::TopicManagerPrx manager;
+ try
+ {
+ manager = IceStorm::TopicManagerPrx::checkedCast(communicator()->stringToProxy("DemoIceStorm/TopicManager"));
+ if(!manager)
+ {
+ cerr << appName() << ": invalid proxy" << endl;
+ return EXIT_FAILURE;
+ }
+ }
+ catch(const Ice::NotRegisteredException&)
{
- IceStorm::TopicManagerPrx manager = IceStorm::TopicManagerPrx::checkedCast(*p);
- managers.insert(manager);
+ cerr << appName() << ": no topic manager found, make sure application was deployed." << endl;
+ return EXIT_FAILURE;
+ }
+ IceStorm::TopicPrx topic;
+ try
+ {
+ topic = manager->retrieve(topicName);
+ }
+ catch(const IceStorm::NoSuchTopic&)
+ {
try
{
- topicReplica = manager->retrieve(topicName);
+ topic = manager->create(topicName);
}
- catch(const IceStorm::NoSuchTopic&)
+ catch(const IceStorm::TopicExists&)
{
- try
- {
- topicReplica = manager->create(topicName);
- }
- catch(const IceStorm::TopicExists&)
- {
- cerr << appName() << ": temporary failure. try again." << endl;
- return EXIT_FAILURE;
- }
+ cerr << appName() << ": temporary failure. try again." << endl;
+ return EXIT_FAILURE;
}
}
+ Ice::ObjectAdapterPtr adapter = communicator()->createObjectAdapter("Clock.Subscriber");
+
//
- // Get all the topics and subscribe. We can't use the proxies returned by
- // the topic creation above because they are repicated proxies, and not
- // specific proxies for each individual topic.
+ // Add a servant for the Ice object. If --id is used the identity
+ // comes from the command line, otherwise a UUID is used.
//
- IceStorm::TopicPrx topic;
- objSeq = query->findAllReplicas(topicReplica);
- set<IceStorm::TopicPrx> topics;
- for(p = objSeq.begin(); p != objSeq.end(); ++p)
+ // id is not directly altered since it is used below to detect
+ // whether subscribeAndGetPublisher can raise AlreadySubscribed.
+ //
+ Ice::Identity subId;
+ subId.name = id;
+ if(subId.name.empty())
{
- topic = IceStorm::TopicPrx::uncheckedCast(*p);
- topic->subscribeAndGetPublisher(IceStorm::QoS(), clock);
- topics.insert(topic);
+ subId.name = IceUtil::generateUUID();
+ }
+ Ice::ObjectPrx subscriber = adapter->add(new ClockI, subId);
+
+ IceStorm::QoS qos;
+ if(!retryCount.empty())
+ {
+ qos["retryCount"] = retryCount;
}
//
- // Create and start thread to check for new IceStorm instances coming online.
+ // Set up the proxy.
//
- InstanceCheckThreadPtr instanceCheck =
- new InstanceCheckThread(query, topicName, clock, managerReplica, managers, topicReplica, topics);
- instanceCheck->start();
+ if(option == Datagram)
+ {
+ if(batch)
+ {
+ subscriber = subscriber->ice_batchDatagram();
+ }
+ else
+ {
+ subscriber = subscriber->ice_datagram();
+ }
+ }
+ else if(option == Twoway)
+ {
+ // Do nothing to the subscriber proxy. Its already twoway.
+ }
+ else if(option == Ordered)
+ {
+ // Do nothing to the subscriber proxy. Its already twoway.
+ qos["reliability"] = "ordered";
+ }
+ else if(option == Oneway || option == None)
+ {
+ if(batch)
+ {
+ subscriber = subscriber->ice_batchOneway();
+ }
+ else
+ {
+ subscriber = subscriber->ice_oneway();
+ }
+ }
+ try
+ {
+ topic->subscribeAndGetPublisher(qos, subscriber);
+ }
+ catch(const IceStorm::AlreadySubscribed&)
+ {
+ // If we're manually setting the subscriber id ignore.
+ if(id.empty())
+ {
+ throw;
+ }
+ cout << "reactivating persistent subscriber" << endl;
+ }
adapter->activate();
+
shutdownOnInterrupt();
communicator()->waitForShutdown();
- instanceCheck->terminate();
- instanceCheck->getThreadControl().join();
-
- //
- // Unsubscribe from all topics.
- //
- objSeq = query->findAllReplicas(topicReplica);
- for(p = objSeq.begin(); p != objSeq.end(); ++p)
- {
- topic = IceStorm::TopicPrx::uncheckedCast(*p);
- topic->unsubscribe(clock);
- }
+ topic->unsubscribe(subscriber);
return EXIT_SUCCESS;
}