diff options
author | Benoit Foucher <benoit@zeroc.com> | 2006-12-04 19:43:10 +0000 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2006-12-04 19:43:10 +0000 |
commit | 996be326b98f2c5bb776ba68dd2b3d8b2861a12a (patch) | |
tree | 446bb318a97f7e64b35f299407f03868942c7c0f /cpp/src/IceGrid/NodeSessionManager.cpp | |
parent | Removed -single_module, it doesn't seem to be neccessary anymore (diff) | |
download | ice-996be326b98f2c5bb776ba68dd2b3d8b2861a12a.tar.bz2 ice-996be326b98f2c5bb776ba68dd2b3d8b2861a12a.tar.xz ice-996be326b98f2c5bb776ba68dd2b3d8b2861a12a.zip |
More cleanup
Diffstat (limited to 'cpp/src/IceGrid/NodeSessionManager.cpp')
-rw-r--r-- | cpp/src/IceGrid/NodeSessionManager.cpp | 346 |
1 files changed, 194 insertions, 152 deletions
diff --git a/cpp/src/IceGrid/NodeSessionManager.cpp b/cpp/src/IceGrid/NodeSessionManager.cpp index 6ee09c7fa3c..967a15dae5c 100644 --- a/cpp/src/IceGrid/NodeSessionManager.cpp +++ b/cpp/src/IceGrid/NodeSessionManager.cpp @@ -237,10 +237,10 @@ NodeSessionManager::create(const NodeIPtr& node) _thread->start(); // - // We can't wait for the session to be created here as the node - // adapter isn't activated yet and the registry would hang trying - // to load the servers on the node (when createSession invokes - // loadServers() on the session). + // Try to create the session. It's important that we wait for the + // creation of the session as this will also try to create sessions + // with replicas (see createdSession below) and this must be done + // before the node is activated. // _thread->tryCreateSession(true); } @@ -258,17 +258,23 @@ NodeSessionManager::create(const InternalRegistryPrx& replica) } else { - replicaAdded(replica, true); + createReplicaSession(replica, true); } } void -NodeSessionManager::activated() +NodeSessionManager::activate() { { Lock sync(*this); _activated = true; } + + // + // Get the master session, if it's not created, try to create it + // again and make sure that the servers are synchronized and the + // replica observer is set on the session. + // NodeSessionPrx session = _thread->getSession(); if(!session) { @@ -277,7 +283,14 @@ NodeSessionManager::activated() } if(session) { - syncServers(session); + try + { + session->setReplicaObserver(_node->getProxy()); + syncServers(session); + } + catch(const Ice::LocalException&) + { + } } } @@ -321,121 +334,119 @@ NodeSessionManager::destroy() } void -NodeSessionManager::replicaAdded(const InternalRegistryPrx& replica, bool waitTryCreateSession) +NodeSessionManager::replicaInit(const InternalRegistryPrxSeq& replicas) { - Lock sync(*this); - if(_destroyed) { - return; - } + Lock sync(*this); + if(_destroyed) + { + return; + } - ++_serial; - NodeSessionMap::const_iterator p = _sessions.find(replica->ice_getIdentity()); - NodeSessionKeepAliveThreadPtr thread; - if(p != _sessions.end()) - { - thread = p->second; - thread->setRegistry(replica); + // + // Initialize the set of replicas known by the master. + // + _replicas.clear(); + for(InternalRegistryPrxSeq::const_iterator p = replicas.begin(); p != replicas.end(); ++p) + { + _replicas.insert((*p)->ice_getIdentity()); + } } - else + + for(InternalRegistryPrxSeq::const_iterator p = replicas.begin(); p != replicas.end(); ++p) { - thread = new NodeSessionKeepAliveThread(replica, _node, _queryObjects); - _sessions.insert(make_pair(replica->ice_getIdentity(), thread)); - thread->start(); + createReplicaSession(*p, false); } - thread->tryCreateSession(waitTryCreateSession); } void -NodeSessionManager::replicaRemoved(const InternalRegistryPrx& replica) +NodeSessionManager::replicaAdded(const InternalRegistryPrx& replica) { - NodeSessionKeepAliveThreadPtr thread; { Lock sync(*this); if(_destroyed) { return; } + _replicas.insert(replica->ice_getIdentity()); + } + + createReplicaSession(replica, false); +} - ++_serial; - NodeSessionMap::iterator p = _sessions.find(replica->ice_getIdentity()); - if(p != _sessions.end()) +void +NodeSessionManager::replicaRemoved(const InternalRegistryPrx& replica) +{ + { + Lock sync(*this); + if(_destroyed) { - thread = p->second; - _sessions.erase(p); + return; } + _replicas.erase(replica->ice_getIdentity()); } - if(thread) - { - _node->removeObserver(thread->getSession()); // Needs to be done here because we don't destroy the session. - thread->terminate(false); // Don't destroy the session, the replica is being shutdown! - thread->getThreadControl().join(); - } + + // + // We don't remove the session here. It will eventually be reaped + // by reapReplicas() if the session is dead. + // } void -NodeSessionManager::syncReplicas(const InternalRegistryPrxSeq& replicas) -{ - NodeSessionMap sessions; - _sessions.swap(sessions); +NodeSessionManager::createReplicaSession(const InternalRegistryPrx& replica, bool waitTryCreateSession) +{ + Lock sync(*this); + if(_destroyed) + { + return; + } + NodeSessionMap::const_iterator p = _sessions.find(replica->ice_getIdentity()); NodeSessionKeepAliveThreadPtr thread; - vector<NodeSessionKeepAliveThreadPtr> newSessions; - for(InternalRegistryPrxSeq::const_iterator p = replicas.begin(); p != replicas.end(); ++p) + if(p != _sessions.end()) { - if((*p)->ice_getIdentity() == _master->ice_getIdentity()) - { - continue; - } - NodeSessionMap::const_iterator q = sessions.find((*p)->ice_getIdentity()); - if(q != sessions.end()) - { - thread = q->second; - sessions.erase((*p)->ice_getIdentity()); - } - else - { - thread = new NodeSessionKeepAliveThread(*p, _node, _queryObjects); - thread->start(); - thread->tryCreateSession(false); - newSessions.push_back(thread); - } - _sessions.insert(make_pair((*p)->ice_getIdentity(), thread)); + thread = p->second; + thread->setRegistry(replica); } + else + { + thread = new NodeSessionKeepAliveThread(replica, _node, _queryObjects); + _sessions.insert(make_pair(replica->ice_getIdentity(), thread)); + thread->start(); + } + thread->tryCreateSession(waitTryCreateSession); +} - NodeSessionMap::iterator q = sessions.begin(); - while(q != sessions.end()) +void +NodeSessionManager::reapReplicas() +{ + vector<NodeSessionKeepAliveThreadPtr> reap; { - if(q->second->getSession()) // Don't destroy sessions which are still alive! + Lock sync(*this); + if(_destroyed) { - _sessions.insert(make_pair(q->first, q->second)); - sessions.erase(q++); + return; } - else + + NodeSessionMap::iterator q = _sessions.begin(); + while(q != _sessions.end()) { - ++q; + if(_replicas.find(q->first) == _replicas.end() && q->second->terminateIfDisconnected()) + { + _node->removeObserver(q->second->getSession()); + reap.push_back(q->second); + _sessions.erase(q++); + } + else + { + ++q; + } } } - for(q = sessions.begin(); q != sessions.end(); ++q) - { - q->second->terminate(); - } - for(q = sessions.begin(); q != sessions.end(); ++q) - { - q->second->getThreadControl().join(); - } - // - // If the node is being started, we wait for the new sessions to - // be created. This ensures that once the node is activated all - // the known replicas are connected. - // - if(!_activated) + for(vector<NodeSessionKeepAliveThreadPtr>::const_iterator p = reap.begin(); p != reap.end(); ++p) { - for(vector<NodeSessionKeepAliveThreadPtr>::const_iterator t = newSessions.begin(); t != newSessions.end(); ++t) - { - (*t)->tryCreateSession(true); - } + (*p)->getThreadControl().join(); } } @@ -454,99 +465,130 @@ NodeSessionManager::syncServers(const NodeSessionPrx& session) // established their session with the node. // assert(session); - try - { - session->loadServers(); - _node->checkConsistency(session); - } - catch(const Ice::LocalException&) - { - } + session->loadServers(); + _node->checkConsistency(session); } void NodeSessionManager::createdSession(const NodeSessionPrx& session) { + bool activated; + { + Lock sync(*this); + activated = _activated; + } + // - // Get the list of replicas (either with the master session or the - // IceGrid::Query interface) and make sure we have sessions opened - // to these replicas. + // Synchronize the servers if the session is active and if the + // node adapter has been activated (otherwise, the servers will be + // synced after the node adapter activation, see activate()). // - try + // We also set the replica observer to receive notifications of + // replica addition/removal. + // + if(session && activated) + { + try + { + session->setReplicaObserver(_node->getProxy()); + syncServers(session); + } + catch(const Ice::LocalException&) + { + } + return; + } + + // + // If there's no master session or if the node adapter isn't + // activated yet, we retrieve a list of the replicas either from + // the master or from the known replicas (the ones configured with + // Ice.Default.Locator) and we try to establish connections to + // each of the replicas. + // + + InternalRegistryPrxSeq replicas; + if(session) { - unsigned long serial = 0; - InternalRegistryPrxSeq replicas; - while(true) + assert(!activated); // The node adapter isn't activated yet so + // we're not subscribed yet to the replica + // observer topic. + try { + replicas = _thread->getRegistry()->getReplicas(); + } + catch(const Ice::LocalException&) + { + } + } + else + { + map<Ice::Identity, Ice::ObjectPrx> proxies; + for(vector<QueryPrx>::const_iterator p = _queryObjects.begin(); p != _queryObjects.end(); ++p) + { + try { - Lock sync(*this); - if(serial == _serial) + Ice::ObjectProxySeq prxs = (*p)->findAllObjectsByType(InternalRegistry::ice_staticId()); + for(Ice::ObjectProxySeq::const_iterator q = prxs.begin(); q != prxs.end(); ++q) { - NodeSessionManager& self = const_cast<NodeSessionManager&>(*this); - self._serial = 1; - self.syncReplicas(replicas); - break; + // + // NOTE: We might override a good proxy here! We could improve this to make + // sure that we don't override the proxy for replica N if that proxy was + // obtained from replica N. + // + proxies[(*q)->ice_getIdentity()] = *q; } - serial = _serial; } - - if(session) + catch(const Ice::LocalException&) { - replicas = _thread->getRegistry()->getReplicas(); + // IGNORE } - else + } + + for(map<Ice::Identity, Ice::ObjectPrx>::const_iterator q = proxies.begin(); q != proxies.end(); ++q) + { + replicas.push_back(InternalRegistryPrx::uncheckedCast(q->second)); + } + } + + { + Lock sync(*this); + + // + // If the node adapter was activated since we last check, we don't need + // to initialize the replicas here, it will be done by replicaInit(). + // + if(!session || !_activated) + { + _replicas.clear(); + for(InternalRegistryPrxSeq::const_iterator p = replicas.begin(); p != replicas.end(); ++p) { - replicas.clear(); - map<Ice::Identity, Ice::ObjectPrx> proxies; - for(vector<QueryPrx>::const_iterator p = _queryObjects.begin(); p != _queryObjects.end(); ++p) - { - try - { - Ice::ObjectProxySeq prxs = (*p)->findAllObjectsByType(InternalRegistry::ice_staticId()); - for(Ice::ObjectProxySeq::const_iterator q = prxs.begin(); q != prxs.end(); ++q) - { - // - // NOTE: We might override a good proxy - // here! We could improve this to make - // sure that we don't override the proxy - // for replica N if that proxy was - // obtained from replica N. - // - proxies[(*q)->ice_getIdentity()] = *q; - } - } - catch(const Ice::LocalException&) - { - // IGNORE - } - } - for(map<Ice::Identity, Ice::ObjectPrx>::const_iterator q = proxies.begin(); q != proxies.end(); ++q) + if((*p)->ice_getIdentity() != _master->ice_getIdentity()) { - replicas.push_back(InternalRegistryPrx::uncheckedCast(q->second)); + _replicas.insert((*p)->ice_getIdentity()); } } } } - catch(const Ice::LocalException&) - { - // IGNORE - } - + // - // Synchronize the servers if the session is active and if the - // node adapter has been activated (otherwise, the servers will be - // synced after the node adapter activation, see activated()). + // Create the replica sessions and wait for the creation. It's + // important to wait to ensure that the replica sessions are + // created before the node adapter is activated. // - if(session) + InternalRegistryPrxSeq::const_iterator t; + for(t = replicas.begin(); t != replicas.end(); ++t) { - bool activated; + if((*t)->ice_getIdentity() != _master->ice_getIdentity()) { - Lock sync(*this); - activated = _activated; + createReplicaSession(*t, false); } - if(activated) + } + for(t = replicas.begin(); t != replicas.end(); ++t) + { + if((*t)->ice_getIdentity() != _master->ice_getIdentity()) { - syncServers(session); + createReplicaSession(*t, true); } } } |