diff options
Diffstat (limited to 'cpp/src/IceGrid/NodeSessionManager.cpp')
-rw-r--r-- | cpp/src/IceGrid/NodeSessionManager.cpp | 458 |
1 files changed, 229 insertions, 229 deletions
diff --git a/cpp/src/IceGrid/NodeSessionManager.cpp b/cpp/src/IceGrid/NodeSessionManager.cpp index ff522a1a58e..f723134a0a5 100644 --- a/cpp/src/IceGrid/NodeSessionManager.cpp +++ b/cpp/src/IceGrid/NodeSessionManager.cpp @@ -17,8 +17,8 @@ using namespace std; using namespace IceGrid; NodeSessionKeepAliveThread::NodeSessionKeepAliveThread(const InternalRegistryPrx& registry, - const NodeIPtr& node, - const vector<QueryPrx>& queryObjects) : + const NodeIPtr& node, + const vector<QueryPrx>& queryObjects) : SessionKeepAliveThread<NodeSessionPrx>(registry), _node(node), _queryObjects(queryObjects) @@ -29,7 +29,7 @@ NodeSessionKeepAliveThread::NodeSessionKeepAliveThread(const InternalRegistryPrx string::size_type pos = name.find(prefix); if(pos != string::npos) { - name = name.substr(prefix.size()); + name = name.substr(prefix.size()); } const_cast<string&>(_name) = name; } @@ -42,90 +42,90 @@ NodeSessionKeepAliveThread::createSession(InternalRegistryPrx& registry, IceUtil TraceLevelsPtr traceLevels = _node->getTraceLevels(); try { - if(traceLevels && traceLevels->replica > 1) - { - Ice::Trace out(traceLevels->logger, traceLevels->replicaCat); - out << "trying to establish session with replica `" << _name << "'"; - } - - set<InternalRegistryPrx> used; - if(!registry->ice_getEndpoints().empty()) - { - try - { - session = createSessionImpl(registry, timeout); - } - catch(const Ice::LocalException& ex) - { - exception.reset(ex.ice_clone()); - used.insert(registry); - registry = InternalRegistryPrx::uncheckedCast(registry->ice_endpoints(Ice::EndpointSeq())); - } - } - - if(!session) - { - for(vector<QueryPrx>::const_iterator p = _queryObjects.begin(); p != _queryObjects.end(); ++p) - { - InternalRegistryPrx newRegistry; - try - { - Ice::ObjectPrx obj = (*p)->findObjectById(registry->ice_getIdentity()); - newRegistry = InternalRegistryPrx::uncheckedCast(obj); - if(newRegistry && used.find(newRegistry) == used.end()) - { - session = createSessionImpl(newRegistry, timeout); - registry = newRegistry; - break; - } - } - catch(const Ice::LocalException& ex) - { - exception.reset(ex.ice_clone()); - if(newRegistry) - { - used.insert(newRegistry); - } - } - } - } + if(traceLevels && traceLevels->replica > 1) + { + Ice::Trace out(traceLevels->logger, traceLevels->replicaCat); + out << "trying to establish session with replica `" << _name << "'"; + } + + set<InternalRegistryPrx> used; + if(!registry->ice_getEndpoints().empty()) + { + try + { + session = createSessionImpl(registry, timeout); + } + catch(const Ice::LocalException& ex) + { + exception.reset(ex.ice_clone()); + used.insert(registry); + registry = InternalRegistryPrx::uncheckedCast(registry->ice_endpoints(Ice::EndpointSeq())); + } + } + + if(!session) + { + for(vector<QueryPrx>::const_iterator p = _queryObjects.begin(); p != _queryObjects.end(); ++p) + { + InternalRegistryPrx newRegistry; + try + { + Ice::ObjectPrx obj = (*p)->findObjectById(registry->ice_getIdentity()); + newRegistry = InternalRegistryPrx::uncheckedCast(obj); + if(newRegistry && used.find(newRegistry) == used.end()) + { + session = createSessionImpl(newRegistry, timeout); + registry = newRegistry; + break; + } + } + catch(const Ice::LocalException& ex) + { + exception.reset(ex.ice_clone()); + if(newRegistry) + { + used.insert(newRegistry); + } + } + } + } } catch(const NodeActiveException& ex) { - if(traceLevels) - { - traceLevels->logger->error("a node with the same name is already active with the replica `" + _name + "'"); - } - exception.reset(ex.ice_clone()); + if(traceLevels) + { + traceLevels->logger->error("a node with the same name is already active with the replica `" + _name + "'"); + } + exception.reset(ex.ice_clone()); } catch(const Ice::Exception& ex) { - exception.reset(ex.ice_clone()); + exception.reset(ex.ice_clone()); } if(session) { - if(traceLevels && traceLevels->replica > 0) - { - Ice::Trace out(traceLevels->logger, traceLevels->replicaCat); - out << "established session with replica `" << _name << "'"; - } + if(traceLevels && traceLevels->replica > 0) + { + Ice::Trace out(traceLevels->logger, traceLevels->replicaCat); + out << "established session with replica `" << _name << "'"; + } } else { - if(traceLevels && traceLevels->replica > 1) - { - Ice::Trace out(traceLevels->logger, traceLevels->replicaCat); - out << "failed to establish session with replica `" << _name << "':\n"; - if(exception.get()) - { - out << *exception.get(); - } - else - { - out << "failed to get replica proxy"; - } - } + if(traceLevels && traceLevels->replica > 1) + { + Ice::Trace out(traceLevels->logger, traceLevels->replicaCat); + out << "failed to establish session with replica `" << _name << "':\n"; + if(exception.get()) + { + out << *exception.get(); + } + else + { + out << "failed to get replica proxy"; + } + } } return session; @@ -138,7 +138,7 @@ NodeSessionKeepAliveThread::createSessionImpl(const InternalRegistryPrx& registr int t = session->getTimeout(); if(t > 0) { - timeout = IceUtil::Time::seconds(t / 2); + timeout = IceUtil::Time::seconds(t / 2); } _node->addObserver(session, session->getObserver()); return session; @@ -151,21 +151,21 @@ NodeSessionKeepAliveThread::destroySession(const NodeSessionPrx& session) try { - session->destroy(); + session->destroy(); - if(_node->getTraceLevels() && _node->getTraceLevels()->replica > 0) - { - Ice::Trace out(_node->getTraceLevels()->logger, _node->getTraceLevels()->replicaCat); - out << "destroyed replica `" << _name << "' session"; - } + if(_node->getTraceLevels() && _node->getTraceLevels()->replica > 0) + { + Ice::Trace out(_node->getTraceLevels()->logger, _node->getTraceLevels()->replicaCat); + out << "destroyed replica `" << _name << "' session"; + } } catch(const Ice::LocalException& ex) { - if(_node->getTraceLevels() && _node->getTraceLevels()->replica > 1) - { - Ice::Trace out(_node->getTraceLevels()->logger, _node->getTraceLevels()->replicaCat); - out << "couldn't destroy replica `" << _name << "' session:\n" << ex; - } + if(_node->getTraceLevels() && _node->getTraceLevels()->replica > 1) + { + Ice::Trace out(_node->getTraceLevels()->logger, _node->getTraceLevels()->replicaCat); + out << "couldn't destroy replica `" << _name << "' session:\n" << ex; + } } } @@ -174,24 +174,24 @@ NodeSessionKeepAliveThread::keepAlive(const NodeSessionPrx& session) { if(_node->getTraceLevels() && _node->getTraceLevels()->replica > 2) { - Ice::Trace out(_node->getTraceLevels()->logger, _node->getTraceLevels()->replicaCat); - out << "sending keep alive message to replica `" << _name << "'"; + Ice::Trace out(_node->getTraceLevels()->logger, _node->getTraceLevels()->replicaCat); + out << "sending keep alive message to replica `" << _name << "'"; } try { - session->keepAlive(_node->getPlatformInfo().getLoadInfo()); - return true; + session->keepAlive(_node->getPlatformInfo().getLoadInfo()); + return true; } catch(const Ice::LocalException& ex) { - _node->removeObserver(session); - if(_node->getTraceLevels() && _node->getTraceLevels()->replica > 0) - { - Ice::Trace out(_node->getTraceLevels()->logger, _node->getTraceLevels()->replicaCat); - out << "lost session with replica `" << _name << "':\n" << ex; - } - return false; + _node->removeObserver(session); + if(_node->getTraceLevels() && _node->getTraceLevels()->replica > 0) + { + Ice::Trace out(_node->getTraceLevels()->logger, _node->getTraceLevels()->replicaCat); + out << "lost session with replica `" << _name << "':\n" << ex; + } + return false; } } @@ -225,9 +225,9 @@ NodeSessionManager::create(const NodeIPtr& node) QueryPrx query = QueryPrx::uncheckedCast(communicator->stringToProxy(communicator->identityToString(id))); for(Ice::EndpointSeq::const_iterator p = endpoints.begin(); p != endpoints.end(); ++p) { - Ice::EndpointSeq singleEndpoint; - singleEndpoint.push_back(*p); - _queryObjects.push_back(QueryPrx::uncheckedCast(query->ice_endpoints(singleEndpoint))); + Ice::EndpointSeq singleEndpoint; + singleEndpoint.push_back(*p); + _queryObjects.push_back(QueryPrx::uncheckedCast(query->ice_endpoints(singleEndpoint))); } id.name = "InternalRegistry-Master"; @@ -252,17 +252,17 @@ NodeSessionManager::create(const InternalRegistryPrx& replica) NodeSessionKeepAliveThreadPtr thread; if(replica->ice_getIdentity() == _master->ice_getIdentity()) { - thread = _thread; - thread->setRegistry(replica); + thread = _thread; + thread->setRegistry(replica); } else { - thread = addReplicaSession(replica); + thread = addReplicaSession(replica); } if(thread) { - thread->tryCreateSession(); + thread->tryCreateSession(); } } @@ -270,8 +270,8 @@ void NodeSessionManager::activate() { { - Lock sync(*this); - _activated = true; + Lock sync(*this); + _activated = true; } // @@ -282,19 +282,19 @@ NodeSessionManager::activate() NodeSessionPrx session = _thread->getSession(); if(!session) { - _thread->tryCreateSession(true); - session = _thread->getSession(); + _thread->tryCreateSession(true); + session = _thread->getSession(); } if(session) { - try - { - session->setReplicaObserver(_node->getProxy()); - syncServers(session); - } - catch(const Ice::LocalException&) - { - } + try + { + session->setReplicaObserver(_node->getProxy()); + syncServers(session); + } + catch(const Ice::LocalException&) + { + } } } @@ -317,23 +317,23 @@ NodeSessionManager::destroy() { NodeSessionMap sessions; { - Lock sync(*this); - _destroyed = true; - _sessions.swap(sessions); - notifyAll(); + Lock sync(*this); + _destroyed = true; + _sessions.swap(sessions); + notifyAll(); } _thread->terminate(); NodeSessionMap::const_iterator p; for(p = sessions.begin(); p != sessions.end(); ++p) { - p->second->terminate(); + p->second->terminate(); } _thread->getThreadControl().join(); for(p = sessions.begin(); p != sessions.end(); ++p) { - p->second->getThreadControl().join(); + p->second->getThreadControl().join(); } } @@ -343,7 +343,7 @@ NodeSessionManager::replicaInit(const InternalRegistryPrxSeq& replicas) Lock sync(*this); if(_destroyed) { - return; + return; } // @@ -352,8 +352,8 @@ NodeSessionManager::replicaInit(const InternalRegistryPrxSeq& replicas) _replicas.clear(); for(InternalRegistryPrxSeq::const_iterator p = replicas.begin(); p != replicas.end(); ++p) { - _replicas.insert((*p)->ice_getIdentity()); - addReplicaSession(*p)->tryCreateSession(false); + _replicas.insert((*p)->ice_getIdentity()); + addReplicaSession(*p)->tryCreateSession(false); } } @@ -363,7 +363,7 @@ NodeSessionManager::replicaAdded(const InternalRegistryPrx& replica) Lock sync(*this); if(_destroyed) { - return; + return; } _replicas.insert(replica->ice_getIdentity()); addReplicaSession(replica)->tryCreateSession(false); @@ -373,12 +373,12 @@ void NodeSessionManager::replicaRemoved(const InternalRegistryPrx& replica) { { - Lock sync(*this); - if(_destroyed) - { - return; - } - _replicas.erase(replica->ice_getIdentity()); + Lock sync(*this); + if(_destroyed) + { + return; + } + _replicas.erase(replica->ice_getIdentity()); } // @@ -396,14 +396,14 @@ NodeSessionManager::addReplicaSession(const InternalRegistryPrx& replica) NodeSessionKeepAliveThreadPtr thread; if(p != _sessions.end()) { - thread = p->second; - thread->setRegistry(replica); + thread = p->second; + thread->setRegistry(replica); } else { - thread = new NodeSessionKeepAliveThread(replica, _node, _queryObjects); - _sessions.insert(make_pair(replica->ice_getIdentity(), thread)); - thread->start(); + thread = new NodeSessionKeepAliveThread(replica, _node, _queryObjects); + _sessions.insert(make_pair(replica->ice_getIdentity(), thread)); + thread->start(); } return thread; } @@ -413,31 +413,31 @@ NodeSessionManager::reapReplicas() { vector<NodeSessionKeepAliveThreadPtr> reap; { - Lock sync(*this); - if(_destroyed) - { - return; - } - - NodeSessionMap::iterator q = _sessions.begin(); - while(q != _sessions.end()) - { - if(_replicas.find(q->first) == _replicas.end() && q->second->terminateIfDisconnected()) - { - _node->removeObserver(q->second->getSession()); - reap.push_back(q->second); - _sessions.erase(q++); - } - else - { - ++q; - } - } + Lock sync(*this); + if(_destroyed) + { + return; + } + + NodeSessionMap::iterator q = _sessions.begin(); + while(q != _sessions.end()) + { + if(_replicas.find(q->first) == _replicas.end() && q->second->terminateIfDisconnected()) + { + _node->removeObserver(q->second->getSession()); + reap.push_back(q->second); + _sessions.erase(q++); + } + else + { + ++q; + } + } } for(vector<NodeSessionKeepAliveThreadPtr>::const_iterator p = reap.begin(); p != reap.end(); ++p) { - (*p)->getThreadControl().join(); + (*p)->getThreadControl().join(); } } @@ -465,8 +465,8 @@ NodeSessionManager::createdSession(const NodeSessionPrx& session) { bool activated; { - Lock sync(*this); - activated = _activated; + Lock sync(*this); + activated = _activated; } // @@ -479,15 +479,15 @@ NodeSessionManager::createdSession(const NodeSessionPrx& session) // if(session && activated) { - try - { - session->setReplicaObserver(_node->getProxy()); - syncServers(session); - } - catch(const Ice::LocalException&) - { - } - return; + try + { + session->setReplicaObserver(_node->getProxy()); + syncServers(session); + } + catch(const Ice::LocalException&) + { + } + return; } // @@ -501,73 +501,73 @@ NodeSessionManager::createdSession(const NodeSessionPrx& session) InternalRegistryPrxSeq replicas; if(session) { - assert(!activated); // The node adapter isn't activated yet so - // we're not subscribed yet to the replica + assert(!activated); // The node adapter isn't activated yet so + // we're not subscribed yet to the replica // observer topic. - try - { - replicas = _thread->getRegistry()->getReplicas(); - } - catch(const Ice::LocalException&) - { - } + try + { + replicas = _thread->getRegistry()->getReplicas(); + } + catch(const Ice::LocalException&) + { + } } else { - map<Ice::Identity, Ice::ObjectPrx> proxies; - for(vector<QueryPrx>::const_iterator p = _queryObjects.begin(); p != _queryObjects.end(); ++p) - { - try - { - Ice::ObjectProxySeq prxs = (*p)->findAllObjectsByType(InternalRegistry::ice_staticId()); - for(Ice::ObjectProxySeq::const_iterator q = prxs.begin(); q != prxs.end(); ++q) - { - // - // NOTE: We might override a good proxy here! We could improve this to make - // sure that we don't override the proxy for replica N if that proxy was - // obtained from replica N. - // - proxies[(*q)->ice_getIdentity()] = *q; - } - } - catch(const Ice::LocalException&) - { - // IGNORE - } - } - - for(map<Ice::Identity, Ice::ObjectPrx>::const_iterator q = proxies.begin(); q != proxies.end(); ++q) - { - replicas.push_back(InternalRegistryPrx::uncheckedCast(q->second)); - } + map<Ice::Identity, Ice::ObjectPrx> proxies; + for(vector<QueryPrx>::const_iterator p = _queryObjects.begin(); p != _queryObjects.end(); ++p) + { + try + { + Ice::ObjectProxySeq prxs = (*p)->findAllObjectsByType(InternalRegistry::ice_staticId()); + for(Ice::ObjectProxySeq::const_iterator q = prxs.begin(); q != prxs.end(); ++q) + { + // + // NOTE: We might override a good proxy here! We could improve this to make + // sure that we don't override the proxy for replica N if that proxy was + // obtained from replica N. + // + proxies[(*q)->ice_getIdentity()] = *q; + } + } + catch(const Ice::LocalException&) + { + // IGNORE + } + } + + for(map<Ice::Identity, Ice::ObjectPrx>::const_iterator q = proxies.begin(); q != proxies.end(); ++q) + { + replicas.push_back(InternalRegistryPrx::uncheckedCast(q->second)); + } } vector<NodeSessionKeepAliveThreadPtr> sessions; { - Lock sync(*this); - if(_destroyed) - { - return; - } - - // - // If the node adapter was activated since we last check, we don't need - // to initialize the replicas here, it will be done by replicaInit(). - // - if(!session || !_activated) - { - _replicas.clear(); - for(InternalRegistryPrxSeq::const_iterator p = replicas.begin(); p != replicas.end(); ++p) - { - if((*p)->ice_getIdentity() != _master->ice_getIdentity()) - { - _replicas.insert((*p)->ice_getIdentity()); - NodeSessionKeepAliveThreadPtr session = addReplicaSession(*p); - session->tryCreateSession(false); - sessions.push_back(session); - } - } - } + Lock sync(*this); + if(_destroyed) + { + return; + } + + // + // If the node adapter was activated since we last check, we don't need + // to initialize the replicas here, it will be done by replicaInit(). + // + if(!session || !_activated) + { + _replicas.clear(); + for(InternalRegistryPrxSeq::const_iterator p = replicas.begin(); p != replicas.end(); ++p) + { + if((*p)->ice_getIdentity() != _master->ice_getIdentity()) + { + _replicas.insert((*p)->ice_getIdentity()); + NodeSessionKeepAliveThreadPtr session = addReplicaSession(*p); + session->tryCreateSession(false); + sessions.push_back(session); + } + } + } } // @@ -577,7 +577,7 @@ NodeSessionManager::createdSession(const NodeSessionPrx& session) // for(vector<NodeSessionKeepAliveThreadPtr>::const_iterator p = sessions.begin(); p != sessions.end(); ++p) { - (*p)->tryCreateSession(true); + (*p)->tryCreateSession(true); } } |