summaryrefslogtreecommitdiff
path: root/cpp/src/IceGrid/ReplicaCache.cpp
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2006-07-21 14:30:52 +0000
committerBenoit Foucher <benoit@zeroc.com>2006-07-21 14:30:52 +0000
commit2379dcbd554ab5c1aac616b5585c99e3d742305e (patch)
treeb674e53966fba682aa1008855a26721bd910e023 /cpp/src/IceGrid/ReplicaCache.cpp
parentOptimized context to avoid copies (diff)
downloadice-2379dcbd554ab5c1aac616b5585c99e3d742305e.tar.bz2
ice-2379dcbd554ab5c1aac616b5585c99e3d742305e.tar.xz
ice-2379dcbd554ab5c1aac616b5585c99e3d742305e.zip
Initial implementation of the IceGrid registry replication.
Diffstat (limited to 'cpp/src/IceGrid/ReplicaCache.cpp')
-rw-r--r--cpp/src/IceGrid/ReplicaCache.cpp213
1 files changed, 213 insertions, 0 deletions
diff --git a/cpp/src/IceGrid/ReplicaCache.cpp b/cpp/src/IceGrid/ReplicaCache.cpp
new file mode 100644
index 00000000000..5a9703eed24
--- /dev/null
+++ b/cpp/src/IceGrid/ReplicaCache.cpp
@@ -0,0 +1,213 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2006 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+#include <Ice/LoggerUtil.h>
+#include <IceGrid/ReplicaCache.h>
+#include <IceGrid/ReplicaSessionI.h>
+#include <IceGrid/Database.h>
+#include <IceGrid/Topics.h>
+
+using namespace std;
+using namespace IceGrid;
+
+ReplicaCache::ReplicaCache(const Ice::CommunicatorPtr& communicator,
+ const IceStorm::TopicManagerPrx& topicManager,
+ const std::string& instanceName,
+ const Ice::ObjectPrx& clientProxy,
+ const Ice::ObjectPrx& serverProxy) :
+ _communicator(communicator),
+ _instanceName(instanceName),
+ _clientProxy(clientProxy),
+ _serverProxy(serverProxy)
+{
+ IceStorm::TopicPrx t;
+ try
+ {
+ t = topicManager->create("NodeNotifier");
+ }
+ catch(const IceStorm::TopicExists&)
+ {
+ t = topicManager->retrieve("NodeNotifier");
+ }
+
+ const_cast<IceStorm::TopicPrx&>(_topic) = t;
+ const_cast<NodePrx&>(_nodes) = NodePrx::uncheckedCast(_topic->getPublisher());
+}
+
+void
+ReplicaCache::destroy()
+{
+ //
+ // TODO: XXX: Is this also really needed for replicas?!
+ //
+ _entries.clear();
+}
+
+void
+ReplicaCache::add(const string& name, const ReplicaSessionIPtr& session, const DatabasePtr& database)
+{
+ Lock sync(*this);
+
+ if(getImpl(name))
+ {
+ throw ReplicaActiveException();
+ }
+
+ if(_traceLevels && _traceLevels->replica > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->replicaCat);
+ out << "replica `" << name << "' up";
+ }
+
+ addImpl(name, new ReplicaEntry(name, session));
+
+ ObjectInfo info;
+ info.type = InternalRegistry::ice_staticId();
+ info.proxy = session->getProxy();
+ database->addObject(info, true);
+
+ Ice::ObjectPrx clientProxy = getClientProxy();
+ Ice::Identity id;
+ id.category = _instanceName;
+ id.name = "Query";
+ database->updateObject(clientProxy->ice_identity(id));
+
+ RegistryObserverTopicPtr topic = database->getRegistryObserverTopic();
+ if(topic)
+ {
+ topic->subscribe(session->getReplicaInfo().observer);
+ }
+
+ try
+ {
+ _nodes->replicaAdded(session->getProxy());
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ // TODO: XXX
+ }
+}
+
+void
+ReplicaCache::remove(const string& name, const DatabasePtr& database)
+{
+ Lock sync(*this);
+
+ ReplicaEntryPtr entry = removeImpl(name);
+ assert(entry);
+
+ try
+ {
+ _nodes->replicaRemoved(entry->getSession()->getProxy());
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ // TODO: XXX
+ }
+
+ RegistryObserverTopicPtr topic = database->getRegistryObserverTopic();
+ if(topic)
+ {
+ topic->unsubscribe(entry->getSession()->getReplicaInfo().observer);
+ }
+
+ database->removeObject(entry->getSession()->getProxy()->ice_getIdentity());
+
+ Ice::ObjectPrx clientProxy = getClientProxy();
+ Ice::Identity id;
+ id.category = _instanceName;
+ id.name = "Query";
+ database->updateObject(clientProxy->ice_identity(id));
+
+ if(_traceLevels && _traceLevels->replica > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->replicaCat);
+ out << "replica `" << name << "' down";
+ }
+}
+
+void
+ReplicaCache::nodeAdded(const NodePrx& node)
+{
+ IceStorm::QoS qos;
+ qos["reliability"] = "twoway ordered";
+ try
+ {
+ _topic->subscribe(qos, node);
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ // TODO: XXX
+ }
+}
+
+void
+ReplicaCache::nodeRemoved(const NodePrx& node)
+{
+ try
+ {
+ _topic->unsubscribe(node);
+ }
+ catch(const Ice::ConnectionRefusedException&)
+ {
+ // The replica is being shutdown.
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ // TODO: XXX
+ }
+}
+
+Ice::ObjectPrx
+ReplicaCache::getClientProxy() const
+{
+ Ice::EndpointSeq endpoints;
+
+ Ice::EndpointSeq endpts = _clientProxy->ice_getEndpoints();
+ endpoints.insert(endpoints.end(), endpts.begin(), endpts.end());
+
+ for(map<string, ReplicaEntryPtr>::const_iterator p = _entries.begin(); p != _entries.end(); ++p)
+ {
+ endpts = p->second->getSession()->getReplicaInfo().clientProxy->ice_getEndpoints();
+ endpoints.insert(endpoints.end(), endpts.begin(), endpts.end());
+ }
+
+ return _clientProxy->ice_endpoints(endpoints);
+}
+
+Ice::ObjectPrx
+ReplicaCache::getServerProxy() const
+{
+ Ice::EndpointSeq endpoints;
+
+ Ice::EndpointSeq endpts = _serverProxy->ice_getEndpoints();
+ endpoints.insert(endpoints.end(), endpts.begin(), endpts.end());
+
+ for(map<string, ReplicaEntryPtr>::const_iterator p = _entries.begin(); p != _entries.end(); ++p)
+ {
+ endpts = p->second->getSession()->getReplicaInfo().serverProxy->ice_getEndpoints();
+ endpoints.insert(endpoints.end(), endpts.begin(), endpts.end());
+ }
+
+ return _serverProxy->ice_endpoints(endpoints);
+}
+
+
+ReplicaEntry::ReplicaEntry(const std::string& name, const ReplicaSessionIPtr& session) :
+ _name(name),
+ _session(session)
+{
+}
+
+const ReplicaSessionIPtr&
+ReplicaEntry::getSession() const
+{
+ return _session;
+}
+