summaryrefslogtreecommitdiff
path: root/cpp/src/IceGrid/NodeSessionManager.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceGrid/NodeSessionManager.cpp')
-rw-r--r--cpp/src/IceGrid/NodeSessionManager.cpp465
1 files changed, 383 insertions, 82 deletions
diff --git a/cpp/src/IceGrid/NodeSessionManager.cpp b/cpp/src/IceGrid/NodeSessionManager.cpp
index 4e6381c38c9..6c4d4cefc90 100644
--- a/cpp/src/IceGrid/NodeSessionManager.cpp
+++ b/cpp/src/IceGrid/NodeSessionManager.cpp
@@ -16,13 +16,23 @@
using namespace std;
using namespace IceGrid;
-NodeSessionKeepAliveThread::NodeSessionKeepAliveThread(const InternalRegistryPrx& registry,
- const NodeIPtr& node) :
- _registry(InternalRegistryPrx::uncheckedCast(registry->ice_adapterId(""))),
+NodeSessionKeepAliveThread::NodeSessionKeepAliveThread(const InternalRegistryPrx& registry, const NodeIPtr& node) :
+ _registry(registry),
_node(node),
- _timeout(IceUtil::Time::seconds(5)),
_shutdown(false)
{
+ string name = _registry->ice_getIdentity().name;
+ const string prefix("InternalRegistry-");
+ string::size_type pos = name.find(prefix);
+ if(pos != string::npos)
+ {
+ name = name.substr(prefix.size());
+ }
+ const_cast<string&>(_name) = name;
+ const_cast<InternalRegistryPrx&>(_registry) =
+ InternalRegistryPrx::uncheckedCast(
+ _registry->ice_getCommunicator()->stringToProxy(
+ _registry->ice_getCommunicator()->identityToString(_registry->ice_getIdentity())));
}
void
@@ -32,27 +42,102 @@ NodeSessionKeepAliveThread::run()
// Keep alive the session.
//
NodeSessionPrx session;
+ IceUtil::Time timeout = IceUtil::Time::seconds(5);
+ TraceLevelsPtr traceLevels = _node->getTraceLevels();
while(true)
{
- keepAlive(session);
+ //
+ // Send a keep alive message to the session.
+ //
+ if(session)
+ {
+ if(traceLevels && traceLevels->replica > 2)
+ {
+ Ice::Trace out(traceLevels->logger, traceLevels->replicaCat);
+ out << "sending keep alive message to replica `" << _name << "'";
+ }
+ try
+ {
+ session->keepAlive(_node->getPlatformInfo().getLoadInfo());
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ if(traceLevels && traceLevels->replica > 0)
+ {
+ Ice::Trace out(traceLevels->logger, traceLevels->replicaCat);
+ out << "lost session with replica `" << _name << "':\n" << ex;
+ }
+ session = 0;
+ }
+ }
+
+ //
+ // If the session isn't established yet, try to create a new
+ // session.
+ //
+ if(!session)
{
- Lock sync(*this);
+ try
+ {
+ if(traceLevels && traceLevels->replica > 1)
+ {
+ Ice::Trace out(traceLevels->logger, traceLevels->replicaCat);
+ out << "trying to establish session with replica `" << _name << "'";
+ }
- session = _session;
+ session = _node->registerWithRegistry(_registry);
+ {
+ Lock sync(*this);
+ _session = session;
+ notifyAll();
+ }
- if(!_shutdown)
+ int t = session->getTimeout();
+ if(t > 0)
+ {
+ timeout = IceUtil::Time::seconds(t);
+ }
+
+ if(traceLevels && traceLevels->replica > 0)
+ {
+ Ice::Trace out(traceLevels->logger, traceLevels->replicaCat);
+ out << "established session with replica `" << _name << "'";
+ }
+ }
+ catch(const NodeActiveException&)
+ {
+ if(traceLevels)
+ {
+ traceLevels->logger->error("a node with the same name is already registered and active");
+ }
+ }
+ catch(const Ice::LocalException& ex)
{
- timedWait(_timeout);
+ if(traceLevels && traceLevels->replica > 1)
+ {
+ Ice::Trace out(traceLevels->logger, traceLevels->replicaCat);
+ out << "failed to establish session with replica `" << _name << "':\n" << ex;
+ }
}
+ }
+ //
+ // Wait for the configured timeout duration.
+ //
+ {
+ Lock sync(*this);
+ if(!_shutdown)
+ {
+ timedWait(timeout);
+ }
if(_shutdown)
{
break;
}
- }
+ }
}
-
+
//
// Destroy the session.
//
@@ -61,27 +146,33 @@ NodeSessionKeepAliveThread::run()
try
{
session->destroy();
+
+ if(traceLevels && traceLevels->replica > 0)
+ {
+ Ice::Trace out(traceLevels->logger, traceLevels->replicaCat);
+ out << "destroyed replica `" << _name << "' session";
+ }
}
- catch(const Ice::LocalException&)
+ catch(const Ice::LocalException& ex)
{
- //
- // TODO: XXX: TRACE?
- //
-// ostringstream os;
-// os << "couldn't contact the IceGrid registry to destroy the node session:\n" << ex;
-// _node->getTraceLevels()->logger->warning(os.str());
+ if(traceLevels && traceLevels->replica > 1)
+ {
+ Ice::Trace out(traceLevels->logger, traceLevels->replicaCat);
+ out << "couldn't destroy replica `" << _name << "' session:\n" << ex;
+ }
}
}
}
-void
+bool
NodeSessionKeepAliveThread::waitForCreate()
{
Lock sync(*this);
- while(!_session)
+ while(!_session && !_shutdown)
{
wait();
}
+ return !_shutdown;
}
void
@@ -92,100 +183,260 @@ NodeSessionKeepAliveThread::terminate()
notifyAll();
}
+NodeSessionManager::NodeSessionManager() :
+ _serial(1),
+ _destroyed(false)
+{
+}
+
void
-NodeSessionKeepAliveThread::keepAlive(const NodeSessionPrx& session)
+NodeSessionManager::create(const NodeIPtr& node)
{
- if(session)
- {
- try
- {
- session->keepAlive(_node->getPlatformInfo().getLoadInfo());
- return; // We're done!
- }
- catch(const Ice::LocalException&)
- {
- }
- }
+ assert(!_node);
+
+ const_cast<NodeIPtr&>(_node) = node;
+
+ Ice::CommunicatorPtr communicator = _node->getCommunicator();
+ assert(communicator->getDefaultLocator());
+ Ice::Identity id = communicator->getDefaultLocator()->ice_getIdentity();
- try
+ id.name = "Query";
+ _query = QueryPrx::uncheckedCast(communicator->stringToProxy(communicator->identityToString(id)));
+
+ id.name = "InternalRegistry";
+ _master = InternalRegistryPrx::uncheckedCast(communicator->stringToProxy(communicator->identityToString(id)));
+
+ _thread = new Thread(*this);
+ _thread->start();
+}
+
+void
+NodeSessionManager::run()
+{
+ NodeSessionPrx session;
+ TraceLevelsPtr traceLevels = _node->getTraceLevels();
+ IceUtil::Time timeout = IceUtil::Time::seconds(5);
+ while(true)
{
- NodeSessionPrx newSession = _node->registerWithRegistry(_registry);
- int timeout = newSession->getTimeout();
+ if(session)
{
- Lock sync(*this);
- if(timeout > 0)
+ if(traceLevels && traceLevels->replica > 2)
{
- _timeout = IceUtil::Time::seconds(timeout);
+ Ice::Trace out(traceLevels->logger, traceLevels->replicaCat);
+ out << "sending keep alive message to master replica";
+ }
+
+ try
+ {
+ session->keepAlive(_node->getPlatformInfo().getLoadInfo());
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ if(traceLevels && traceLevels->replica > 0)
+ {
+ Ice::Trace out(traceLevels->logger, traceLevels->replicaCat);
+ out << "lost session with master replica:\n" << ex;
+ }
+ session = 0;
+ _node->setObserver(0);
+ {
+ Lock sync(*this);
+ _masterSession = session;
+ notifyAll();
+ }
}
- _session = newSession;
- notifyAll();
}
- }
- catch(const NodeActiveException&)
- {
- _node->getTraceLevels()->logger->error("a node with the same name is already registered and active");
- }
- catch(const Ice::LocalException&)
- {
- //
- // TODO: FIX THIS SHOULD BE A TRACE
- //
-// ostringstream os;
-// os << "couldn't contact the IceGrid registry:\n" << ex;
-// _node->getTraceLevels()->logger->warning(os.str());
- }
-}
-NodeSessionManager::NodeSessionManager()
-{
-}
+ if(!session)
+ {
+ //
+ // Establish a session with the master IceGrid registry.
+ //
+ try
+ {
+ if(traceLevels && traceLevels->replica > 1)
+ {
+ Ice::Trace out(traceLevels->logger, traceLevels->replicaCat);
+ out << "trying to establish session with master replica";
+ }
-void
-NodeSessionManager::create(const NodeIPtr& node)
-{
- assert(!_node);
- const_cast<NodeIPtr&>(_node) = node;
+ session = _node->registerWithRegistry(_master);
- Ice::CommunicatorPtr communicator = _node->getCommunicator();
- Ice::PropertiesPtr properties = communicator->getProperties();
+ _node->setObserver(session->getObserver());
- Ice::LocatorPrx locator = communicator->getDefaultLocator();
- assert(locator);
- string instanceName = locator->ice_getIdentity().category;
+ //
+ // We only check consistency with the master registry.
+ //
+ _node->checkConsistency(session);
+
+ {
+ Lock sync(*this);
+ _masterSession = session;
+ notifyAll();
+ }
- QueryPrx query = QueryPrx::uncheckedCast(communicator->stringToProxy(instanceName + "/Query"));
- Ice::ObjectProxySeq proxies = query->findAllObjectsByType(InternalRegistry::ice_staticId());
- NodeSessionKeepAliveThreadPtr thread;
+ int t = session->getTimeout();
+ if(t > 0)
+ {
+ timeout = IceUtil::Time::seconds(t);
+ }
- Lock sync(*this);
- for(Ice::ObjectProxySeq::const_iterator p = proxies.begin(); p != proxies.end(); ++p)
+ if(traceLevels && traceLevels->replica > 0)
+ {
+ Ice::Trace out(traceLevels->logger, traceLevels->replicaCat);
+ out << "established session with master replica";
+ }
+ }
+ catch(const NodeActiveException&)
+ {
+ if(traceLevels)
+ {
+ traceLevels->logger->error("a node with the same name is already registered and active");
+ }
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ if(traceLevels && traceLevels->replica > 1)
+ {
+ Ice::Trace out(traceLevels->logger, traceLevels->replicaCat);
+ out << "failed to establish session with master replica:\n" << ex;
+ }
+ }
+
+ //
+ // Get the list of replicas (either with the master
+ // session or the IceGrid::Query interface) and make sure
+ // we have sessions opened to these replicas.
+ //
+ try
+ {
+ unsigned long serial = 0;
+ InternalRegistryPrxSeq replicas;
+ while(true)
+ {
+ {
+ Lock sync(*this);
+ if(serial == _serial)
+ {
+ _serial = 1;
+ syncReplicas(replicas);
+ break;
+ }
+ serial = _serial;
+ }
+
+ if(session)
+ {
+ replicas = session->getReplicas();
+ }
+ else
+ {
+ replicas.clear();
+ Ice::ObjectProxySeq proxies = _query->findAllObjectsByType(InternalRegistry::ice_staticId());
+ for(Ice::ObjectProxySeq::const_iterator p = proxies.begin(); p != proxies.end(); ++p)
+ {
+ replicas.push_back(InternalRegistryPrx::uncheckedCast(*p));
+ }
+ }
+ }
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ // IGNORE
+ }
+ }
+
+ {
+ Lock sync(*this);
+ if(!_destroyed)
+ {
+ timedWait(timeout);
+ }
+ if(_destroyed)
+ {
+ break;
+ }
+ }
+ }
+
+ //
+ // Destroy the session.
+ //
+ if(session)
{
- thread = new NodeSessionKeepAliveThread(InternalRegistryPrx::uncheckedCast(*p), _node);
- thread->start();
- _sessions.insert(make_pair((*p)->ice_getIdentity(), thread));
+ try
+ {
+ session->destroy();
+
+ if(traceLevels && traceLevels->replica > 0)
+ {
+ Ice::Trace out(traceLevels->logger, traceLevels->replicaCat);
+ out << "destroyed master replica session";
+ }
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ if(traceLevels && traceLevels->replica > 1)
+ {
+ Ice::Trace out(traceLevels->logger, traceLevels->replicaCat);
+ out << "couldn't destroy master replica session:\n" << ex;
+ }
+ }
}
}
void
NodeSessionManager::waitForCreate()
{
- Lock sync(*this);
- for(NodeSessionMap::const_iterator p = _sessions.begin(); p != _sessions.end(); ++p)
+ //
+ // Wait for the node to establish the session with the master or
+ // at least one replica registry.
+ //
+ while(true)
{
- p->second->waitForCreate();
+ NodeSessionKeepAliveThreadPtr thread;
+ {
+ Lock sync(*this);
+ while(!_masterSession || _sessions.empty() || !_destroyed)
+ {
+ wait();
+ }
+
+ if(_masterSession || _destroyed)
+ {
+ return;
+ }
+ else
+ {
+ thread = _sessions.begin()->second;
+ }
+ }
+ if(thread->waitForCreate())
+ {
+ break;
+ }
}
}
void
NodeSessionManager::destroy()
{
- Lock sync(*this);
+ NodeSessionMap sessions;
+ {
+ Lock sync(*this);
+ _destroyed = true;
+ _sessions.swap(sessions);
+ notifyAll();
+ }
+
NodeSessionMap::const_iterator p;
- for(p = _sessions.begin(); p != _sessions.end(); ++p)
+ for(p = sessions.begin(); p != sessions.end(); ++p)
{
p->second->terminate();
}
- for(p = _sessions.begin(); p != _sessions.end(); ++p)
+
+ _thread->getThreadControl().join();
+ for(p = sessions.begin(); p != sessions.end(); ++p)
{
p->second->getThreadControl().join();
}
@@ -195,6 +446,12 @@ void
NodeSessionManager::replicaAdded(const InternalRegistryPrx& replica)
{
Lock sync(*this);
+ if(_destroyed)
+ {
+ return;
+ }
+
+ ++_serial;
if(_sessions.find(replica->ice_getIdentity()) != _sessions.end())
{
return;
@@ -211,6 +468,12 @@ NodeSessionManager::replicaRemoved(const InternalRegistryPrx& replica)
NodeSessionKeepAliveThreadPtr thread;
{
Lock sync(*this);
+ if(_destroyed)
+ {
+ return;
+ }
+
+ --_serial;
NodeSessionMap::iterator p = _sessions.find(replica->ice_getIdentity());
if(p != _sessions.end())
{
@@ -225,3 +488,41 @@ NodeSessionManager::replicaRemoved(const InternalRegistryPrx& replica)
}
}
+void
+NodeSessionManager::syncReplicas(const InternalRegistryPrxSeq& replicas)
+{
+ NodeSessionMap sessions;
+ _sessions.swap(sessions);
+
+ NodeSessionKeepAliveThreadPtr thread;
+ for(InternalRegistryPrxSeq::const_iterator p = replicas.begin(); p != replicas.end(); ++p)
+ {
+ if((*p)->ice_getIdentity() == _master->ice_getIdentity())
+ {
+ continue;
+ }
+ NodeSessionMap::const_iterator q = sessions.find((*p)->ice_getIdentity());
+ if(q != sessions.end())
+ {
+ thread = q->second;
+ sessions.erase((*p)->ice_getIdentity());
+ }
+ else
+ {
+ thread = new NodeSessionKeepAliveThread(*p, _node);
+ thread->start();
+ }
+ _sessions.insert(make_pair((*p)->ice_getIdentity(), thread));
+ }
+
+ NodeSessionMap::const_iterator q;
+ for(q = sessions.begin(); q != sessions.end(); ++q)
+ {
+ q->second->terminate();
+ }
+ for(q = sessions.begin(); q != sessions.end(); ++q)
+ {
+ q->second->getThreadControl().join();
+ }
+}
+