summaryrefslogtreecommitdiff
path: root/cpp/src/IceGrid/NodeSessionManager.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceGrid/NodeSessionManager.cpp')
-rw-r--r--cpp/src/IceGrid/NodeSessionManager.cpp89
1 files changed, 69 insertions, 20 deletions
diff --git a/cpp/src/IceGrid/NodeSessionManager.cpp b/cpp/src/IceGrid/NodeSessionManager.cpp
index c6742056400..bc22006450d 100644
--- a/cpp/src/IceGrid/NodeSessionManager.cpp
+++ b/cpp/src/IceGrid/NodeSessionManager.cpp
@@ -18,12 +18,12 @@ using namespace IceGrid;
NodeSessionKeepAliveThread::NodeSessionKeepAliveThread(const InternalRegistryPrx& registry,
const NodeIPtr& node,
- const IceGrid::QueryPrx& query) :
+ const vector<QueryPrx>& queryObjects) :
SessionKeepAliveThread<NodeSessionPrx>(registry),
_node(node),
- _query(query)
+ _queryObjects(queryObjects)
{
- assert(registry && node && query);
+ assert(registry && node && !_queryObjects.empty());
string name = registry->ice_getIdentity().name;
const string prefix("InternalRegistry-");
string::size_type pos = name.find(prefix);
@@ -48,6 +48,7 @@ NodeSessionKeepAliveThread::createSession(const InternalRegistryPrx& registry, I
out << "trying to establish session with replica `" << _name << "'";
}
+ set<InternalRegistryPrx> used;
if(!registry->ice_getEndpoints().empty())
{
try
@@ -57,25 +58,35 @@ NodeSessionKeepAliveThread::createSession(const InternalRegistryPrx& registry, I
catch(const Ice::LocalException& ex)
{
exception.reset(ex.ice_clone());
+ used.insert(registry);
setRegistry(InternalRegistryPrx::uncheckedCast(registry->ice_endpoints(Ice::EndpointSeq())));
}
}
if(!session)
{
- try
+ for(vector<QueryPrx>::const_iterator p = _queryObjects.begin(); p != _queryObjects.end(); ++p)
{
- Ice::ObjectPrx obj = _query->findObjectById(registry->ice_getIdentity());
- InternalRegistryPrx newRegistry = InternalRegistryPrx::uncheckedCast(obj);
- if(newRegistry && newRegistry != registry)
+ InternalRegistryPrx newRegistry;
+ try
{
- session = createSessionImpl(newRegistry, timeout);
- setRegistry(newRegistry);
+ Ice::ObjectPrx obj = (*p)->findObjectById(registry->ice_getIdentity());
+ newRegistry = InternalRegistryPrx::uncheckedCast(obj);
+ if(newRegistry && used.find(newRegistry) == used.end())
+ {
+ session = createSessionImpl(newRegistry, timeout);
+ setRegistry(newRegistry);
+ break;
+ }
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ exception.reset(ex.ice_clone());
+ if(newRegistry)
+ {
+ used.insert(newRegistry);
+ }
}
- }
- catch(const Ice::LocalException& ex)
- {
- exception.reset(ex.ice_clone());
}
}
}
@@ -197,8 +208,22 @@ NodeSessionManager::create(const NodeIPtr& node)
assert(communicator->getDefaultLocator());
Ice::Identity id = communicator->getDefaultLocator()->ice_getIdentity();
+ //
+ // Initialize the IceGrid::Query objects. The IceGrid::Query
+ // interface is used to lookup the registry proxy in case it
+ // becomes unavailable. Since replicas might not always have
+ // an up to date registry proxy, we need to query all the
+ // replicas.
+ //
+ Ice::EndpointSeq endpoints = communicator->getDefaultLocator()->ice_getEndpoints();
id.name = "Query";
- _query = QueryPrx::uncheckedCast(communicator->stringToProxy(communicator->identityToString(id)));
+ QueryPrx query = QueryPrx::uncheckedCast(communicator->stringToProxy(communicator->identityToString(id)));
+ for(Ice::EndpointSeq::const_iterator p = endpoints.begin(); p != endpoints.end(); ++p)
+ {
+ Ice::EndpointSeq singleEndpoint;
+ singleEndpoint.push_back(*p);
+ _queryObjects.push_back(QueryPrx::uncheckedCast(query->ice_endpoints(singleEndpoint)));
+ }
id.name = "InternalRegistry-Master";
_master = InternalRegistryPrx::uncheckedCast(communicator->stringToProxy(communicator->identityToString(id)));
@@ -281,7 +306,7 @@ NodeSessionManager::replicaAdded(const InternalRegistryPrx& replica)
return p->second;
}
- NodeSessionKeepAliveThreadPtr thread = new NodeSessionKeepAliveThread(replica, _node, _query);
+ NodeSessionKeepAliveThreadPtr thread = new NodeSessionKeepAliveThread(replica, _node, _queryObjects);
_sessions.insert(make_pair(replica->ice_getIdentity(), thread));
thread->start();
return thread;
@@ -334,14 +359,26 @@ NodeSessionManager::syncReplicas(const InternalRegistryPrxSeq& replicas)
}
else
{
- thread = new NodeSessionKeepAliveThread(*p, _node, _query);
+ thread = new NodeSessionKeepAliveThread(*p, _node, _queryObjects);
thread->start();
thread->tryCreateSession(*p);
}
_sessions.insert(make_pair((*p)->ice_getIdentity(), thread));
}
- NodeSessionMap::const_iterator q;
+ NodeSessionMap::iterator q = sessions.begin();
+ while(q != sessions.end())
+ {
+ if(q->second->getSession()) // Don't destroy sessions which are still alive!
+ {
+ _sessions.insert(make_pair(q->first, q->second));
+ sessions.erase(q++);
+ }
+ else
+ {
+ ++q;
+ }
+ }
for(q = sessions.begin(); q != sessions.end(); ++q)
{
q->second->terminate();
@@ -385,10 +422,22 @@ NodeSessionManager::createdSession(const NodeSessionPrx& session)
else
{
replicas.clear();
- Ice::ObjectProxySeq proxies = _query->findAllObjectsByType(InternalRegistry::ice_staticId());
- for(Ice::ObjectProxySeq::const_iterator p = proxies.begin(); p != proxies.end(); ++p)
+ set<Ice::ObjectPrx> proxies;
+ for(vector<QueryPrx>::const_iterator p = _queryObjects.begin(); p != _queryObjects.end(); ++p)
+ {
+ try
+ {
+ Ice::ObjectProxySeq prxs = (*p)->findAllObjectsByType(InternalRegistry::ice_staticId());
+ proxies.insert(prxs.begin(), prxs.end());
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ // IGNORE
+ }
+ }
+ for(set<Ice::ObjectPrx>::const_iterator q = proxies.begin(); q != proxies.end(); ++q)
{
- replicas.push_back(InternalRegistryPrx::uncheckedCast(*p));
+ replicas.push_back(InternalRegistryPrx::uncheckedCast(*q));
}
}
}