summaryrefslogtreecommitdiff
path: root/cpp/demo/IceStorm/replicated/Subscriber.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/demo/IceStorm/replicated/Subscriber.cpp')
-rw-r--r--cpp/demo/IceStorm/replicated/Subscriber.cpp163
1 files changed, 149 insertions, 14 deletions
diff --git a/cpp/demo/IceStorm/replicated/Subscriber.cpp b/cpp/demo/IceStorm/replicated/Subscriber.cpp
index e7334f957ab..0629078c7b0 100644
--- a/cpp/demo/IceStorm/replicated/Subscriber.cpp
+++ b/cpp/demo/IceStorm/replicated/Subscriber.cpp
@@ -14,11 +14,124 @@
#include <Clock.h>
-#include <map>
+#include <set>
using namespace std;
using namespace Demo;
+class InstanceCheckThread : public IceUtil::Thread, public IceUtil::Monitor<IceUtil::Mutex>
+{
+public:
+
+ InstanceCheckThread(const IceGrid::QueryPrx& query, const string& topicName, const Ice::ObjectPrx& clock,
+ const Ice::ObjectPrx& managerReplica, const set<IceStorm::TopicManagerPrx>& managers,
+ const Ice::ObjectPrx& topicReplica, const set<IceStorm::TopicPrx>& topics) :
+ _query(query),
+ _topicName(topicName),
+ _clock(clock),
+ _managerReplica(managerReplica),
+ _managers(managers),
+ _topicReplica(topicReplica),
+ _topics(topics),
+ _timeout(IceUtil::Time::seconds(10)),
+ _terminated(false)
+ {
+ }
+
+ virtual void
+ run()
+ {
+ Lock sync(*this);
+ while(!_terminated)
+ {
+ timedWait(_timeout);
+ if(!_terminated)
+ {
+ try
+ {
+ //
+ // Check if there are any new topic managers in the replicas list.
+ //
+ Ice::ObjectProxySeq managers = _query->findAllReplicas(_managerReplica);
+ Ice::ObjectProxySeq::const_iterator p;
+ for(p = managers.begin(); p != managers.end(); ++p)
+ {
+ IceStorm::TopicManagerPrx manager = IceStorm::TopicManagerPrx::uncheckedCast(*p);
+ if(_managers.find(manager) == _managers.end())
+ {
+ //
+ // Create the topic on the new topic manager if it does
+ // not already exist.
+ //
+ try
+ {
+ manager->retrieve(_topicName);
+ }
+ catch(const IceStorm::NoSuchTopic&)
+ {
+ manager->create(_topicName);
+ }
+
+ //
+ // Since the topic proxy returned by the retrieval/creation
+ // is a replicated proxy we cannot use it to subscribe to the
+ // new instance. Instead we ahve to retrive all the topics and
+ // find the new one.
+ //
+ Ice::ObjectProxySeq topics = _query->findAllReplicas(_topicReplica);
+ Ice::ObjectProxySeq::const_iterator q;
+ for(q = topics.begin(); q != topics.end(); ++q)
+ {
+ IceStorm::TopicPrx topic = IceStorm::TopicPrx::uncheckedCast(*q);;
+ if(_topics.find(topic) == _topics.end())
+ {
+ try
+ {
+ topic->subscribeAndGetPublisher(IceStorm::QoS(), _clock);
+ }
+ catch(const IceStorm::AlreadySubscribed& ex)
+ {
+ // Ignore
+ }
+ _topics.insert(topic);
+ _managers.insert(manager);
+ break;
+ }
+ }
+ }
+ }
+ }
+ catch(const Ice::Exception& ex)
+ {
+ cerr << "warning: exception while checking for new IceStorm instances: " << ex << endl;
+ }
+ }
+ }
+ }
+
+ void
+ terminate()
+ {
+ Lock sync(*this);
+ _terminated = true;
+ notify();
+ }
+
+private:
+
+ const IceGrid::QueryPrx _query;
+ const string _topicName;
+ const Ice::ObjectPrx _clock;
+ const Ice::ObjectPrx _managerReplica;
+ set<IceStorm::TopicManagerPrx> _managers;
+ const Ice::ObjectPrx _topicReplica;
+ set<IceStorm::TopicPrx> _topics;
+ const IceUtil::Time _timeout;
+ bool _terminated;
+};
+
+typedef IceUtil::Handle<InstanceCheckThread> InstanceCheckThreadPtr;
+
class ClockI : public Clock
{
public:
@@ -50,8 +163,9 @@ Subscriber::run(int argc, char* argv[])
Ice::PropertiesPtr properties = communicator()->getProperties();
IceGrid::QueryPrx query = IceGrid::QueryPrx::uncheckedCast(communicator()->stringToProxy("DemoIceGrid/Query"));
- Ice::ObjectProxySeq managers = query->findAllReplicas(communicator()->stringToProxy("DemoIceStorm/TopicManager"));
- if(managers.size() == 0)
+ Ice::ObjectPrx managerReplica = communicator()->stringToProxy("DemoIceStorm/TopicManager");
+ Ice::ObjectProxySeq objSeq = query->findAllReplicas(managerReplica);
+ if(objSeq.size() == 0)
{
cerr << appName() << ": no topic managers found, make sure application was deployed." << endl;
return EXIT_FAILURE;
@@ -73,24 +187,26 @@ Subscriber::run(int argc, char* argv[])
//
Ice::ObjectPrx clock = adapter->addWithUUID(new ClockI)->ice_batchOneway();
- IceStorm::TopicPrx topic;
+ //
+ // Get all the topic managers and create the topic if necessary.
+ //
+ IceStorm::TopicPrx topicReplica;
+ set<IceStorm::TopicManagerPrx> managers;
Ice::ObjectProxySeq::const_iterator p;
- for(p = managers.begin(); p != managers.end(); ++p)
+ for(p = objSeq.begin(); p != objSeq.end(); ++p)
{
- //
- // Add a Servant for the Ice Object.
- //
IceStorm::TopicManagerPrx manager = IceStorm::TopicManagerPrx::checkedCast(*p);
+ managers.insert(manager);
try
{
- topic = manager->retrieve(topicName);
+ topicReplica = manager->retrieve(topicName);
}
catch(const IceStorm::NoSuchTopic&)
{
try
{
- topic = manager->create(topicName);
+ topicReplica = manager->create(topicName);
}
catch(const IceStorm::TopicExists&)
{
@@ -100,21 +216,40 @@ Subscriber::run(int argc, char* argv[])
}
}
- Ice::ObjectProxySeq topics = query->findAllReplicas(topic);
- for(p = topics.begin(); p != topics.end(); ++p)
+ //
+ // Get all the topics and subscribe. We can't use the proxies returned by
+ // the topic creation above because they are repicated proxies, and not
+ // specific proxies for each individual topic.
+ //
+ IceStorm::TopicPrx topic;
+ objSeq = query->findAllReplicas(topicReplica);
+ set<IceStorm::TopicPrx> topics;
+ for(p = objSeq.begin(); p != objSeq.end(); ++p)
{
topic = IceStorm::TopicPrx::uncheckedCast(*p);
topic->subscribeAndGetPublisher(IceStorm::QoS(), clock);
+ topics.insert(topic);
}
+ //
+ // Create and start thread to check for new IceStorm instances coming online.
+ //
+ InstanceCheckThreadPtr instanceCheck =
+ new InstanceCheckThread(query, topicName, clock, managerReplica, managers, topicReplica, topics);
+ instanceCheck->start();
+
adapter->activate();
shutdownOnInterrupt();
communicator()->waitForShutdown();
+ instanceCheck->terminate();
+ instanceCheck->getThreadControl().join();
+
//
- // Unsubscribe all subscribed objects.
+ // Unsubscribe from all topics.
//
- for(p = topics.begin(); p != topics.end(); ++p)
+ objSeq = query->findAllReplicas(topicReplica);
+ for(p = objSeq.begin(); p != objSeq.end(); ++p)
{
topic = IceStorm::TopicPrx::uncheckedCast(*p);
topic->unsubscribe(clock);