diff options
author | Dwayne Boone <dwayne@zeroc.com> | 2007-06-25 10:01:00 -0230 |
---|---|---|
committer | Dwayne Boone <dwayne@zeroc.com> | 2007-06-25 10:01:00 -0230 |
commit | 8e501f629d2b85998d767b815969d9cf73256d82 (patch) | |
tree | d17c90fd83d0dd8a845e0bdf43fba1254fa5401c /cpp/demo/IceStorm/replicated/Subscriber.cpp | |
parent | Fixed some errors in PropertyNames.xml (diff) | |
download | ice-8e501f629d2b85998d767b815969d9cf73256d82.tar.bz2 ice-8e501f629d2b85998d767b815969d9cf73256d82.tar.xz ice-8e501f629d2b85998d767b815969d9cf73256d82.zip |
IceStorm/replicated demo now recognizes when new IceStorm instance scome online
Diffstat (limited to 'cpp/demo/IceStorm/replicated/Subscriber.cpp')
-rw-r--r-- | cpp/demo/IceStorm/replicated/Subscriber.cpp | 163 |
1 files changed, 149 insertions, 14 deletions
diff --git a/cpp/demo/IceStorm/replicated/Subscriber.cpp b/cpp/demo/IceStorm/replicated/Subscriber.cpp index e7334f957ab..0629078c7b0 100644 --- a/cpp/demo/IceStorm/replicated/Subscriber.cpp +++ b/cpp/demo/IceStorm/replicated/Subscriber.cpp @@ -14,11 +14,124 @@ #include <Clock.h> -#include <map> +#include <set> using namespace std; using namespace Demo; +class InstanceCheckThread : public IceUtil::Thread, public IceUtil::Monitor<IceUtil::Mutex> +{ +public: + + InstanceCheckThread(const IceGrid::QueryPrx& query, const string& topicName, const Ice::ObjectPrx& clock, + const Ice::ObjectPrx& managerReplica, const set<IceStorm::TopicManagerPrx>& managers, + const Ice::ObjectPrx& topicReplica, const set<IceStorm::TopicPrx>& topics) : + _query(query), + _topicName(topicName), + _clock(clock), + _managerReplica(managerReplica), + _managers(managers), + _topicReplica(topicReplica), + _topics(topics), + _timeout(IceUtil::Time::seconds(10)), + _terminated(false) + { + } + + virtual void + run() + { + Lock sync(*this); + while(!_terminated) + { + timedWait(_timeout); + if(!_terminated) + { + try + { + // + // Check if there are any new topic managers in the replicas list. + // + Ice::ObjectProxySeq managers = _query->findAllReplicas(_managerReplica); + Ice::ObjectProxySeq::const_iterator p; + for(p = managers.begin(); p != managers.end(); ++p) + { + IceStorm::TopicManagerPrx manager = IceStorm::TopicManagerPrx::uncheckedCast(*p); + if(_managers.find(manager) == _managers.end()) + { + // + // Create the topic on the new topic manager if it does + // not already exist. + // + try + { + manager->retrieve(_topicName); + } + catch(const IceStorm::NoSuchTopic&) + { + manager->create(_topicName); + } + + // + // Since the topic proxy returned by the retrieval/creation + // is a replicated proxy we cannot use it to subscribe to the + // new instance. Instead we ahve to retrive all the topics and + // find the new one. + // + Ice::ObjectProxySeq topics = _query->findAllReplicas(_topicReplica); + Ice::ObjectProxySeq::const_iterator q; + for(q = topics.begin(); q != topics.end(); ++q) + { + IceStorm::TopicPrx topic = IceStorm::TopicPrx::uncheckedCast(*q);; + if(_topics.find(topic) == _topics.end()) + { + try + { + topic->subscribeAndGetPublisher(IceStorm::QoS(), _clock); + } + catch(const IceStorm::AlreadySubscribed& ex) + { + // Ignore + } + _topics.insert(topic); + _managers.insert(manager); + break; + } + } + } + } + } + catch(const Ice::Exception& ex) + { + cerr << "warning: exception while checking for new IceStorm instances: " << ex << endl; + } + } + } + } + + void + terminate() + { + Lock sync(*this); + _terminated = true; + notify(); + } + +private: + + const IceGrid::QueryPrx _query; + const string _topicName; + const Ice::ObjectPrx _clock; + const Ice::ObjectPrx _managerReplica; + set<IceStorm::TopicManagerPrx> _managers; + const Ice::ObjectPrx _topicReplica; + set<IceStorm::TopicPrx> _topics; + const IceUtil::Time _timeout; + bool _terminated; +}; + +typedef IceUtil::Handle<InstanceCheckThread> InstanceCheckThreadPtr; + class ClockI : public Clock { public: @@ -50,8 +163,9 @@ Subscriber::run(int argc, char* argv[]) Ice::PropertiesPtr properties = communicator()->getProperties(); IceGrid::QueryPrx query = IceGrid::QueryPrx::uncheckedCast(communicator()->stringToProxy("DemoIceGrid/Query")); - Ice::ObjectProxySeq managers = query->findAllReplicas(communicator()->stringToProxy("DemoIceStorm/TopicManager")); - if(managers.size() == 0) + Ice::ObjectPrx managerReplica = communicator()->stringToProxy("DemoIceStorm/TopicManager"); + Ice::ObjectProxySeq objSeq = query->findAllReplicas(managerReplica); + if(objSeq.size() == 0) { cerr << appName() << ": no topic managers found, make sure application was deployed." << endl; return EXIT_FAILURE; @@ -73,24 +187,26 @@ Subscriber::run(int argc, char* argv[]) // Ice::ObjectPrx clock = adapter->addWithUUID(new ClockI)->ice_batchOneway(); - IceStorm::TopicPrx topic; + // + // Get all the topic managers and create the topic if necessary. + // + IceStorm::TopicPrx topicReplica; + set<IceStorm::TopicManagerPrx> managers; Ice::ObjectProxySeq::const_iterator p; - for(p = managers.begin(); p != managers.end(); ++p) + for(p = objSeq.begin(); p != objSeq.end(); ++p) { - // - // Add a Servant for the Ice Object. - // IceStorm::TopicManagerPrx manager = IceStorm::TopicManagerPrx::checkedCast(*p); + managers.insert(manager); try { - topic = manager->retrieve(topicName); + topicReplica = manager->retrieve(topicName); } catch(const IceStorm::NoSuchTopic&) { try { - topic = manager->create(topicName); + topicReplica = manager->create(topicName); } catch(const IceStorm::TopicExists&) { @@ -100,21 +216,40 @@ Subscriber::run(int argc, char* argv[]) } } - Ice::ObjectProxySeq topics = query->findAllReplicas(topic); - for(p = topics.begin(); p != topics.end(); ++p) + // + // Get all the topics and subscribe. We can't use the proxies returned by + // the topic creation above because they are repicated proxies, and not + // specific proxies for each individual topic. + // + IceStorm::TopicPrx topic; + objSeq = query->findAllReplicas(topicReplica); + set<IceStorm::TopicPrx> topics; + for(p = objSeq.begin(); p != objSeq.end(); ++p) { topic = IceStorm::TopicPrx::uncheckedCast(*p); topic->subscribeAndGetPublisher(IceStorm::QoS(), clock); + topics.insert(topic); } + // + // Create and start thread to check for new IceStorm instances coming online. + // + InstanceCheckThreadPtr instanceCheck = + new InstanceCheckThread(query, topicName, clock, managerReplica, managers, topicReplica, topics); + instanceCheck->start(); + adapter->activate(); shutdownOnInterrupt(); communicator()->waitForShutdown(); + instanceCheck->terminate(); + instanceCheck->getThreadControl().join(); + // - // Unsubscribe all subscribed objects. + // Unsubscribe from all topics. // - for(p = topics.begin(); p != topics.end(); ++p) + objSeq = query->findAllReplicas(topicReplica); + for(p = objSeq.begin(); p != objSeq.end(); ++p) { topic = IceStorm::TopicPrx::uncheckedCast(*p); topic->unsubscribe(clock); |