From 95779a66eb32286140dec13a1641f5723d322168 Mon Sep 17 00:00:00 2001 From: Benoit Foucher Date: Tue, 25 Jul 2006 17:25:58 +0000 Subject: More replication work. --- cpp/src/IceGrid/NodeSessionManager.cpp | 465 +++++++++++++++++++++++++++------ 1 file changed, 383 insertions(+), 82 deletions(-) (limited to 'cpp/src/IceGrid/NodeSessionManager.cpp') diff --git a/cpp/src/IceGrid/NodeSessionManager.cpp b/cpp/src/IceGrid/NodeSessionManager.cpp index 4e6381c38c9..6c4d4cefc90 100644 --- a/cpp/src/IceGrid/NodeSessionManager.cpp +++ b/cpp/src/IceGrid/NodeSessionManager.cpp @@ -16,13 +16,23 @@ using namespace std; using namespace IceGrid; -NodeSessionKeepAliveThread::NodeSessionKeepAliveThread(const InternalRegistryPrx& registry, - const NodeIPtr& node) : - _registry(InternalRegistryPrx::uncheckedCast(registry->ice_adapterId(""))), +NodeSessionKeepAliveThread::NodeSessionKeepAliveThread(const InternalRegistryPrx& registry, const NodeIPtr& node) : + _registry(registry), _node(node), - _timeout(IceUtil::Time::seconds(5)), _shutdown(false) { + string name = _registry->ice_getIdentity().name; + const string prefix("InternalRegistry-"); + string::size_type pos = name.find(prefix); + if(pos != string::npos) + { + name = name.substr(prefix.size()); + } + const_cast<string&>(_name) = name; + const_cast<InternalRegistryPrx&>(_registry) = + InternalRegistryPrx::uncheckedCast( + _registry->ice_getCommunicator()->stringToProxy( + _registry->ice_getCommunicator()->identityToString(_registry->ice_getIdentity()))); } void @@ -32,27 +42,102 @@ NodeSessionKeepAliveThread::run() // Keep alive the session. // NodeSessionPrx session; + IceUtil::Time timeout = IceUtil::Time::seconds(5); + TraceLevelsPtr traceLevels = _node->getTraceLevels(); while(true) { - keepAlive(session); + // + // Send a keep alive message to the session.
+ // + if(session) + { + if(traceLevels && traceLevels->replica > 2) + { + Ice::Trace out(traceLevels->logger, traceLevels->replicaCat); + out << "sending keep alive message to replica `" << _name << "'"; + } + try + { + session->keepAlive(_node->getPlatformInfo().getLoadInfo()); + } + catch(const Ice::LocalException& ex) + { + if(traceLevels && traceLevels->replica > 0) + { + Ice::Trace out(traceLevels->logger, traceLevels->replicaCat); + out << "lost session with replica `" << _name << "':\n" << ex; + } + session = 0; + } + } + + // + // If the session isn't established yet, try to create a new + // session. + // + if(!session) { - Lock sync(*this); + try + { + if(traceLevels && traceLevels->replica > 1) + { + Ice::Trace out(traceLevels->logger, traceLevels->replicaCat); + out << "trying to establish session with replica `" << _name << "'"; + } - session = _session; + session = _node->registerWithRegistry(_registry); + { + Lock sync(*this); + _session = session; + notifyAll(); + } - if(!_shutdown) + int t = session->getTimeout(); + if(t > 0) + { + timeout = IceUtil::Time::seconds(t); + } + + if(traceLevels && traceLevels->replica > 0) + { + Ice::Trace out(traceLevels->logger, traceLevels->replicaCat); + out << "established session with replica `" << _name << "'"; + } + } + catch(const NodeActiveException&) + { + if(traceLevels) + { + traceLevels->logger->error("a node with the same name is already registered and active"); + } + } + catch(const Ice::LocalException& ex) { - timedWait(_timeout); + if(traceLevels && traceLevels->replica > 1) + { + Ice::Trace out(traceLevels->logger, traceLevels->replicaCat); + out << "failed to establish session with replica `" << _name << "':\n" << ex; + } } + } + // + // Wait for the configured timeout duration. + // + { + Lock sync(*this); + if(!_shutdown) + { + timedWait(timeout); + } if(_shutdown) { break; } - } + } } - + // // Destroy the session. 
// @@ -61,27 +146,33 @@ NodeSessionKeepAliveThread::run() try { session->destroy(); + + if(traceLevels && traceLevels->replica > 0) + { + Ice::Trace out(traceLevels->logger, traceLevels->replicaCat); + out << "destroyed replica `" << _name << "' session"; + } } - catch(const Ice::LocalException&) + catch(const Ice::LocalException& ex) { - // - // TODO: XXX: TRACE? - // -// ostringstream os; -// os << "couldn't contact the IceGrid registry to destroy the node session:\n" << ex; -// _node->getTraceLevels()->logger->warning(os.str()); + if(traceLevels && traceLevels->replica > 1) + { + Ice::Trace out(traceLevels->logger, traceLevels->replicaCat); + out << "couldn't destroy replica `" << _name << "' session:\n" << ex; + } } } } -void +bool NodeSessionKeepAliveThread::waitForCreate() { Lock sync(*this); - while(!_session) + while(!_session && !_shutdown) { wait(); } + return !_shutdown; } void @@ -92,100 +183,260 @@ NodeSessionKeepAliveThread::terminate() notifyAll(); } +NodeSessionManager::NodeSessionManager() : + _serial(1), + _destroyed(false) +{ +} + void -NodeSessionKeepAliveThread::keepAlive(const NodeSessionPrx& session) +NodeSessionManager::create(const NodeIPtr& node) { - if(session) - { - try - { - session->keepAlive(_node->getPlatformInfo().getLoadInfo()); - return; // We're done! 
- } - catch(const Ice::LocalException&) - { - } - } + assert(!_node); + + const_cast<NodeIPtr&>(_node) = node; + + Ice::CommunicatorPtr communicator = _node->getCommunicator(); + assert(communicator->getDefaultLocator()); + Ice::Identity id = communicator->getDefaultLocator()->ice_getIdentity(); - try + id.name = "Query"; + _query = QueryPrx::uncheckedCast(communicator->stringToProxy(communicator->identityToString(id))); + + id.name = "InternalRegistry"; + _master = InternalRegistryPrx::uncheckedCast(communicator->stringToProxy(communicator->identityToString(id))); + + _thread = new Thread(*this); + _thread->start(); +} + +void +NodeSessionManager::run() +{ + NodeSessionPrx session; + TraceLevelsPtr traceLevels = _node->getTraceLevels(); + IceUtil::Time timeout = IceUtil::Time::seconds(5); + while(true) { - NodeSessionPrx newSession = _node->registerWithRegistry(_registry); - int timeout = newSession->getTimeout(); + if(session) { - Lock sync(*this); - if(timeout > 0) + if(traceLevels && traceLevels->replica > 2) { - _timeout = IceUtil::Time::seconds(timeout); + Ice::Trace out(traceLevels->logger, traceLevels->replicaCat); + out << "sending keep alive message to master replica"; + } + + try + { + session->keepAlive(_node->getPlatformInfo().getLoadInfo()); + } + catch(const Ice::LocalException& ex) + { + if(traceLevels && traceLevels->replica > 0) + { + Ice::Trace out(traceLevels->logger, traceLevels->replicaCat); + out << "lost session with master replica:\n" << ex; + } + session = 0; + _node->setObserver(0); + { + Lock sync(*this); + _masterSession = session; + notifyAll(); + } } - _session = newSession; - notifyAll(); } - } - catch(const NodeActiveException&) - { - _node->getTraceLevels()->logger->error("a node with the same name is already registered and active"); - } - catch(const Ice::LocalException&) - { - // - // TODO: FIX THIS SHOULD BE A TRACE - // -// ostringstream os; -// os << "couldn't contact the IceGrid registry:\n" << ex; -//
_node->getTraceLevels()->logger->warning(os.str()); - } -} -NodeSessionManager::NodeSessionManager() -{ -} + if(!session) + { + // + // Establish a session with the master IceGrid registry. + // + try + { + if(traceLevels && traceLevels->replica > 1) + { + Ice::Trace out(traceLevels->logger, traceLevels->replicaCat); + out << "trying to establish session with master replica"; + } -void -NodeSessionManager::create(const NodeIPtr& node) -{ - assert(!_node); - const_cast<NodeIPtr&>(_node) = node; + session = _node->registerWithRegistry(_master); - Ice::CommunicatorPtr communicator = _node->getCommunicator(); - Ice::PropertiesPtr properties = communicator->getProperties(); + _node->setObserver(session->getObserver()); - Ice::LocatorPrx locator = communicator->getDefaultLocator(); - assert(locator); - string instanceName = locator->ice_getIdentity().category; + // + // We only check consistency with the master registry. + // + _node->checkConsistency(session); + + { + Lock sync(*this); + _masterSession = session; + notifyAll(); + } - QueryPrx query = QueryPrx::uncheckedCast(communicator->stringToProxy(instanceName + "/Query")); - Ice::ObjectProxySeq proxies = query->findAllObjectsByType(InternalRegistry::ice_staticId()); - NodeSessionKeepAliveThreadPtr thread; + int t = session->getTimeout(); + if(t > 0) + { + timeout = IceUtil::Time::seconds(t); + } - Lock sync(*this); - for(Ice::ObjectProxySeq::const_iterator p = proxies.begin(); p != proxies.end(); ++p) + if(traceLevels && traceLevels->replica > 0) + { + Ice::Trace out(traceLevels->logger, traceLevels->replicaCat); + out << "established session with master replica"; + } + } + catch(const NodeActiveException&) + { + if(traceLevels) + { + traceLevels->logger->error("a node with the same name is already registered and active"); + } + } + catch(const Ice::LocalException& ex) + { + if(traceLevels && traceLevels->replica > 1) + { + Ice::Trace out(traceLevels->logger, traceLevels->replicaCat); + out << "failed to establish session
with master replica:\n" << ex; + } + } + + // + // Get the list of replicas (either with the master + // session or the IceGrid::Query interface) and make sure + // we have sessions opened to these replicas. + // + try + { + unsigned long serial = 0; + InternalRegistryPrxSeq replicas; + while(true) + { + { + Lock sync(*this); + if(serial == _serial) + { + _serial = 1; + syncReplicas(replicas); + break; + } + serial = _serial; + } + + if(session) + { + replicas = session->getReplicas(); + } + else + { + replicas.clear(); + Ice::ObjectProxySeq proxies = _query->findAllObjectsByType(InternalRegistry::ice_staticId()); + for(Ice::ObjectProxySeq::const_iterator p = proxies.begin(); p != proxies.end(); ++p) + { + replicas.push_back(InternalRegistryPrx::uncheckedCast(*p)); + } + } + } + } + catch(const Ice::LocalException& ex) + { + // IGNORE + } + } + + { + Lock sync(*this); + if(!_destroyed) + { + timedWait(timeout); + } + if(_destroyed) + { + break; + } + } + } + + // + // Destroy the session. + // + if(session) { - thread = new NodeSessionKeepAliveThread(InternalRegistryPrx::uncheckedCast(*p), _node); - thread->start(); - _sessions.insert(make_pair((*p)->ice_getIdentity(), thread)); + try + { + session->destroy(); + + if(traceLevels && traceLevels->replica > 0) + { + Ice::Trace out(traceLevels->logger, traceLevels->replicaCat); + out << "destroyed master replica session"; + } + } + catch(const Ice::LocalException& ex) + { + if(traceLevels && traceLevels->replica > 1) + { + Ice::Trace out(traceLevels->logger, traceLevels->replicaCat); + out << "couldn't destroy master replica session:\n" << ex; + } + } } } void NodeSessionManager::waitForCreate() { - Lock sync(*this); - for(NodeSessionMap::const_iterator p = _sessions.begin(); p != _sessions.end(); ++p) + // + // Wait for the node to establish the session with the master or + // at least one replica registry. 
+ // + while(true) { - p->second->waitForCreate(); + NodeSessionKeepAliveThreadPtr thread; + { + Lock sync(*this); + while(!_masterSession || _sessions.empty() || !_destroyed) + { + wait(); + } + + if(_masterSession || _destroyed) + { + return; + } + else + { + thread = _sessions.begin()->second; + } + } + if(thread->waitForCreate()) + { + break; + } } } void NodeSessionManager::destroy() { - Lock sync(*this); + NodeSessionMap sessions; + { + Lock sync(*this); + _destroyed = true; + _sessions.swap(sessions); + notifyAll(); + } + NodeSessionMap::const_iterator p; - for(p = _sessions.begin(); p != _sessions.end(); ++p) + for(p = sessions.begin(); p != sessions.end(); ++p) { p->second->terminate(); } - for(p = _sessions.begin(); p != _sessions.end(); ++p) + + _thread->getThreadControl().join(); + for(p = sessions.begin(); p != sessions.end(); ++p) { p->second->getThreadControl().join(); } @@ -195,6 +446,12 @@ void NodeSessionManager::replicaAdded(const InternalRegistryPrx& replica) { Lock sync(*this); + if(_destroyed) + { + return; + } + + ++_serial; if(_sessions.find(replica->ice_getIdentity()) != _sessions.end()) { return; @@ -211,6 +468,12 @@ NodeSessionManager::replicaRemoved(const InternalRegistryPrx& replica) NodeSessionKeepAliveThreadPtr thread; { Lock sync(*this); + if(_destroyed) + { + return; + } + + --_serial; NodeSessionMap::iterator p = _sessions.find(replica->ice_getIdentity()); if(p != _sessions.end()) { @@ -225,3 +488,41 @@ NodeSessionManager::replicaRemoved(const InternalRegistryPrx& replica) } } +void +NodeSessionManager::syncReplicas(const InternalRegistryPrxSeq& replicas) +{ + NodeSessionMap sessions; + _sessions.swap(sessions); + + NodeSessionKeepAliveThreadPtr thread; + for(InternalRegistryPrxSeq::const_iterator p = replicas.begin(); p != replicas.end(); ++p) + { + if((*p)->ice_getIdentity() == _master->ice_getIdentity()) + { + continue; + } + NodeSessionMap::const_iterator q = sessions.find((*p)->ice_getIdentity()); + if(q != sessions.end()) + { 
+ thread = q->second; + sessions.erase((*p)->ice_getIdentity()); + } + else + { + thread = new NodeSessionKeepAliveThread(*p, _node); + thread->start(); + } + _sessions.insert(make_pair((*p)->ice_getIdentity(), thread)); + } + + NodeSessionMap::const_iterator q; + for(q = sessions.begin(); q != sessions.end(); ++q) + { + q->second->terminate(); + } + for(q = sessions.begin(); q != sessions.end(); ++q) + { + q->second->getThreadControl().join(); + } +} + -- cgit v1.2.3