diff options
author | Benoit Foucher <benoit@zeroc.com> | 2006-07-27 13:20:03 +0000 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2006-07-27 13:20:03 +0000 |
commit | dad0bda0d462b4730b168befd187ca43446f12e7 (patch) | |
tree | 1a964dc5a3bd2c945e0ee165ddda55c3e4281157 /cpp/src/IceGrid/NodeSessionManager.cpp | |
parent | Improved __checkMode (diff) | |
download | ice-dad0bda0d462b4730b168befd187ca43446f12e7.tar.bz2 ice-dad0bda0d462b4730b168befd187ca43446f12e7.tar.xz ice-dad0bda0d462b4730b168befd187ca43446f12e7.zip |
More IceGrid replication improvements.
Diffstat (limited to 'cpp/src/IceGrid/NodeSessionManager.cpp')
-rw-r--r-- | cpp/src/IceGrid/NodeSessionManager.cpp | 576 |
1 files changed, 246 insertions, 330 deletions
diff --git a/cpp/src/IceGrid/NodeSessionManager.cpp b/cpp/src/IceGrid/NodeSessionManager.cpp index 37bc352e624..78f5f8a5427 100644 --- a/cpp/src/IceGrid/NodeSessionManager.cpp +++ b/cpp/src/IceGrid/NodeSessionManager.cpp @@ -17,11 +17,10 @@ using namespace std; using namespace IceGrid; NodeSessionKeepAliveThread::NodeSessionKeepAliveThread(const InternalRegistryPrx& registry, const NodeIPtr& node) : - _registry(registry), - _node(node), - _shutdown(false) + SessionKeepAliveThread<NodeSessionPrx, InternalRegistryPrx>(registry), + _node(node) { - string name = _registry->ice_getIdentity().name; + string name = registry->ice_getIdentity().name; const string prefix("InternalRegistry-"); string::size_type pos = name.find(prefix); if(pos != string::npos) @@ -29,158 +28,100 @@ NodeSessionKeepAliveThread::NodeSessionKeepAliveThread(const InternalRegistryPrx name = name.substr(prefix.size()); } const_cast<string&>(_name) = name; - const_cast<InternalRegistryPrx&>(_registry) = - InternalRegistryPrx::uncheckedCast( - _registry->ice_getCommunicator()->stringToProxy( - _registry->ice_getCommunicator()->identityToString(_registry->ice_getIdentity()))); } -void -NodeSessionKeepAliveThread::run() +NodeSessionPrx +NodeSessionKeepAliveThread::createSession(const InternalRegistryPrx& registry, IceUtil::Time& timeout) const { - // - // Keep alive the session. - // - NodeSessionPrx session; - IceUtil::Time timeout = IceUtil::Time::seconds(5); - TraceLevelsPtr traceLevels = _node->getTraceLevels(); - while(true) - { - // - // Send a keep alive message to the session. - // - if(session) + try + { + if(_node->getTraceLevels() && _node->getTraceLevels()->replica > 1) { - if(traceLevels && traceLevels->replica > 2) - { - Ice::Trace out(traceLevels->logger, traceLevels->replicaCat); - out << "sending keep alive message to replica `" << _name << "'"; - } - - try - { - session->keepAlive(_node->getPlatformInfo().getLoadInfo()); - } - catch(const Ice::LocalException& ex) - { - if(traceLevels && traceLevels->replica > 0) - { - Ice::Trace out(traceLevels->logger, traceLevels->replicaCat); - out << "lost session with replica `" << _name << "':\n" << ex; - } - session = 0; - } + Ice::Trace out(_node->getTraceLevels()->logger, _node->getTraceLevels()->replicaCat); + out << "trying to establish session with replica `" << _name << "'"; } - // - // If the session isn't established yet, try to create a new - // session. - // - if(!session) - { - try - { - if(traceLevels && traceLevels->replica > 1) - { - Ice::Trace out(traceLevels->logger, traceLevels->replicaCat); - out << "trying to establish session with replica `" << _name << "'"; - } + NodeSessionPrx session = _node->registerWithRegistry(registry); - session = _node->registerWithRegistry(_registry); - { - Lock sync(*this); - _session = session; - notifyAll(); - } - - int t = session->getTimeout(); - if(t > 0) - { - timeout = IceUtil::Time::seconds(t); - } - - if(traceLevels && traceLevels->replica > 0) - { - Ice::Trace out(traceLevels->logger, traceLevels->replicaCat); - out << "established session with replica `" << _name << "'"; - } - } - catch(const NodeActiveException&) - { - if(traceLevels) - { - traceLevels->logger->error("a node with the same name is already registered and active"); - } - } - catch(const Ice::LocalException& ex) - { - if(traceLevels && traceLevels->replica > 1) - { - Ice::Trace out(traceLevels->logger, traceLevels->replicaCat); - out << "failed to establish session with replica `" << _name << "':\n" << ex; - } - } + int t = session->getTimeout(); + if(t > 0) + { + timeout = IceUtil::Time::seconds(t); } - - // - // Wait for the configured timeout duration. - // + + if(_node->getTraceLevels() && _node->getTraceLevels()->replica > 0) { - Lock sync(*this); - if(!_shutdown) - { - timedWait(timeout); - } - if(_shutdown) - { - break; - } + Ice::Trace out(_node->getTraceLevels()->logger, _node->getTraceLevels()->replicaCat); + out << "established session with replica `" << _name << "'"; } + + return session; } - - // - // Destroy the session. - // - if(session) + catch(const NodeActiveException&) { - try + if(_node->getTraceLevels()) { - session->destroy(); - - if(traceLevels && traceLevels->replica > 0) - { - Ice::Trace out(traceLevels->logger, traceLevels->replicaCat); - out << "destroyed replica `" << _name << "' session"; - } + _node->getTraceLevels()->logger->error("a node with the same name is already registered and active"); } - catch(const Ice::LocalException& ex) + return 0; + } + catch(const Ice::LocalException& ex) + { + if(_node->getTraceLevels() && _node->getTraceLevels()->replica > 1) { - if(traceLevels && traceLevels->replica > 1) - { - Ice::Trace out(traceLevels->logger, traceLevels->replicaCat); - out << "couldn't destroy replica `" << _name << "' session:\n" << ex; - } + Ice::Trace out(_node->getTraceLevels()->logger, _node->getTraceLevels()->replicaCat); + out << "failed to establish session with replica `" << _name << "':\n" << ex; } + return 0; } } -bool -NodeSessionKeepAliveThread::waitForCreate() +void +NodeSessionKeepAliveThread::destroySession(const NodeSessionPrx& session) const { - Lock sync(*this); - while(!_session && !_shutdown) + try { - wait(); + session->destroy(); + + if(_node->getTraceLevels() && _node->getTraceLevels()->replica > 0) + { + Ice::Trace out(_node->getTraceLevels()->logger, _node->getTraceLevels()->replicaCat); + out << "destroyed replica `" << _name << "' session"; + } + } + catch(const Ice::LocalException& ex) + { + if(_node->getTraceLevels() && _node->getTraceLevels()->replica > 1) + { + Ice::Trace out(_node->getTraceLevels()->logger, _node->getTraceLevels()->replicaCat); + out << "couldn't destroy replica `" << _name << "' session:\n" << ex; + } } - return !_shutdown; } -void -NodeSessionKeepAliveThread::terminate() +bool +NodeSessionKeepAliveThread::keepAlive(const NodeSessionPrx& session) const { - Lock sync(*this); - _shutdown = true; - notifyAll(); + if(_node->getTraceLevels() && _node->getTraceLevels()->replica > 2) + { + Ice::Trace out(_node->getTraceLevels()->logger, _node->getTraceLevels()->replicaCat); + out << "sending keep alive message to replica `" << _name << "'"; + } + + try + { + session->keepAlive(_node->getPlatformInfo().getLoadInfo()); + return true; + } + catch(const Ice::LocalException& ex) + { + if(_node->getTraceLevels() && _node->getTraceLevels()->replica > 0) + { + Ice::Trace out(_node->getTraceLevels()->logger, _node->getTraceLevels()->replicaCat); + out << "lost session with replica `" << _name << "':\n" << ex; + } + return false; + } } NodeSessionManager::NodeSessionManager() : @@ -206,181 +147,24 @@ NodeSessionManager::create(const NodeIPtr& node) id.name = "InternalRegistry"; _master = InternalRegistryPrx::uncheckedCast(communicator->stringToProxy(communicator->identityToString(id))); - _thread = new Thread(*this); + _thread = new Thread(*this, _master); _thread->start(); } void -NodeSessionManager::run() +NodeSessionManager::create(const InternalRegistryPrx& replica) { - NodeSessionPrx session; - TraceLevelsPtr traceLevels = _node->getTraceLevels(); - IceUtil::Time timeout = IceUtil::Time::seconds(5); - while(true) + assert(_thread); + if(replica->ice_getIdentity() == _master->ice_getIdentity()) { - if(session) - { - if(traceLevels && traceLevels->replica > 2) - { - Ice::Trace out(traceLevels->logger, traceLevels->replicaCat); - out << "sending keep alive message to master replica"; - } - - try - { - session->keepAlive(_node->getPlatformInfo().getLoadInfo()); - } - catch(const Ice::LocalException& ex) - { - if(traceLevels && traceLevels->replica > 0) - { - Ice::Trace out(traceLevels->logger, traceLevels->replicaCat); - out << "lost session with master replica:\n" << ex; - } - session = 0; - _node->setObserver(0); - { - Lock sync(*this); - _masterSession = session; - notifyAll(); - } - } - } - - if(!session) - { - // - // Establish a session with the master IceGrid registry. - // - try - { - if(traceLevels && traceLevels->replica > 1) - { - Ice::Trace out(traceLevels->logger, traceLevels->replicaCat); - out << "trying to establish session with master replica"; - } - - session = _node->registerWithRegistry(_master); - - _node->setObserver(session->getObserver()); - - // - // We only check consistency with the master registry. - // - _node->checkConsistency(session); - - { - Lock sync(*this); - _masterSession = session; - notifyAll(); - } - - int t = session->getTimeout(); - if(t > 0) - { - timeout = IceUtil::Time::seconds(t); - } - - if(traceLevels && traceLevels->replica > 0) - { - Ice::Trace out(traceLevels->logger, traceLevels->replicaCat); - out << "established session with master replica"; - } - } - catch(const NodeActiveException&) - { - if(traceLevels) - { - traceLevels->logger->error("a node with the same name is already registered and active"); - } - } - catch(const Ice::LocalException& ex) - { - if(traceLevels && traceLevels->replica > 1) - { - Ice::Trace out(traceLevels->logger, traceLevels->replicaCat); - out << "failed to establish session with master replica:\n" << ex; - } - } - - // - // Get the list of replicas (either with the master - // session or the IceGrid::Query interface) and make sure - // we have sessions opened to these replicas. - // - try - { - unsigned long serial = 0; - InternalRegistryPrxSeq replicas; - while(true) - { - { - Lock sync(*this); - if(serial == _serial) - { - _serial = 1; - syncReplicas(replicas); - break; - } - serial = _serial; - } - - if(session) - { - replicas = session->getReplicas(); - } - else - { - replicas.clear(); - Ice::ObjectProxySeq proxies = _query->findAllObjectsByType(InternalRegistry::ice_staticId()); - for(Ice::ObjectProxySeq::const_iterator p = proxies.begin(); p != proxies.end(); ++p) - { - replicas.push_back(InternalRegistryPrx::uncheckedCast(*p)); - } - } - } - } - catch(const Ice::LocalException&) - { - // IGNORE - } - } - - { - Lock sync(*this); - if(!_destroyed) - { - timedWait(timeout); - } - if(_destroyed) - { - break; - } - } + _thread->tryCreateSession(replica); } - - // - // Destroy the session. - // - if(session) + else { - try - { - session->destroy(); - - if(traceLevels && traceLevels->replica > 0) - { - Ice::Trace out(traceLevels->logger, traceLevels->replicaCat); - out << "destroyed master replica session"; - } - } - catch(const Ice::LocalException& ex) + NodeSessionKeepAliveThreadPtr thread = replicaAdded(replica); + if(thread) { - if(traceLevels && traceLevels->replica > 1) - { - Ice::Trace out(traceLevels->logger, traceLevels->replicaCat); - out << "couldn't destroy master replica session:\n" << ex; - } + thread->tryCreateSession(replica); } } } @@ -388,34 +172,8 @@ NodeSessionManager::run() void NodeSessionManager::waitForCreate() { - // - // Wait for the node to establish the session with the master or - // at least one replica registry. - // - while(true) - { - NodeSessionKeepAliveThreadPtr thread; - { - Lock sync(*this); - while(!_masterSession && _sessions.empty() && !_destroyed) - { - wait(); - } - - if(_masterSession || _destroyed) - { - return; - } - else - { - thread = _sessions.begin()->second; - } - } - if(thread->waitForCreate()) - { - break; - } - } + assert(_thread); + _thread->waitForCreate(); } void @@ -429,6 +187,7 @@ NodeSessionManager::destroy() notifyAll(); } + _thread->terminate(); NodeSessionMap::const_iterator p; for(p = sessions.begin(); p != sessions.end(); ++p) { @@ -442,24 +201,31 @@ NodeSessionManager::destroy() } } -void +NodeSessionKeepAliveThreadPtr NodeSessionManager::replicaAdded(const InternalRegistryPrx& replica) { Lock sync(*this); if(_destroyed) { - return; + return 0; } ++_serial; - if(_sessions.find(replica->ice_getIdentity()) != _sessions.end()) + NodeSessionMap::const_iterator p = _sessions.find(replica->ice_getIdentity()); + if(p != _sessions.end()) { - return; + return p->second; } - NodeSessionKeepAliveThreadPtr thread = new NodeSessionKeepAliveThread(replica, _node); + InternalRegistryPrx registry = + InternalRegistryPrx::uncheckedCast( + replica->ice_getCommunicator()->stringToProxy( + replica->ice_getCommunicator()->identityToString(replica->ice_getIdentity()))); + + NodeSessionKeepAliveThreadPtr thread = new NodeSessionKeepAliveThread(registry, _node); thread->start(); _sessions.insert(make_pair(replica->ice_getIdentity(), thread)); + return thread; } void @@ -526,3 +292,153 @@ NodeSessionManager::syncReplicas(const InternalRegistryPrxSeq& replicas) } } +NodeSessionPrx +NodeSessionManager::createSession(const InternalRegistryPrx& registry, IceUtil::Time& timeout) const +{ + // + // Establish a session with the master IceGrid registry. + // + NodeSessionPrx session; + try + { + if(_node->getTraceLevels() && _node->getTraceLevels()->replica > 1) + { + Ice::Trace out(_node->getTraceLevels()->logger, _node->getTraceLevels()->replicaCat); + out << "trying to establish session with master replica"; + } + + session = _node->registerWithRegistry(registry); + + int t = session->getTimeout(); + if(t > 0) + { + timeout = IceUtil::Time::seconds(t); + } + + if(_node->getTraceLevels() && _node->getTraceLevels()->replica > 0) + { + Ice::Trace out(_node->getTraceLevels()->logger, _node->getTraceLevels()->replicaCat); + out << "established session with master replica"; + } + } + catch(const NodeActiveException&) + { + if(_node->getTraceLevels()) + { + _node->getTraceLevels()->logger->error("a node with the same name is already registered and active"); + } + } + catch(const Ice::LocalException& ex) + { + if(_node->getTraceLevels() && _node->getTraceLevels()->replica > 1) + { + Ice::Trace out(_node->getTraceLevels()->logger, _node->getTraceLevels()->replicaCat); + out << "failed to establish session with master replica:\n" << ex; + } + } + + if(session) + { + _node->setObserver(session->getObserver()); + _node->checkConsistency(session); + } + else + { + _node->setObserver(0); + } + + // + // Get the list of replicas (either with the master session or the + // IceGrid::Query interface) and make sure we have sessions opened + // to these replicas. + // + try + { + unsigned long serial = 0; + InternalRegistryPrxSeq replicas; + while(true) + { + { + Lock sync(*this); + if(serial == _serial) + { + NodeSessionManager& self = const_cast<NodeSessionManager&>(*this); + self._serial = 1; + self.syncReplicas(replicas); + break; + } + serial = _serial; + } + + if(session) + { + replicas = registry->getReplicas(); + } + else + { + replicas.clear(); + Ice::ObjectProxySeq proxies = _query->findAllObjectsByType(InternalRegistry::ice_staticId()); + for(Ice::ObjectProxySeq::const_iterator p = proxies.begin(); p != proxies.end(); ++p) + { + replicas.push_back(InternalRegistryPrx::uncheckedCast(*p)); + } + } + } + } + catch(const Ice::LocalException&) + { + // IGNORE + } + + return session; +} + +bool +NodeSessionManager::keepAlive(const NodeSessionPrx& session) const +{ + if(_node->getTraceLevels() && _node->getTraceLevels()->replica > 2) + { + Ice::Trace out(_node->getTraceLevels()->logger, _node->getTraceLevels()->replicaCat); + out << "sending keep alive message to master replica"; + } + + try + { + session->keepAlive(_node->getPlatformInfo().getLoadInfo()); + return true; + } + catch(const Ice::LocalException& ex) + { + if(_node->getTraceLevels() && _node->getTraceLevels()->replica > 0) + { + Ice::Trace out(_node->getTraceLevels()->logger, _node->getTraceLevels()->replicaCat); + out << "lost session with master replica:\n" << ex; + } + _node->setObserver(0); + return false; + } +} + +void +NodeSessionManager::destroySession(const NodeSessionPrx& session) const +{ + try + { + session->destroy(); + + if(_node->getTraceLevels() && _node->getTraceLevels()->replica > 0) + { + Ice::Trace out(_node->getTraceLevels()->logger, _node->getTraceLevels()->replicaCat); + out << "destroyed master replica session"; + } + } + catch(const Ice::LocalException& ex) + { + if(_node->getTraceLevels() && _node->getTraceLevels()->replica > 1) + { + Ice::Trace out(_node->getTraceLevels()->logger, _node->getTraceLevels()->replicaCat); + out << "couldn't destroy master replica session:\n" << ex; + } + } +} + |