diff options
author | Benoit Foucher <benoit@zeroc.com> | 2006-10-17 13:52:27 +0000 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2006-10-17 13:52:27 +0000 |
commit | 9684b99fb9b1c531a180f60c17128ee7502b7c7a (patch) | |
tree | ee76c1a78e0484a68fa22a04851b411d7428ef7e /cpp/src/IceGrid/NodeSessionManager.cpp | |
parent | Changed error message in Makefile.mak (diff) | |
download | ice-9684b99fb9b1c531a180f60c17128ee7502b7c7a.tar.bz2 ice-9684b99fb9b1c531a180f60c17128ee7502b7c7a.tar.xz ice-9684b99fb9b1c531a180f60c17128ee7502b7c7a.zip |
Replication fixes, more work on the replication tests
Diffstat (limited to 'cpp/src/IceGrid/NodeSessionManager.cpp')
-rw-r--r-- | cpp/src/IceGrid/NodeSessionManager.cpp | 89 |
1 files changed, 69 insertions, 20 deletions
diff --git a/cpp/src/IceGrid/NodeSessionManager.cpp b/cpp/src/IceGrid/NodeSessionManager.cpp index c6742056400..bc22006450d 100644 --- a/cpp/src/IceGrid/NodeSessionManager.cpp +++ b/cpp/src/IceGrid/NodeSessionManager.cpp @@ -18,12 +18,12 @@ using namespace IceGrid; NodeSessionKeepAliveThread::NodeSessionKeepAliveThread(const InternalRegistryPrx& registry, const NodeIPtr& node, - const IceGrid::QueryPrx& query) : + const vector<QueryPrx>& queryObjects) : SessionKeepAliveThread<NodeSessionPrx>(registry), _node(node), - _query(query) + _queryObjects(queryObjects) { - assert(registry && node && query); + assert(registry && node && !_queryObjects.empty()); string name = registry->ice_getIdentity().name; const string prefix("InternalRegistry-"); string::size_type pos = name.find(prefix); @@ -48,6 +48,7 @@ NodeSessionKeepAliveThread::createSession(const InternalRegistryPrx& registry, I out << "trying to establish session with replica `" << _name << "'"; } + set<InternalRegistryPrx> used; if(!registry->ice_getEndpoints().empty()) { try @@ -57,25 +58,35 @@ NodeSessionKeepAliveThread::createSession(const InternalRegistryPrx& registry, I catch(const Ice::LocalException& ex) { exception.reset(ex.ice_clone()); + used.insert(registry); setRegistry(InternalRegistryPrx::uncheckedCast(registry->ice_endpoints(Ice::EndpointSeq()))); } } if(!session) { - try + for(vector<QueryPrx>::const_iterator p = _queryObjects.begin(); p != _queryObjects.end(); ++p) { - Ice::ObjectPrx obj = _query->findObjectById(registry->ice_getIdentity()); - InternalRegistryPrx newRegistry = InternalRegistryPrx::uncheckedCast(obj); - if(newRegistry && newRegistry != registry) + InternalRegistryPrx newRegistry; + try { - session = createSessionImpl(newRegistry, timeout); - setRegistry(newRegistry); + Ice::ObjectPrx obj = (*p)->findObjectById(registry->ice_getIdentity()); + newRegistry = InternalRegistryPrx::uncheckedCast(obj); + if(newRegistry && used.find(newRegistry) == used.end()) + { + session = createSessionImpl(newRegistry, timeout); + setRegistry(newRegistry); + break; + } + } + catch(const Ice::LocalException& ex) + { + exception.reset(ex.ice_clone()); + if(newRegistry) + { + used.insert(newRegistry); + } } - } - catch(const Ice::LocalException& ex) - { - exception.reset(ex.ice_clone()); } } } @@ -197,8 +208,22 @@ NodeSessionManager::create(const NodeIPtr& node) assert(communicator->getDefaultLocator()); Ice::Identity id = communicator->getDefaultLocator()->ice_getIdentity(); + // + // Initialize the IceGrid::Query objects. The IceGrid::Query + // interface is used to lookup the registry proxy in case it + // becomes unavailable. Since replicas might not always have + // an up to date registry proxy, we need to query all the + // replicas. + // + Ice::EndpointSeq endpoints = communicator->getDefaultLocator()->ice_getEndpoints(); id.name = "Query"; - _query = QueryPrx::uncheckedCast(communicator->stringToProxy(communicator->identityToString(id))); + QueryPrx query = QueryPrx::uncheckedCast(communicator->stringToProxy(communicator->identityToString(id))); + for(Ice::EndpointSeq::const_iterator p = endpoints.begin(); p != endpoints.end(); ++p) + { + Ice::EndpointSeq singleEndpoint; + singleEndpoint.push_back(*p); + _queryObjects.push_back(QueryPrx::uncheckedCast(query->ice_endpoints(singleEndpoint))); + } id.name = "InternalRegistry-Master"; _master = InternalRegistryPrx::uncheckedCast(communicator->stringToProxy(communicator->identityToString(id))); @@ -281,7 +306,7 @@ NodeSessionManager::replicaAdded(const InternalRegistryPrx& replica) return p->second; } - NodeSessionKeepAliveThreadPtr thread = new NodeSessionKeepAliveThread(replica, _node, _query); + NodeSessionKeepAliveThreadPtr thread = new NodeSessionKeepAliveThread(replica, _node, _queryObjects); _sessions.insert(make_pair(replica->ice_getIdentity(), thread)); thread->start(); return thread; @@ -334,14 +359,26 @@ NodeSessionManager::syncReplicas(const InternalRegistryPrxSeq& replicas) } else { - thread = new NodeSessionKeepAliveThread(*p, _node, _query); + thread = new NodeSessionKeepAliveThread(*p, _node, _queryObjects); thread->start(); thread->tryCreateSession(*p); } _sessions.insert(make_pair((*p)->ice_getIdentity(), thread)); } - NodeSessionMap::const_iterator q; + NodeSessionMap::iterator q = sessions.begin(); + while(q != sessions.end()) + { + if(q->second->getSession()) // Don't destroy sessions which are still alive! + { + _sessions.insert(make_pair(q->first, q->second)); + sessions.erase(q++); + } + else + { + ++q; + } + } for(q = sessions.begin(); q != sessions.end(); ++q) { q->second->terminate(); @@ -385,10 +422,22 @@ NodeSessionManager::createdSession(const NodeSessionPrx& session) else { replicas.clear(); - Ice::ObjectProxySeq proxies = _query->findAllObjectsByType(InternalRegistry::ice_staticId()); - for(Ice::ObjectProxySeq::const_iterator p = proxies.begin(); p != proxies.end(); ++p) + set<Ice::ObjectPrx> proxies; + for(vector<QueryPrx>::const_iterator p = _queryObjects.begin(); p != _queryObjects.end(); ++p) + { + try + { + Ice::ObjectProxySeq prxs = (*p)->findAllObjectsByType(InternalRegistry::ice_staticId()); + proxies.insert(prxs.begin(), prxs.end()); + } + catch(const Ice::LocalException& ex) + { + // IGNORE + } + } + for(set<Ice::ObjectPrx>::const_iterator q = proxies.begin(); q != proxies.end(); ++q) { - replicas.push_back(InternalRegistryPrx::uncheckedCast(*p)); + replicas.push_back(InternalRegistryPrx::uncheckedCast(*q)); } } } |