summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2013-08-28 17:56:33 +0200
committerBenoit Foucher <benoit@zeroc.com>2013-08-28 17:56:33 +0200
commit08cb802bb321d6a92323792d4f0dbefe13d87032 (patch)
tree07a487d26275ce136e8dee0d12422d2afe2498b5 /cpp
parentmakedist.py fixes (diff)
downloadice-08cb802bb321d6a92323792d4f0dbefe13d87032.tar.bz2
ice-08cb802bb321d6a92323792d4f0dbefe13d87032.tar.xz
ice-08cb802bb321d6a92323792d4f0dbefe13d87032.zip
Fix for ICE-5357: improved discovery of replicas on node/slave startup
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/IceGrid/Makefile3
-rw-r--r--cpp/src/IceGrid/NodeCache.h2
-rw-r--r--cpp/src/IceGrid/NodeSessionManager.cpp84
-rw-r--r--cpp/src/IceGrid/NodeSessionManager.h3
-rw-r--r--cpp/src/IceGrid/RegistryI.cpp75
-rw-r--r--cpp/src/IceGrid/RegistryI.h4
-rw-r--r--cpp/src/IceGrid/ReplicaSessionI.cpp7
-rw-r--r--cpp/src/IceGrid/ReplicaSessionManager.cpp23
-rw-r--r--cpp/src/IceGrid/ReplicaSessionManager.h2
-rw-r--r--cpp/src/IceGrid/SessionManager.h17
10 files changed, 163 insertions, 57 deletions
diff --git a/cpp/src/IceGrid/Makefile b/cpp/src/IceGrid/Makefile
index 3e793abd6fe..61cca096eb8 100644
--- a/cpp/src/IceGrid/Makefile
+++ b/cpp/src/IceGrid/Makefile
@@ -31,7 +31,8 @@ COMMON_OBJS = Internal.o \
DescriptorBuilder.o \
TraceLevels.o \
FileCache.o \
- PlatformInfo.o
+ PlatformInfo.o \
+ SessionManager.o
NODE_OBJS = NodeI.o \
NodeServerAdminRouter.o \
diff --git a/cpp/src/IceGrid/NodeCache.h b/cpp/src/IceGrid/NodeCache.h
index 97cb03143b0..997081bd344 100644
--- a/cpp/src/IceGrid/NodeCache.h
+++ b/cpp/src/IceGrid/NodeCache.h
@@ -32,7 +32,7 @@ typedef std::vector<ServerEntryPtr> ServerEntrySeq;
class ReplicaCache;
-class NodeEntry : public IceUtil::Monitor<IceUtil::RecMutex>
+class NodeEntry : private IceUtil::Monitor<IceUtil::RecMutex>
{
public:
diff --git a/cpp/src/IceGrid/NodeSessionManager.cpp b/cpp/src/IceGrid/NodeSessionManager.cpp
index de66d372d55..86f9988bba5 100644
--- a/cpp/src/IceGrid/NodeSessionManager.cpp
+++ b/cpp/src/IceGrid/NodeSessionManager.cpp
@@ -244,26 +244,14 @@ NodeSessionManager::create(const NodeIPtr& node)
Ice::CommunicatorPtr communicator = _node->getCommunicator();
assert(communicator->getDefaultLocator());
- Ice::ObjectPrx prx = communicator->getDefaultLocator();
//
- // Initialize the IceGrid::Query objects. The IceGrid::Query
- // interface is used to lookup the registry proxy in case it
- // becomes unavailable. Since replicas might not always have
- // an up to date registry proxy, we need to query all the
- // replicas.
+ // Initialize query objects from the default locator endpoints.
//
- Ice::EndpointSeq endpoints = prx->ice_getEndpoints();
- Ice::Identity id = prx->ice_getIdentity();
- id.name = "Query";
- QueryPrx query = QueryPrx::uncheckedCast(prx->ice_identity(id));
- for(Ice::EndpointSeq::const_iterator p = endpoints.begin(); p != endpoints.end(); ++p)
- {
- Ice::EndpointSeq singleEndpoint;
- singleEndpoint.push_back(*p);
- _queryObjects.push_back(QueryPrx::uncheckedCast(query->ice_endpoints(singleEndpoint)));
- }
+ initQueryObjects(communicator->getDefaultLocator());
+ Ice::ObjectPrx prx = communicator->getDefaultLocator();
+ Ice::Identity id = prx->ice_getIdentity();
id.name = "InternalRegistry-Master";
_master = InternalRegistryPrx::uncheckedCast(prx->ice_identity(id)->ice_endpoints(Ice::EndpointSeq()));
@@ -559,14 +547,32 @@ NodeSessionManager::createdSession(const NodeSessionPrx& session)
}
else
{
- vector<Ice::AsyncResultPtr> results;
- for(vector<QueryPrx>::const_iterator q = _queryObjects.begin(); q != _queryObjects.end(); ++q)
+ vector<Ice::AsyncResultPtr> results1;
+ vector<Ice::AsyncResultPtr> results2;
+ vector<QueryPrx> queryObjects = findAllQueryObjects();
+
+ //
+ // Below we try to retrieve internal registry proxies either
+ // directly by querying for the internal registry type or
+ // indirectly by querying registry proxies.
+ //
+ // IceGrid registries <= 3.5.0 kept internal registry proxies
+ // while earlier version now keep registry proxies. Registry
+ // proxies have fixed endpoints (no random port) so they are
+ // more reliable.
+ //
+
+ for(vector<QueryPrx>::const_iterator q = queryObjects.begin(); q != queryObjects.end(); ++q)
{
- results.push_back((*q)->begin_findAllObjectsByType(InternalRegistry::ice_staticId()));
+ results1.push_back((*q)->begin_findAllObjectsByType(InternalRegistry::ice_staticId()));
+ }
+ for(vector<QueryPrx>::const_iterator q = queryObjects.begin(); q != queryObjects.end(); ++q)
+ {
+ results2.push_back((*q)->begin_findAllObjectsByType(Registry::ice_staticId()));
}
map<Ice::Identity, Ice::ObjectPrx> proxies;
- for(vector<Ice::AsyncResultPtr>::const_iterator p = results.begin(); p != results.end(); ++p)
+ for(vector<Ice::AsyncResultPtr>::const_iterator p = results1.begin(); p != results1.end(); ++p)
{
QueryPrx query = QueryPrx::uncheckedCast((*p)->getProxy());
if(isDestroyed())
@@ -579,14 +585,42 @@ NodeSessionManager::createdSession(const NodeSessionPrx& session)
Ice::ObjectProxySeq prxs = query->end_findAllObjectsByType(*p);
for(Ice::ObjectProxySeq::const_iterator q = prxs.begin(); q != prxs.end(); ++q)
{
- //
- // NOTE: We might override a good proxy here! We could improve this to make
- // sure that we don't override the proxy for replica N if that proxy was
- // obtained from replica N.
- //
proxies[(*q)->ice_getIdentity()] = *q;
}
}
+ catch(const Ice::LocalException& ex)
+ {
+ // IGNORE
+ }
+ }
+ for(vector<Ice::AsyncResultPtr>::const_iterator p = results2.begin(); p != results2.end(); ++p)
+ {
+ QueryPrx query = QueryPrx::uncheckedCast((*p)->getProxy());
+ if(isDestroyed())
+ {
+ return;
+ }
+
+ try
+ {
+ Ice::ObjectProxySeq prxs = query->end_findAllObjectsByType(*p);
+ for(Ice::ObjectProxySeq::const_iterator q = prxs.begin(); q != prxs.end(); ++q)
+ {
+ Ice::Identity id = (*q)->ice_getIdentity();
+ const string prefix("Registry-");
+ string::size_type pos = id.name.find(prefix);
+ if(pos == string::npos)
+ {
+ continue; // Ignore the master registry proxy.
+ }
+ id.name = "InternalRegistry-" + id.name.substr(prefix.size());
+
+ Ice::ObjectPrx prx = (*q)->ice_identity(id)->ice_endpoints(Ice::EndpointSeq());
+ id.name = "Locator";
+ prx = prx->ice_locator(Ice::LocatorPrx::uncheckedCast((*q)->ice_identity(id)));
+ proxies[id] = prx;
+ }
+ }
catch(const Ice::LocalException&)
{
// IGNORE
diff --git a/cpp/src/IceGrid/NodeSessionManager.h b/cpp/src/IceGrid/NodeSessionManager.h
index a1bb2c23a16..704b753d0a2 100644
--- a/cpp/src/IceGrid/NodeSessionManager.h
+++ b/cpp/src/IceGrid/NodeSessionManager.h
@@ -47,7 +47,7 @@ protected:
};
typedef IceUtil::Handle<NodeSessionKeepAliveThread> NodeSessionKeepAliveThreadPtr;
-class NodeSessionManager : public IceUtil::Monitor<IceUtil::Mutex>
+class NodeSessionManager : public SessionManager
{
public:
@@ -125,7 +125,6 @@ private:
const NodeIPtr _node;
ThreadPtr _thread;
- std::vector<QueryPrx> _queryObjects;
InternalRegistryPrx _master;
bool _destroyed;
bool _activated;
diff --git a/cpp/src/IceGrid/RegistryI.cpp b/cpp/src/IceGrid/RegistryI.cpp
index 6af0c5b0366..c0d16fb5d42 100644
--- a/cpp/src/IceGrid/RegistryI.cpp
+++ b/cpp/src/IceGrid/RegistryI.cpp
@@ -385,6 +385,10 @@ RegistryI::startImpl()
//
ObjectProxySeq proxies;
+ //
+ // Get proxies for nodes that we were connected with on last
+ // shutdown.
+ //
NodePrxSeq nodes;
proxies = _database->getInternalObjectsByType(Node::ice_staticId());
for(ObjectProxySeq::const_iterator p = proxies.begin(); p != proxies.end(); ++p)
@@ -392,11 +396,51 @@ RegistryI::startImpl()
nodes.push_back(NodePrx::uncheckedCast(*p));
}
- InternalRegistryPrxSeq replicas;
+ //
+ // Get proxies for slaves that we we connected with on last
+ // shutdown.
+ //
+ // We first get the internal registry proxies and then also check
+ // the public registry proxies. If we find public registry
+ // proxies, we use indirect proxies setup with a locator using the
+ // public proxy in preference over the internal proxy which might
+ // contain stale endpoints if the slave was restarted. IceGrid
+ // version <= 3.5.0 also kept the internal proxy in the database
+ // instead of the public proxy.
+ //
+ map<InternalRegistryPrx, RegistryPrx> replicas;
proxies = _database->getObjectsByType(InternalRegistry::ice_staticId());
for(ObjectProxySeq::const_iterator p = proxies.begin(); p != proxies.end(); ++p)
{
- replicas.push_back(InternalRegistryPrx::uncheckedCast(*p));
+ replicas.insert(pair<InternalRegistryPrx, RegistryPrx>(InternalRegistryPrx::uncheckedCast(*p), RegistryPrx()));
+ }
+
+ proxies = _database->getObjectsByType(Registry::ice_staticId());
+ for(ObjectProxySeq::const_iterator p = proxies.begin(); p != proxies.end(); ++p)
+ {
+ Ice::Identity id = (*p)->ice_getIdentity();
+ const string prefix("Registry-");
+ string::size_type pos = id.name.find(prefix);
+ if(pos == string::npos)
+ {
+ continue; // Ignore the master registry proxy.
+ }
+ id.name = "InternalRegistry-" + id.name.substr(prefix.size());
+
+ Ice::ObjectPrx prx = (*p)->ice_identity(id)->ice_endpoints(Ice::EndpointSeq());
+ id.name = "Locator";
+ prx = prx->ice_locator(Ice::LocatorPrx::uncheckedCast((*p)->ice_identity(id)));
+
+ for(map<InternalRegistryPrx, RegistryPrx>::iterator q = replicas.begin(); q != replicas.end(); ++q)
+ {
+ if(q->first->ice_getIdentity() == prx->ice_getIdentity())
+ {
+ replicas.erase(q);
+ break;
+ }
+ }
+ replicas.insert(pair<InternalRegistryPrx, RegistryPrx>(InternalRegistryPrx::uncheckedCast(prx),
+ RegistryPrx::uncheckedCast(*p)));
}
//
@@ -1298,17 +1342,17 @@ RegistryI::getSSLInfo(const ConnectionPtr& connection, string& userDN)
NodePrxSeq
RegistryI::registerReplicas(const InternalRegistryPrx& internalRegistry,
- const InternalRegistryPrxSeq& replicas,
+ const map<InternalRegistryPrx, RegistryPrx>& replicas,
const NodePrxSeq& dbNodes)
{
set<NodePrx> nodes;
nodes.insert(dbNodes.begin(), dbNodes.end());
vector<Ice::AsyncResultPtr> results;
- for(InternalRegistryPrxSeq::const_iterator r = replicas.begin(); r != replicas.end(); ++r)
+ for(map<InternalRegistryPrx, RegistryPrx>::const_iterator r = replicas.begin(); r != replicas.end(); ++r)
{
- if((*r)->ice_getIdentity() != internalRegistry->ice_getIdentity())
+ if(r->first->ice_getIdentity() != internalRegistry->ice_getIdentity())
{
- results.push_back((*r)->begin_registerWithReplica(internalRegistry));
+ results.push_back(r->first->begin_registerWithReplica(internalRegistry));
}
}
@@ -1351,6 +1395,25 @@ RegistryI::registerReplicas(const InternalRegistryPrx& internalRegistry,
// Clear the proxy from the database if we can't
// contact the replica.
//
+ RegistryPrx registry;
+ for(map<InternalRegistryPrx, RegistryPrx>::const_iterator q = replicas.begin(); q != replicas.end(); ++q)
+ {
+ if(q->first->ice_getIdentity() == replica->ice_getIdentity())
+ {
+ registry = q->second;
+ break;
+ }
+ }
+ if(registry)
+ {
+ try
+ {
+ _database->removeObject(registry->ice_getIdentity());
+ }
+ catch(const ObjectNotRegisteredException&)
+ {
+ }
+ }
try
{
_database->removeObject(replica->ice_getIdentity());
diff --git a/cpp/src/IceGrid/RegistryI.h b/cpp/src/IceGrid/RegistryI.h
index 8595536e48c..74b1fc835cb 100644
--- a/cpp/src/IceGrid/RegistryI.h
+++ b/cpp/src/IceGrid/RegistryI.h
@@ -95,7 +95,9 @@ private:
Glacier2::SSLPermissionsVerifierPrx getSSLPermissionsVerifier(const LocatorPrx&, const std::string&);
Glacier2::SSLInfo getSSLInfo(const Ice::ConnectionPtr&, std::string&);
- NodePrxSeq registerReplicas(const InternalRegistryPrx&, const InternalRegistryPrxSeq&, const NodePrxSeq&);
+ NodePrxSeq registerReplicas(const InternalRegistryPrx&,
+ const std::map<InternalRegistryPrx, RegistryPrx>&,
+ const NodePrxSeq&);
void registerNodes(const InternalRegistryPrx&, const NodePrxSeq&);
const Ice::CommunicatorPtr _communicator;
diff --git a/cpp/src/IceGrid/ReplicaSessionI.cpp b/cpp/src/IceGrid/ReplicaSessionI.cpp
index a96f71a1040..51cb70eebf3 100644
--- a/cpp/src/IceGrid/ReplicaSessionI.cpp
+++ b/cpp/src/IceGrid/ReplicaSessionI.cpp
@@ -270,12 +270,15 @@ ReplicaSessionI::destroyImpl(bool shutdown)
_database->getObserverTopic(ObjectObserverTopicName)->unsubscribe(_observer, _info->name);
}
+ // Don't remove the replica proxy from the database if the registry is being shutdown.
if(!_replicaWellKnownObjects.empty())
{
if(shutdown) // Don't remove the replica proxy from the database if the registry is being shutdown.
{
- ObjectInfoSeq::iterator p = find(_replicaWellKnownObjects.begin(), _replicaWellKnownObjects.end(),
- _internalRegistry->ice_getIdentity());
+ Ice::Identity id;
+ id.category = _internalRegistry->ice_getIdentity().category;
+ id.name = "Registry-" + _info->name;
+ ObjectInfoSeq::iterator p = find(_replicaWellKnownObjects.begin(), _replicaWellKnownObjects.end(), id);
if(p != _replicaWellKnownObjects.end())
{
_replicaWellKnownObjects.erase(p);
diff --git a/cpp/src/IceGrid/ReplicaSessionManager.cpp b/cpp/src/IceGrid/ReplicaSessionManager.cpp
index 22159f57b3a..a706df8a4f4 100644
--- a/cpp/src/IceGrid/ReplicaSessionManager.cpp
+++ b/cpp/src/IceGrid/ReplicaSessionManager.cpp
@@ -272,9 +272,7 @@ ReplicaSessionManager::create(const string& name,
Lock sync(*this);
Ice::ObjectPrx prx = comm->getDefaultLocator();
-
- Ice::Identity id;
- id.category = prx->ice_getIdentity().category;
+ Ice::Identity id = prx->ice_getIdentity();
id.name = "InternalRegistry-Master";
_master = InternalRegistryPrx::uncheckedCast(prx->ice_identity(id)->ice_endpoints(Ice::EndpointSeq()));
@@ -286,21 +284,9 @@ ReplicaSessionManager::create(const string& name,
_traceLevels = _database->getTraceLevels();
//
- // Initialize the IceGrid::Query objects. The IceGrid::Query
- // interface is used to lookup the registry proxy in case it
- // becomes unavailable. Since replicas might not always have
- // an up to date registry proxy, we need to query all the
- // replicas.
+ // Initialize query objects from the default locator endpoints.
//
- Ice::EndpointSeq endpoints = prx->ice_getEndpoints();
- id.name = "Query";
- QueryPrx query = QueryPrx::uncheckedCast(prx->ice_identity(id));
- for(Ice::EndpointSeq::const_iterator p = endpoints.begin(); p != endpoints.end(); ++p)
- {
- Ice::EndpointSeq singleEndpoint;
- singleEndpoint.push_back(*p);
- _queryObjects.push_back(QueryPrx::uncheckedCast(query->ice_endpoints(singleEndpoint)));
- }
+ initQueryObjects(comm->getDefaultLocator());
_thread = new Thread(*this, _master, _traceLevels->logger);
_thread->start();
@@ -458,7 +444,8 @@ ReplicaSessionManager::createSession(InternalRegistryPrx& registry, IceUtil::Tim
if(!session)
{
vector<Ice::AsyncResultPtr> results;
- for(vector<QueryPrx>::const_iterator q = _queryObjects.begin(); q != _queryObjects.end(); ++q)
+ vector<QueryPrx> queryObjects = findAllQueryObjects();
+ for(vector<QueryPrx>::const_iterator q = queryObjects.begin(); q != queryObjects.end(); ++q)
{
results.push_back((*q)->begin_findObjectById(registry->ice_getIdentity()));
}
diff --git a/cpp/src/IceGrid/ReplicaSessionManager.h b/cpp/src/IceGrid/ReplicaSessionManager.h
index b8b2e64bc91..1f906d8bf5f 100644
--- a/cpp/src/IceGrid/ReplicaSessionManager.h
+++ b/cpp/src/IceGrid/ReplicaSessionManager.h
@@ -29,7 +29,7 @@ typedef IceUtil::Handle<WellKnownObjectsManager> WellKnownObjectsManagerPtr;
class TraceLevels;
typedef IceUtil::Handle<TraceLevels> TraceLevelsPtr;
-class ReplicaSessionManager : public IceUtil::Monitor<IceUtil::Mutex>
+class ReplicaSessionManager : public SessionManager
{
public:
diff --git a/cpp/src/IceGrid/SessionManager.h b/cpp/src/IceGrid/SessionManager.h
index 01286b665af..3547da3d04f 100644
--- a/cpp/src/IceGrid/SessionManager.h
+++ b/cpp/src/IceGrid/SessionManager.h
@@ -313,6 +313,23 @@ protected:
Action _nextAction;
};
+class SessionManager : public IceUtil::Monitor<IceUtil::Mutex>
+{
+public:
+
+ SessionManager();
+ virtual ~SessionManager();
+
+ virtual bool isDestroyed() = 0;
+
+protected:
+
+ void initQueryObjects(const Ice::LocatorPrx&);
+ std::vector<IceGrid::QueryPrx> findAllQueryObjects();
+
+ std::vector<IceGrid::QueryPrx> _queryObjects;
+};
+
};
#endif