diff options
Diffstat (limited to 'cpp/src/IceGrid/NodeSessionManager.cpp')
-rw-r--r-- | cpp/src/IceGrid/NodeSessionManager.cpp | 226 |
1 files changed, 226 insertions, 0 deletions
diff --git a/cpp/src/IceGrid/NodeSessionManager.cpp b/cpp/src/IceGrid/NodeSessionManager.cpp new file mode 100644 index 00000000000..5def24820d6 --- /dev/null +++ b/cpp/src/IceGrid/NodeSessionManager.cpp @@ -0,0 +1,226 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2006 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +#include <Ice/Ice.h> +#include <IceGrid/NodeSessionManager.h> +#include <IceGrid/TraceLevels.h> +#include <IceGrid/NodeI.h> +#include <IceGrid/Query.h> + +using namespace std; +using namespace IceGrid; + +NodeSessionKeepAliveThread::NodeSessionKeepAliveThread(const InternalRegistryPrx& registry, + const NodeIPtr& node) : + _registry(InternalRegistryPrx::uncheckedCast(registry->ice_adapterId(""))), + _node(node), + _timeout(IceUtil::Time::seconds(5)), + _shutdown(false) +{ +} + +void +NodeSessionKeepAliveThread::run() +{ + // + // Keep alive the session. + // + NodeSessionPrx session; + while(true) + { + keepAlive(session); + + { + Lock sync(*this); + + session = _session; + + if(!_shutdown) + { + timedWait(_timeout); + } + + if(_shutdown) + { + break; + } + } + } + + // + // Destroy the session. + // + if(session) + { + try + { + session->destroy(); + } + catch(const Ice::LocalException& ex) + { + // + // TODO: XXX: TRACE? + // +// ostringstream os; +// os << "couldn't contact the IceGrid registry to destroy the node session:\n" << ex; +// _node->getTraceLevels()->logger->warning(os.str()); + } + } +} + +void +NodeSessionKeepAliveThread::waitForCreate() +{ + Lock sync(*this); + while(!_session) + { + wait(); + } +} + +void +NodeSessionKeepAliveThread::terminate() +{ + Lock sync(*this); + _shutdown = true; + notifyAll(); +} + +void +NodeSessionKeepAliveThread::keepAlive(const NodeSessionPrx& session) +{ + if(session) + { + try + { + session->keepAlive(_node->getPlatformInfo().getLoadInfo()); + return; // We're done! + } + catch(const Ice::LocalException&) + { + } + } + + try + { + NodeSessionPrx newSession = _node->registerWithRegistry(_registry); + int timeout = newSession->getTimeout(); + { + Lock sync(*this); + if(timeout > 0) + { + _timeout = IceUtil::Time::seconds(timeout); + } + _session = newSession; + notifyAll(); + } + } + catch(const NodeActiveException&) + { + _node->getTraceLevels()->logger->error("a node with the same name is already registered and active"); + } + catch(const Ice::LocalException& ex) + { + // + // TODO: FIX THIS SHOULD BE A TRACE + // + ostringstream os; + os << "couldn't contact the IceGrid registry:\n" << ex; + _node->getTraceLevels()->logger->warning(os.str()); + } +} + +NodeSessionManager::NodeSessionManager() +{ +} + +void +NodeSessionManager::create(const NodeIPtr& node) +{ + assert(!_node); + const_cast<NodeIPtr&>(_node) = node; + + Ice::CommunicatorPtr communicator = _node->getCommunicator(); + Ice::PropertiesPtr properties = communicator->getProperties(); + + Ice::LocatorPrx locator = communicator->getDefaultLocator(); + assert(locator); + string instanceName = locator->ice_getIdentity().category; + + QueryPrx query = QueryPrx::uncheckedCast(communicator->stringToProxy(instanceName + "/Query")); + Ice::ObjectProxySeq proxies = query->findAllObjectsByType(InternalRegistry::ice_staticId()); + NodeSessionKeepAliveThreadPtr thread; + + Lock sync(*this); + for(Ice::ObjectProxySeq::const_iterator p = proxies.begin(); p != proxies.end(); ++p) + { + thread = new NodeSessionKeepAliveThread(InternalRegistryPrx::uncheckedCast(*p), _node); + thread->start(); + _sessions.insert(make_pair((*p)->ice_getIdentity(), thread)); + } +} + +void +NodeSessionManager::waitForCreate() +{ + Lock sync(*this); + for(NodeSessionMap::const_iterator p = _sessions.begin(); p != _sessions.end(); ++p) + { + p->second->waitForCreate(); + } +} + +void +NodeSessionManager::destroy() +{ + Lock sync(*this); + for(NodeSessionMap::const_iterator p = _sessions.begin(); p != _sessions.end(); ++p) + { + p->second->terminate(); + } + for(NodeSessionMap::const_iterator p = _sessions.begin(); p != _sessions.end(); ++p) + { + p->second->getThreadControl().join(); + } +} + +void +NodeSessionManager::replicaAdded(const InternalRegistryPrx& replica) +{ + Lock sync(*this); + if(_sessions.find(replica->ice_getIdentity()) != _sessions.end()) + { + return; + } + + NodeSessionKeepAliveThreadPtr thread = new NodeSessionKeepAliveThread(replica, _node); + thread->start(); + _sessions.insert(make_pair(replica->ice_getIdentity(), thread)); +} + +void +NodeSessionManager::replicaRemoved(const InternalRegistryPrx& replica) +{ + NodeSessionKeepAliveThreadPtr thread; + { + Lock sync(*this); + NodeSessionMap::iterator p = _sessions.find(replica->ice_getIdentity()); + if(p != _sessions.end()) + { + thread = p->second; + _sessions.erase(p); + } + } + if(thread) + { + thread->terminate(); + thread->getThreadControl().join(); + } +} + |