diff options
Diffstat (limited to 'cpp/src/IceGrid/NodeSessionManager.cpp')
-rw-r--r-- | cpp/src/IceGrid/NodeSessionManager.cpp | 626 |
1 files changed, 626 insertions, 0 deletions
diff --git a/cpp/src/IceGrid/NodeSessionManager.cpp b/cpp/src/IceGrid/NodeSessionManager.cpp new file mode 100644 index 00000000000..792d8bda8f9 --- /dev/null +++ b/cpp/src/IceGrid/NodeSessionManager.cpp @@ -0,0 +1,626 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2011 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +#include <Ice/Ice.h> +#include <IceGrid/NodeSessionManager.h> +#include <IceGrid/TraceLevels.h> +#include <IceGrid/NodeI.h> +#include <IceGrid/Query.h> + +using namespace std; +using namespace IceGrid; + +NodeSessionKeepAliveThread::NodeSessionKeepAliveThread(const InternalRegistryPrx& registry, + const NodeIPtr& node, + const vector<QueryPrx>& queryObjects) : + SessionKeepAliveThread<NodeSessionPrx>(registry, node->getTraceLevels()->logger), + _node(node), + _queryObjects(queryObjects) +{ + assert(registry && node && !_queryObjects.empty()); + string name = registry->ice_getIdentity().name; + const string prefix("InternalRegistry-"); + string::size_type pos = name.find(prefix); + if(pos != string::npos) + { + name = name.substr(prefix.size()); + } + const_cast<string&>(_name) = name; +} + +NodeSessionPrx +NodeSessionKeepAliveThread::createSession(InternalRegistryPrx& registry, IceUtil::Time& timeout) +{ + NodeSessionPrx session; + auto_ptr<Ice::Exception> exception; + TraceLevelsPtr traceLevels = _node->getTraceLevels(); + try + { + if(traceLevels && traceLevels->replica > 1) + { + Ice::Trace out(traceLevels->logger, traceLevels->replicaCat); + out << "trying to establish session with replica `" << _name << "'"; + } + + set<InternalRegistryPrx> used; + if(!registry->ice_getEndpoints().empty()) + { + try + { + session = createSessionImpl(registry, timeout); + } + catch(const Ice::LocalException& ex) + { + exception.reset(ex.ice_clone()); + used.insert(registry); + registry = InternalRegistryPrx::uncheckedCast(registry->ice_endpoints(Ice::EndpointSeq())); + } + } + + if(!session) + { + vector<Ice::AsyncResultPtr> results; + for(vector<QueryPrx>::const_iterator q = _queryObjects.begin(); q != _queryObjects.end(); ++q) + { + results.push_back((*q)->begin_findObjectById(registry->ice_getIdentity())); + } + + for(vector<Ice::AsyncResultPtr>::const_iterator p = results.begin(); p != results.end(); ++p) + { + QueryPrx query = QueryPrx::uncheckedCast((*p)->getProxy()); + if(isDestroyed()) + { + break; + } + + InternalRegistryPrx newRegistry; + try + { + Ice::ObjectPrx obj = query->end_findObjectById(*p); + newRegistry = InternalRegistryPrx::uncheckedCast(obj); + if(newRegistry && used.find(newRegistry) == used.end()) + { + session = createSessionImpl(newRegistry, timeout); + registry = newRegistry; + break; + } + } + catch(const Ice::LocalException& ex) + { + exception.reset(ex.ice_clone()); + if(newRegistry) + { + used.insert(newRegistry); + } + } + } + } + } + catch(const NodeActiveException& ex) + { + if(traceLevels) + { + traceLevels->logger->error("a node with the same name is already active with the replica `" + _name + "'"); + } + exception.reset(ex.ice_clone()); + } + catch(const Ice::Exception& ex) + { + exception.reset(ex.ice_clone()); + } + + if(session) + { + if(traceLevels && traceLevels->replica > 0) + { + Ice::Trace out(traceLevels->logger, traceLevels->replicaCat); + out << "established session with replica `" << _name << "'"; + } + } + else + { + if(traceLevels && traceLevels->replica > 1) + { + Ice::Trace out(traceLevels->logger, traceLevels->replicaCat); + out << "failed to establish session with replica `" << _name << "':\n"; + if(exception.get()) + { + out << *exception.get(); + } + else + { + out << "failed to get replica proxy"; + } + } + } + + return session; +} + +NodeSessionPrx +NodeSessionKeepAliveThread::createSessionImpl(const InternalRegistryPrx& registry, IceUtil::Time& timeout) +{ + NodeSessionPrx session = _node->registerWithRegistry(registry); + int t = session->getTimeout(); + if(t > 0) + { + timeout = IceUtil::Time::seconds(t / 2); + } + _node->addObserver(session, session->getObserver()); + return session; +} + +void +NodeSessionKeepAliveThread::destroySession(const NodeSessionPrx& session) +{ + _node->removeObserver(session); + + try + { + session->destroy(); + + if(_node->getTraceLevels() && _node->getTraceLevels()->replica > 0) + { + Ice::Trace out(_node->getTraceLevels()->logger, _node->getTraceLevels()->replicaCat); + out << "destroyed replica `" << _name << "' session"; + } + } + catch(const Ice::LocalException& ex) + { + if(_node->getTraceLevels() && _node->getTraceLevels()->replica > 1) + { + Ice::Trace out(_node->getTraceLevels()->logger, _node->getTraceLevels()->replicaCat); + out << "couldn't destroy replica `" << _name << "' session:\n" << ex; + } + } +} + +bool +NodeSessionKeepAliveThread::keepAlive(const NodeSessionPrx& session) +{ + if(_node->getTraceLevels() && _node->getTraceLevels()->replica > 2) + { + Ice::Trace out(_node->getTraceLevels()->logger, _node->getTraceLevels()->replicaCat); + out << "sending keep alive message to replica `" << _name << "'"; + } + + try + { + session->keepAlive(_node->getPlatformInfo().getLoadInfo()); + return true; + } + catch(const Ice::LocalException& ex) + { + _node->removeObserver(session); + if(_node->getTraceLevels() && _node->getTraceLevels()->replica > 0) + { + Ice::Trace out(_node->getTraceLevels()->logger, _node->getTraceLevels()->replicaCat); + out << "lost session with replica `" << _name << "':\n" << ex; + } + return false; + } +} + +NodeSessionManager::NodeSessionManager() : + _serial(1), + _destroyed(false), + _activated(false) +{ +} + +void +NodeSessionManager::create(const NodeIPtr& node) +{ + { + Lock sync(*this); + assert(!_node); + + const_cast<NodeIPtr&>(_node) = node; + + Ice::CommunicatorPtr communicator = _node->getCommunicator(); + assert(communicator->getDefaultLocator()); + Ice::Identity id = communicator->getDefaultLocator()->ice_getIdentity(); + + // + // Initialize the IceGrid::Query objects. The IceGrid::Query + // interface is used to lookup the registry proxy in case it + // becomes unavailable. Since replicas might not always have + // an up to date registry proxy, we need to query all the + // replicas. + // + Ice::EndpointSeq endpoints = communicator->getDefaultLocator()->ice_getEndpoints(); + id.name = "Query"; + QueryPrx query = QueryPrx::uncheckedCast(communicator->stringToProxy(communicator->identityToString(id))); + for(Ice::EndpointSeq::const_iterator p = endpoints.begin(); p != endpoints.end(); ++p) + { + Ice::EndpointSeq singleEndpoint; + singleEndpoint.push_back(*p); + _queryObjects.push_back(QueryPrx::uncheckedCast(query->ice_endpoints(singleEndpoint))); + } + + id.name = "InternalRegistry-Master"; + _master = InternalRegistryPrx::uncheckedCast(communicator->stringToProxy(communicator->identityToString(id))); + + _thread = new Thread(*this); + _thread->start(); + } + + // + // Try to create the session. It's important that we wait for the + // creation of the session as this will also try to create sessions + // with replicas (see createdSession below) and this must be done + // before the node is activated. + // + _thread->tryCreateSession(true, IceUtil::Time::seconds(3)); +} + +void +NodeSessionManager::create(const InternalRegistryPrx& replica) +{ + assert(_thread); + NodeSessionKeepAliveThreadPtr thread; + if(replica->ice_getIdentity() == _master->ice_getIdentity()) + { + thread = _thread; + thread->setRegistry(replica); + } + else + { + Lock sync(*this); + thread = addReplicaSession(replica); + } + + if(thread) + { + thread->tryCreateSession(); + } +} + +void +NodeSessionManager::activate() +{ + { + Lock sync(*this); + _activated = true; + } + + // + // Get the master session, if it's not created, try to create it + // again and make sure that the servers are synchronized and the + // replica observer is set on the session. + // + NodeSessionPrx session = _thread->getSession(); + if(session) + { + try + { + session->setReplicaObserver(_node->getProxy()); + syncServers(session); + } + catch(const Ice::LocalException&) + { + } + } +} + +bool +NodeSessionManager::waitForCreate() +{ + assert(_thread); + return _thread->waitForCreate(); +} + +void +NodeSessionManager::terminate() +{ + assert(_thread); + _thread->terminate(); +} + +void +NodeSessionManager::destroy() +{ + NodeSessionMap sessions; + { + Lock sync(*this); + if(_destroyed) + { + return; + } + _destroyed = true; + _sessions.swap(sessions); + notifyAll(); + } + + if(_thread) + { + _thread->terminate(); + } + NodeSessionMap::const_iterator p; + for(p = sessions.begin(); p != sessions.end(); ++p) + { + p->second->terminate(); + } + + if(_thread) + { + _thread->getThreadControl().join(); + } + for(p = sessions.begin(); p != sessions.end(); ++p) + { + p->second->getThreadControl().join(); + } +} + +void +NodeSessionManager::replicaInit(const InternalRegistryPrxSeq& replicas) +{ + Lock sync(*this); + if(_destroyed) + { + return; + } + + // + // Initialize the set of replicas known by the master. + // + _replicas.clear(); + for(InternalRegistryPrxSeq::const_iterator p = replicas.begin(); p != replicas.end(); ++p) + { + _replicas.insert((*p)->ice_getIdentity()); + addReplicaSession(*p)->tryCreateSession(false); + } +} + +void +NodeSessionManager::replicaAdded(const InternalRegistryPrx& replica) +{ + Lock sync(*this); + if(_destroyed) + { + return; + } + _replicas.insert(replica->ice_getIdentity()); + addReplicaSession(replica)->tryCreateSession(false); +} + +void +NodeSessionManager::replicaRemoved(const InternalRegistryPrx& replica) +{ + { + Lock sync(*this); + if(_destroyed) + { + return; + } + _replicas.erase(replica->ice_getIdentity()); + } + + // + // We don't remove the session here. It will eventually be reaped + // by reapReplicas() if the session is dead. + // +} + +NodeSessionKeepAliveThreadPtr +NodeSessionManager::addReplicaSession(const InternalRegistryPrx& replica) +{ + assert(!_destroyed); + + NodeSessionMap::const_iterator p = _sessions.find(replica->ice_getIdentity()); + NodeSessionKeepAliveThreadPtr thread; + if(p != _sessions.end()) + { + thread = p->second; + thread->setRegistry(replica); + } + else + { + thread = new NodeSessionKeepAliveThread(replica, _node, _queryObjects); + _sessions.insert(make_pair(replica->ice_getIdentity(), thread)); + thread->start(); + } + return thread; +} + +void +NodeSessionManager::reapReplicas() +{ + vector<NodeSessionKeepAliveThreadPtr> reap; + { + Lock sync(*this); + if(_destroyed) + { + return; + } + + NodeSessionMap::iterator q = _sessions.begin(); + while(q != _sessions.end()) + { + if(_replicas.find(q->first) == _replicas.end() && q->second->terminateIfDisconnected()) + { + _node->removeObserver(q->second->getSession()); + reap.push_back(q->second); + _sessions.erase(q++); + } + else + { + ++q; + } + } + } + + for(vector<NodeSessionKeepAliveThreadPtr>::const_iterator p = reap.begin(); p != reap.end(); ++p) + { + (*p)->getThreadControl().join(); + } +} + +void +NodeSessionManager::syncServers(const NodeSessionPrx& session) +{ + // + // Ask the session to load the servers on the node. Once this is + // done we check the consistency of the node to make sure old + // servers are removed. + // + // NOTE: it's important for this to be done after trying to + // register with the replicas. When the master loads the server + // some server might get activated and it's better if at that time + // the registry replicas (at least the ones which are up) have all + // established their session with the node. + // + assert(session); + _node->checkConsistency(session); + session->loadServers(); +} + +void +NodeSessionManager::createdSession(const NodeSessionPrx& session) +{ + bool activated; + { + Lock sync(*this); + activated = _activated; + } + + // + // Synchronize the servers if the session is active and if the + // node adapter has been activated (otherwise, the servers will be + // synced after the node adapter activation, see activate()). + // + // We also set the replica observer to receive notifications of + // replica addition/removal. + // + if(session && activated) + { + try + { + session->setReplicaObserver(_node->getProxy()); + syncServers(session); + } + catch(const Ice::LocalException&) + { + } + return; + } + + // + // If there's no master session or if the node adapter isn't + // activated yet, we retrieve a list of the replicas either from + // the master or from the known replicas (the ones configured with + // Ice.Default.Locator) and we try to establish connections to + // each of the replicas. + // + + InternalRegistryPrxSeq replicas; + if(session) + { + assert(!activated); // The node adapter isn't activated yet so + // we're not subscribed yet to the replica + // observer topic. + try + { + replicas = _thread->getRegistry()->getReplicas(); + } + catch(const Ice::LocalException&) + { + } + } + else + { + vector<Ice::AsyncResultPtr> results; + for(vector<QueryPrx>::const_iterator q = _queryObjects.begin(); q != _queryObjects.end(); ++q) + { + results.push_back((*q)->begin_findAllObjectsByType(InternalRegistry::ice_staticId())); + } + + map<Ice::Identity, Ice::ObjectPrx> proxies; + for(vector<Ice::AsyncResultPtr>::const_iterator p = results.begin(); p != results.end(); ++p) + { + QueryPrx query = QueryPrx::uncheckedCast((*p)->getProxy()); + if(isDestroyed()) + { + return; + } + + try + { + Ice::ObjectProxySeq prxs = query->end_findAllObjectsByType(*p); + for(Ice::ObjectProxySeq::const_iterator q = prxs.begin(); q != prxs.end(); ++q) + { + // + // NOTE: We might override a good proxy here! We could improve this to make + // sure that we don't override the proxy for replica N if that proxy was + // obtained from replica N. + // + proxies[(*q)->ice_getIdentity()] = *q; + } + } + catch(const Ice::LocalException&) + { + // IGNORE + } + } + + for(map<Ice::Identity, Ice::ObjectPrx>::const_iterator q = proxies.begin(); q != proxies.end(); ++q) + { + replicas.push_back(InternalRegistryPrx::uncheckedCast(q->second)); + } + } + + vector<NodeSessionKeepAliveThreadPtr> sessions; + { + Lock sync(*this); + if(_destroyed) + { + return; + } + + // + // If the node adapter was activated since we last check, we don't need + // to initialize the replicas here, it will be done by replicaInit(). + // + if(!session || !_activated) + { + _replicas.clear(); + for(InternalRegistryPrxSeq::const_iterator p = replicas.begin(); p != replicas.end(); ++p) + { + if((*p)->ice_getIdentity() != _master->ice_getIdentity()) + { + _replicas.insert((*p)->ice_getIdentity()); + NodeSessionKeepAliveThreadPtr session = addReplicaSession(*p); + session->tryCreateSession(false); + sessions.push_back(session); + } + } + } + } + + // + // Wait for the creation. It's important to wait to ensure that + // the replica sessions are created before the node adapter is + // activated. + // + IceUtil::Time before = IceUtil::Time::now(); + for(vector<NodeSessionKeepAliveThreadPtr>::const_iterator p = sessions.begin(); p != sessions.end(); ++p) + { + if(isDestroyed()) + { + return; + } + IceUtil::Time timeout = IceUtil::Time::seconds(5) - (IceUtil::Time::now() - before); + if(timeout <= IceUtil::Time()) + { + break; + } + (*p)->tryCreateSession(true, timeout); + } +} + |