summaryrefslogtreecommitdiff
path: root/cpp/src/IceGrid/ReplicaCache.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceGrid/ReplicaCache.cpp')
-rw-r--r--cpp/src/IceGrid/ReplicaCache.cpp281
1 files changed, 281 insertions, 0 deletions
diff --git a/cpp/src/IceGrid/ReplicaCache.cpp b/cpp/src/IceGrid/ReplicaCache.cpp
new file mode 100644
index 00000000000..53ad03eee3a
--- /dev/null
+++ b/cpp/src/IceGrid/ReplicaCache.cpp
@@ -0,0 +1,281 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2011 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+#include <Ice/Communicator.h>
+#include <Ice/LocalException.h>
+#include <Ice/LoggerUtil.h>
+#include <IceGrid/ReplicaCache.h>
+#include <IceGrid/ReplicaSessionI.h>
+#include <IceGrid/Topics.h>
+
+using namespace std;
+using namespace IceGrid;
+
+ReplicaCache::ReplicaCache(const Ice::CommunicatorPtr& communicator, const IceStorm::TopicManagerPrx& topicManager) :
+ _communicator(communicator)
+{
+ IceStorm::TopicPrx t;
+ try
+ {
+ t = topicManager->create("ReplicaObserverTopic");
+ }
+ catch(const IceStorm::TopicExists&)
+ {
+ t = topicManager->retrieve("ReplicaObserverTopic");
+ }
+
+ const_cast<IceStorm::TopicPrx&>(_topic) = IceStorm::TopicPrx::uncheckedCast(t->ice_collocationOptimized(true));
+ const_cast<ReplicaObserverPrx&>(_observers) = ReplicaObserverPrx::uncheckedCast(_topic->getPublisher());
+}
+
+ReplicaEntryPtr
+ReplicaCache::add(const string& name, const ReplicaSessionIPtr& session)
+{
+ Lock sync(*this);
+
+ ReplicaEntryPtr entry;
+ while(entry = getImpl(name))
+ {
+ ReplicaSessionIPtr session = entry->getSession();
+ if(session->isDestroyed())
+ {
+ wait(); // Wait for the session to be removed.
+ }
+ else
+ {
+ //
+ // Check if the replica is still reachable, if not, we
+ // destroy its session.
+ //
+ sync.release();
+ try
+ {
+ session->getInternalRegistry()->ice_ping();
+ throw ReplicaActiveException();
+ }
+ catch(const Ice::LocalException&)
+ {
+ try
+ {
+ session->destroy();
+ }
+ catch(const Ice::LocalException&)
+ {
+ }
+ }
+ sync.acquire();
+ }
+ }
+
+ if(_traceLevels && _traceLevels->replica > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->replicaCat);
+ out << "replica `" << name << "' up";
+ }
+
+ try
+ {
+ _observers->replicaAdded(session->getInternalRegistry());
+ }
+ catch(const Ice::ConnectionRefusedException&)
+ {
+ // Expected if the replica is being shutdown.
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ TraceLevelsPtr traceLevels = getTraceLevels();
+ if(traceLevels)
+ {
+ Ice::Warning out(traceLevels->logger);
+ out << "unexpected exception while publishing `replicaAdded' update:\n" << ex;
+ }
+ }
+
+ return addImpl(name, new ReplicaEntry(name, session));
+}
+
+ReplicaEntryPtr
+ReplicaCache::remove(const string& name, bool shutdown)
+{
+ Lock sync(*this);
+
+ ReplicaEntryPtr entry = getImpl(name);
+ assert(entry);
+ removeImpl(name);
+ notifyAll();
+
+ if(_traceLevels && _traceLevels->replica > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->replicaCat);
+ out << "replica `" << name << "' down";
+ }
+
+ if(!shutdown)
+ {
+ try
+ {
+ _observers->replicaRemoved(entry->getProxy());
+ }
+ catch(const Ice::ConnectionRefusedException&)
+ {
+ // Expected if the replica is being shutdown.
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ TraceLevelsPtr traceLevels = getTraceLevels();
+ if(traceLevels)
+ {
+ Ice::Warning out(traceLevels->logger);
+ out << "unexpected exception while publishing `replicaRemoved' update:\n" << ex;
+ }
+ }
+ }
+
+ return entry;
+}
+
+ReplicaEntryPtr
+ReplicaCache::get(const string& name) const
+{
+ Lock sync(*this);
+ ReplicaEntryPtr entry = getImpl(name);
+ if(!entry)
+ {
+ RegistryNotExistException ex;
+ ex.name = name;
+ throw ex;
+ }
+ return entry;
+}
+
+void
+ReplicaCache::subscribe(const ReplicaObserverPrx& observer)
+{
+ try
+ {
+ Lock sync(*this);
+ InternalRegistryPrxSeq replicas;
+ for(map<string, ReplicaEntryPtr>::const_iterator p = _entries.begin(); p != _entries.end(); ++p)
+ {
+ replicas.push_back(p->second->getProxy());
+ }
+
+ IceStorm::QoS qos;
+ qos["reliability"] = "ordered";
+ Ice::ObjectPrx publisher = _topic->subscribeAndGetPublisher(qos, observer->ice_twoway());
+ ReplicaObserverPrx::uncheckedCast(publisher)->replicaInit(replicas);
+ }
+ catch(const Ice::ConnectionRefusedException&)
+ {
+ // The replica is being shutdown.
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ TraceLevelsPtr traceLevels = getTraceLevels();
+ if(traceLevels)
+ {
+ Ice::Warning out(traceLevels->logger);
+ out << "unexpected exception while subscribing observer from replica observer topic:\n" << ex;
+ }
+ }
+}
+
+void
+ReplicaCache::unsubscribe(const ReplicaObserverPrx& observer)
+{
+ try
+ {
+ _topic->unsubscribe(observer);
+ }
+ catch(const Ice::ConnectionRefusedException&)
+ {
+ // The replica is being shutdown.
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ TraceLevelsPtr traceLevels = getTraceLevels();
+ if(traceLevels)
+ {
+ Ice::Warning out(traceLevels->logger);
+ out << "unexpected exception while unsubscribing observer from replica observer topic:\n" << ex;
+ }
+ }
+}
+
+Ice::ObjectPrx
+ReplicaCache::getEndpoints(const string& name, const Ice::ObjectPrx& proxy) const
+{
+ Ice::EndpointSeq endpoints;
+
+ if(proxy)
+ {
+ Ice::EndpointSeq endpts = proxy->ice_getEndpoints();
+ endpoints.insert(endpoints.end(), endpts.begin(), endpts.end());
+ }
+
+ Lock sync(*this);
+ for(map<string, ReplicaEntryPtr>::const_iterator p = _entries.begin(); p != _entries.end(); ++p)
+ {
+ Ice::ObjectPrx prx = p->second->getSession()->getEndpoint(name);
+ if(prx)
+ {
+ Ice::EndpointSeq endpts = prx->ice_getEndpoints();
+ endpoints.insert(endpoints.end(), endpts.begin(), endpts.end());
+ }
+ }
+
+ return _communicator->stringToProxy("dummy")->ice_endpoints(endpoints);
+}
+
+void
+ReplicaCache::setInternalRegistry(const InternalRegistryPrx& proxy)
+{
+ //
+ // Setup this replica internal registry proxy.
+ //
+ _self = proxy;
+}
+
+InternalRegistryPrx
+ReplicaCache::getInternalRegistry() const
+{
+ //
+ // This replica internal registry proxy.
+ //
+ return _self;
+}
+
+ReplicaEntry::ReplicaEntry(const std::string& name, const ReplicaSessionIPtr& session) :
+ _name(name),
+ _session(session)
+{
+}
+
+ReplicaEntry::~ReplicaEntry()
+{
+}
+
+const ReplicaSessionIPtr&
+ReplicaEntry::getSession() const
+{
+ return _session;
+}
+
+InternalReplicaInfoPtr
+ReplicaEntry::getInfo() const
+{
+ return _session->getInfo();
+}
+
+InternalRegistryPrx
+ReplicaEntry::getProxy() const
+{
+ return _session->getInternalRegistry();
+}
+