diff options
author | Benoit Foucher <benoit@zeroc.com> | 2013-08-28 17:56:33 +0200 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2013-08-28 17:56:33 +0200 |
commit | 08cb802bb321d6a92323792d4f0dbefe13d87032 (patch) | |
tree | 07a487d26275ce136e8dee0d12422d2afe2498b5 /cpp/src/IceGrid | |
parent | makedist.py fixes (diff) | |
download | ice-08cb802bb321d6a92323792d4f0dbefe13d87032.tar.bz2 ice-08cb802bb321d6a92323792d4f0dbefe13d87032.tar.xz ice-08cb802bb321d6a92323792d4f0dbefe13d87032.zip |
Fix for ICE-5357: improved discovery of replicas on node/slave startup
Diffstat (limited to 'cpp/src/IceGrid')
-rw-r--r-- | cpp/src/IceGrid/Makefile | 3 | ||||
-rw-r--r-- | cpp/src/IceGrid/NodeCache.h | 2 | ||||
-rw-r--r-- | cpp/src/IceGrid/NodeSessionManager.cpp | 84 | ||||
-rw-r--r-- | cpp/src/IceGrid/NodeSessionManager.h | 3 | ||||
-rw-r--r-- | cpp/src/IceGrid/RegistryI.cpp | 75 | ||||
-rw-r--r-- | cpp/src/IceGrid/RegistryI.h | 4 | ||||
-rw-r--r-- | cpp/src/IceGrid/ReplicaSessionI.cpp | 7 | ||||
-rw-r--r-- | cpp/src/IceGrid/ReplicaSessionManager.cpp | 23 | ||||
-rw-r--r-- | cpp/src/IceGrid/ReplicaSessionManager.h | 2 | ||||
-rw-r--r-- | cpp/src/IceGrid/SessionManager.h | 17 |
10 files changed, 163 insertions, 57 deletions
diff --git a/cpp/src/IceGrid/Makefile b/cpp/src/IceGrid/Makefile index 3e793abd6fe..61cca096eb8 100644 --- a/cpp/src/IceGrid/Makefile +++ b/cpp/src/IceGrid/Makefile @@ -31,7 +31,8 @@ COMMON_OBJS = Internal.o \ DescriptorBuilder.o \ TraceLevels.o \ FileCache.o \ - PlatformInfo.o + PlatformInfo.o \ + SessionManager.o NODE_OBJS = NodeI.o \ NodeServerAdminRouter.o \ diff --git a/cpp/src/IceGrid/NodeCache.h b/cpp/src/IceGrid/NodeCache.h index 97cb03143b0..997081bd344 100644 --- a/cpp/src/IceGrid/NodeCache.h +++ b/cpp/src/IceGrid/NodeCache.h @@ -32,7 +32,7 @@ typedef std::vector<ServerEntryPtr> ServerEntrySeq; class ReplicaCache; -class NodeEntry : public IceUtil::Monitor<IceUtil::RecMutex> +class NodeEntry : private IceUtil::Monitor<IceUtil::RecMutex> { public: diff --git a/cpp/src/IceGrid/NodeSessionManager.cpp b/cpp/src/IceGrid/NodeSessionManager.cpp index de66d372d55..86f9988bba5 100644 --- a/cpp/src/IceGrid/NodeSessionManager.cpp +++ b/cpp/src/IceGrid/NodeSessionManager.cpp @@ -244,26 +244,14 @@ NodeSessionManager::create(const NodeIPtr& node) Ice::CommunicatorPtr communicator = _node->getCommunicator(); assert(communicator->getDefaultLocator()); - Ice::ObjectPrx prx = communicator->getDefaultLocator(); // - // Initialize the IceGrid::Query objects. The IceGrid::Query - // interface is used to lookup the registry proxy in case it - // becomes unavailable. Since replicas might not always have - // an up to date registry proxy, we need to query all the - // replicas. + // Initialize query objects from the default locator endpoints. 
// - Ice::EndpointSeq endpoints = prx->ice_getEndpoints(); - Ice::Identity id = prx->ice_getIdentity(); - id.name = "Query"; - QueryPrx query = QueryPrx::uncheckedCast(prx->ice_identity(id)); - for(Ice::EndpointSeq::const_iterator p = endpoints.begin(); p != endpoints.end(); ++p) - { - Ice::EndpointSeq singleEndpoint; - singleEndpoint.push_back(*p); - _queryObjects.push_back(QueryPrx::uncheckedCast(query->ice_endpoints(singleEndpoint))); - } + initQueryObjects(communicator->getDefaultLocator()); + Ice::ObjectPrx prx = communicator->getDefaultLocator(); + Ice::Identity id = prx->ice_getIdentity(); id.name = "InternalRegistry-Master"; _master = InternalRegistryPrx::uncheckedCast(prx->ice_identity(id)->ice_endpoints(Ice::EndpointSeq())); @@ -559,14 +547,32 @@ NodeSessionManager::createdSession(const NodeSessionPrx& session) } else { - vector<Ice::AsyncResultPtr> results; - for(vector<QueryPrx>::const_iterator q = _queryObjects.begin(); q != _queryObjects.end(); ++q) + vector<Ice::AsyncResultPtr> results1; + vector<Ice::AsyncResultPtr> results2; + vector<QueryPrx> queryObjects = findAllQueryObjects(); + + // + // Below we try to retrieve internal registry proxies either + // directly by querying for the internal registry type or + // indirectly by querying registry proxies. + // + // IceGrid registries <= 3.5.0 kept internal registry proxies + // while later versions now keep registry proxies. Registry + // proxies have fixed endpoints (no random port) so they are + // more reliable. 
+ // + + for(vector<QueryPrx>::const_iterator q = queryObjects.begin(); q != queryObjects.end(); ++q) { - results.push_back((*q)->begin_findAllObjectsByType(InternalRegistry::ice_staticId())); + results1.push_back((*q)->begin_findAllObjectsByType(InternalRegistry::ice_staticId())); + } + for(vector<QueryPrx>::const_iterator q = queryObjects.begin(); q != queryObjects.end(); ++q) + { + results2.push_back((*q)->begin_findAllObjectsByType(Registry::ice_staticId())); } map<Ice::Identity, Ice::ObjectPrx> proxies; - for(vector<Ice::AsyncResultPtr>::const_iterator p = results.begin(); p != results.end(); ++p) + for(vector<Ice::AsyncResultPtr>::const_iterator p = results1.begin(); p != results1.end(); ++p) { QueryPrx query = QueryPrx::uncheckedCast((*p)->getProxy()); if(isDestroyed()) @@ -579,14 +585,42 @@ NodeSessionManager::createdSession(const NodeSessionPrx& session) Ice::ObjectProxySeq prxs = query->end_findAllObjectsByType(*p); for(Ice::ObjectProxySeq::const_iterator q = prxs.begin(); q != prxs.end(); ++q) { - // - // NOTE: We might override a good proxy here! We could improve this to make - // sure that we don't override the proxy for replica N if that proxy was - // obtained from replica N. - // proxies[(*q)->ice_getIdentity()] = *q; } } + catch(const Ice::LocalException& ex) + { + // IGNORE + } + } + for(vector<Ice::AsyncResultPtr>::const_iterator p = results2.begin(); p != results2.end(); ++p) + { + QueryPrx query = QueryPrx::uncheckedCast((*p)->getProxy()); + if(isDestroyed()) + { + return; + } + + try + { + Ice::ObjectProxySeq prxs = query->end_findAllObjectsByType(*p); + for(Ice::ObjectProxySeq::const_iterator q = prxs.begin(); q != prxs.end(); ++q) + { + Ice::Identity id = (*q)->ice_getIdentity(); + const string prefix("Registry-"); + string::size_type pos = id.name.find(prefix); + if(pos == string::npos) + { + continue; // Ignore the master registry proxy. 
+ } + id.name = "InternalRegistry-" + id.name.substr(prefix.size()); + + Ice::ObjectPrx prx = (*q)->ice_identity(id)->ice_endpoints(Ice::EndpointSeq()); + id.name = "Locator"; + prx = prx->ice_locator(Ice::LocatorPrx::uncheckedCast((*q)->ice_identity(id))); + proxies[id] = prx; + } + } catch(const Ice::LocalException&) { // IGNORE diff --git a/cpp/src/IceGrid/NodeSessionManager.h b/cpp/src/IceGrid/NodeSessionManager.h index a1bb2c23a16..704b753d0a2 100644 --- a/cpp/src/IceGrid/NodeSessionManager.h +++ b/cpp/src/IceGrid/NodeSessionManager.h @@ -47,7 +47,7 @@ protected: }; typedef IceUtil::Handle<NodeSessionKeepAliveThread> NodeSessionKeepAliveThreadPtr; -class NodeSessionManager : public IceUtil::Monitor<IceUtil::Mutex> +class NodeSessionManager : public SessionManager { public: @@ -125,7 +125,6 @@ private: const NodeIPtr _node; ThreadPtr _thread; - std::vector<QueryPrx> _queryObjects; InternalRegistryPrx _master; bool _destroyed; bool _activated; diff --git a/cpp/src/IceGrid/RegistryI.cpp b/cpp/src/IceGrid/RegistryI.cpp index 6af0c5b0366..c0d16fb5d42 100644 --- a/cpp/src/IceGrid/RegistryI.cpp +++ b/cpp/src/IceGrid/RegistryI.cpp @@ -385,6 +385,10 @@ RegistryI::startImpl() // ObjectProxySeq proxies; + // + // Get proxies for nodes that we were connected with on last + // shutdown. + // NodePrxSeq nodes; proxies = _database->getInternalObjectsByType(Node::ice_staticId()); for(ObjectProxySeq::const_iterator p = proxies.begin(); p != proxies.end(); ++p) @@ -392,11 +396,51 @@ RegistryI::startImpl() nodes.push_back(NodePrx::uncheckedCast(*p)); } - InternalRegistryPrxSeq replicas; + // + // Get proxies for slaves that we were connected with on last + // shutdown. + // + // We first get the internal registry proxies and then also check + // the public registry proxies. 
If we find public registry + // proxies, we use indirect proxies setup with a locator using the + // public proxy in preference over the internal proxy which might + // contain stale endpoints if the slave was restarted. IceGrid + // version <= 3.5.0 also kept the internal proxy in the database + // instead of the public proxy. + // + map<InternalRegistryPrx, RegistryPrx> replicas; proxies = _database->getObjectsByType(InternalRegistry::ice_staticId()); for(ObjectProxySeq::const_iterator p = proxies.begin(); p != proxies.end(); ++p) { - replicas.push_back(InternalRegistryPrx::uncheckedCast(*p)); + replicas.insert(pair<InternalRegistryPrx, RegistryPrx>(InternalRegistryPrx::uncheckedCast(*p), RegistryPrx())); + } + + proxies = _database->getObjectsByType(Registry::ice_staticId()); + for(ObjectProxySeq::const_iterator p = proxies.begin(); p != proxies.end(); ++p) + { + Ice::Identity id = (*p)->ice_getIdentity(); + const string prefix("Registry-"); + string::size_type pos = id.name.find(prefix); + if(pos == string::npos) + { + continue; // Ignore the master registry proxy. 
+ } + id.name = "InternalRegistry-" + id.name.substr(prefix.size()); + + Ice::ObjectPrx prx = (*p)->ice_identity(id)->ice_endpoints(Ice::EndpointSeq()); + id.name = "Locator"; + prx = prx->ice_locator(Ice::LocatorPrx::uncheckedCast((*p)->ice_identity(id))); + + for(map<InternalRegistryPrx, RegistryPrx>::iterator q = replicas.begin(); q != replicas.end(); ++q) + { + if(q->first->ice_getIdentity() == prx->ice_getIdentity()) + { + replicas.erase(q); + break; + } + } + replicas.insert(pair<InternalRegistryPrx, RegistryPrx>(InternalRegistryPrx::uncheckedCast(prx), + RegistryPrx::uncheckedCast(*p))); } // @@ -1298,17 +1342,17 @@ RegistryI::getSSLInfo(const ConnectionPtr& connection, string& userDN) NodePrxSeq RegistryI::registerReplicas(const InternalRegistryPrx& internalRegistry, - const InternalRegistryPrxSeq& replicas, + const map<InternalRegistryPrx, RegistryPrx>& replicas, const NodePrxSeq& dbNodes) { set<NodePrx> nodes; nodes.insert(dbNodes.begin(), dbNodes.end()); vector<Ice::AsyncResultPtr> results; - for(InternalRegistryPrxSeq::const_iterator r = replicas.begin(); r != replicas.end(); ++r) + for(map<InternalRegistryPrx, RegistryPrx>::const_iterator r = replicas.begin(); r != replicas.end(); ++r) { - if((*r)->ice_getIdentity() != internalRegistry->ice_getIdentity()) + if(r->first->ice_getIdentity() != internalRegistry->ice_getIdentity()) { - results.push_back((*r)->begin_registerWithReplica(internalRegistry)); + results.push_back(r->first->begin_registerWithReplica(internalRegistry)); } } @@ -1351,6 +1395,25 @@ RegistryI::registerReplicas(const InternalRegistryPrx& internalRegistry, // Clear the proxy from the database if we can't // contact the replica. 
// + RegistryPrx registry; + for(map<InternalRegistryPrx, RegistryPrx>::const_iterator q = replicas.begin(); q != replicas.end(); ++q) + { + if(q->first->ice_getIdentity() == replica->ice_getIdentity()) + { + registry = q->second; + break; + } + } + if(registry) + { + try + { + _database->removeObject(registry->ice_getIdentity()); + } + catch(const ObjectNotRegisteredException&) + { + } + } try { _database->removeObject(replica->ice_getIdentity()); diff --git a/cpp/src/IceGrid/RegistryI.h b/cpp/src/IceGrid/RegistryI.h index 8595536e48c..74b1fc835cb 100644 --- a/cpp/src/IceGrid/RegistryI.h +++ b/cpp/src/IceGrid/RegistryI.h @@ -95,7 +95,9 @@ private: Glacier2::SSLPermissionsVerifierPrx getSSLPermissionsVerifier(const LocatorPrx&, const std::string&); Glacier2::SSLInfo getSSLInfo(const Ice::ConnectionPtr&, std::string&); - NodePrxSeq registerReplicas(const InternalRegistryPrx&, const InternalRegistryPrxSeq&, const NodePrxSeq&); + NodePrxSeq registerReplicas(const InternalRegistryPrx&, + const std::map<InternalRegistryPrx, RegistryPrx>&, + const NodePrxSeq&); void registerNodes(const InternalRegistryPrx&, const NodePrxSeq&); const Ice::CommunicatorPtr _communicator; diff --git a/cpp/src/IceGrid/ReplicaSessionI.cpp b/cpp/src/IceGrid/ReplicaSessionI.cpp index a96f71a1040..51cb70eebf3 100644 --- a/cpp/src/IceGrid/ReplicaSessionI.cpp +++ b/cpp/src/IceGrid/ReplicaSessionI.cpp @@ -270,12 +270,15 @@ ReplicaSessionI::destroyImpl(bool shutdown) _database->getObserverTopic(ObjectObserverTopicName)->unsubscribe(_observer, _info->name); } + // Don't remove the replica proxy from the database if the registry is being shutdown. if(!_replicaWellKnownObjects.empty()) { if(shutdown) // Don't remove the replica proxy from the database if the registry is being shutdown. 
{ - ObjectInfoSeq::iterator p = find(_replicaWellKnownObjects.begin(), _replicaWellKnownObjects.end(), - _internalRegistry->ice_getIdentity()); + Ice::Identity id; + id.category = _internalRegistry->ice_getIdentity().category; + id.name = "Registry-" + _info->name; + ObjectInfoSeq::iterator p = find(_replicaWellKnownObjects.begin(), _replicaWellKnownObjects.end(), id); if(p != _replicaWellKnownObjects.end()) { _replicaWellKnownObjects.erase(p); diff --git a/cpp/src/IceGrid/ReplicaSessionManager.cpp b/cpp/src/IceGrid/ReplicaSessionManager.cpp index 22159f57b3a..a706df8a4f4 100644 --- a/cpp/src/IceGrid/ReplicaSessionManager.cpp +++ b/cpp/src/IceGrid/ReplicaSessionManager.cpp @@ -272,9 +272,7 @@ ReplicaSessionManager::create(const string& name, Lock sync(*this); Ice::ObjectPrx prx = comm->getDefaultLocator(); - - Ice::Identity id; - id.category = prx->ice_getIdentity().category; + Ice::Identity id = prx->ice_getIdentity(); id.name = "InternalRegistry-Master"; _master = InternalRegistryPrx::uncheckedCast(prx->ice_identity(id)->ice_endpoints(Ice::EndpointSeq())); @@ -286,21 +284,9 @@ ReplicaSessionManager::create(const string& name, _traceLevels = _database->getTraceLevels(); // - // Initialize the IceGrid::Query objects. The IceGrid::Query - // interface is used to lookup the registry proxy in case it - // becomes unavailable. Since replicas might not always have - // an up to date registry proxy, we need to query all the - // replicas. + // Initialize query objects from the default locator endpoints. 
// - Ice::EndpointSeq endpoints = prx->ice_getEndpoints(); - id.name = "Query"; - QueryPrx query = QueryPrx::uncheckedCast(prx->ice_identity(id)); - for(Ice::EndpointSeq::const_iterator p = endpoints.begin(); p != endpoints.end(); ++p) - { - Ice::EndpointSeq singleEndpoint; - singleEndpoint.push_back(*p); - _queryObjects.push_back(QueryPrx::uncheckedCast(query->ice_endpoints(singleEndpoint))); - } + initQueryObjects(comm->getDefaultLocator()); _thread = new Thread(*this, _master, _traceLevels->logger); _thread->start(); @@ -458,7 +444,8 @@ ReplicaSessionManager::createSession(InternalRegistryPrx& registry, IceUtil::Tim if(!session) { vector<Ice::AsyncResultPtr> results; - for(vector<QueryPrx>::const_iterator q = _queryObjects.begin(); q != _queryObjects.end(); ++q) + vector<QueryPrx> queryObjects = findAllQueryObjects(); + for(vector<QueryPrx>::const_iterator q = queryObjects.begin(); q != queryObjects.end(); ++q) { results.push_back((*q)->begin_findObjectById(registry->ice_getIdentity())); } diff --git a/cpp/src/IceGrid/ReplicaSessionManager.h b/cpp/src/IceGrid/ReplicaSessionManager.h index b8b2e64bc91..1f906d8bf5f 100644 --- a/cpp/src/IceGrid/ReplicaSessionManager.h +++ b/cpp/src/IceGrid/ReplicaSessionManager.h @@ -29,7 +29,7 @@ typedef IceUtil::Handle<WellKnownObjectsManager> WellKnownObjectsManagerPtr; class TraceLevels; typedef IceUtil::Handle<TraceLevels> TraceLevelsPtr; -class ReplicaSessionManager : public IceUtil::Monitor<IceUtil::Mutex> +class ReplicaSessionManager : public SessionManager { public: diff --git a/cpp/src/IceGrid/SessionManager.h b/cpp/src/IceGrid/SessionManager.h index 01286b665af..3547da3d04f 100644 --- a/cpp/src/IceGrid/SessionManager.h +++ b/cpp/src/IceGrid/SessionManager.h @@ -313,6 +313,23 @@ protected: Action _nextAction; }; +class SessionManager : public IceUtil::Monitor<IceUtil::Mutex> +{ +public: + + SessionManager(); + virtual ~SessionManager(); + + virtual bool isDestroyed() = 0; + +protected: + + void initQueryObjects(const 
Ice::LocatorPrx&); + std::vector<IceGrid::QueryPrx> findAllQueryObjects(); + + std::vector<IceGrid::QueryPrx> _queryObjects; +}; + }; #endif |