summaryrefslogtreecommitdiff
path: root/cpp/src/IceGrid/NodeSessionManager.cpp
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2006-07-27 13:20:03 +0000
committerBenoit Foucher <benoit@zeroc.com>2006-07-27 13:20:03 +0000
commitdad0bda0d462b4730b168befd187ca43446f12e7 (patch)
tree1a964dc5a3bd2c945e0ee165ddda55c3e4281157 /cpp/src/IceGrid/NodeSessionManager.cpp
parentImproved __checkMode (diff)
downloadice-dad0bda0d462b4730b168befd187ca43446f12e7.tar.bz2
ice-dad0bda0d462b4730b168befd187ca43446f12e7.tar.xz
ice-dad0bda0d462b4730b168befd187ca43446f12e7.zip
More IceGrid replication improvements.
Diffstat (limited to 'cpp/src/IceGrid/NodeSessionManager.cpp')
-rw-r--r--cpp/src/IceGrid/NodeSessionManager.cpp576
1 files changed, 246 insertions, 330 deletions
diff --git a/cpp/src/IceGrid/NodeSessionManager.cpp b/cpp/src/IceGrid/NodeSessionManager.cpp
index 37bc352e624..78f5f8a5427 100644
--- a/cpp/src/IceGrid/NodeSessionManager.cpp
+++ b/cpp/src/IceGrid/NodeSessionManager.cpp
@@ -17,11 +17,10 @@ using namespace std;
using namespace IceGrid;
NodeSessionKeepAliveThread::NodeSessionKeepAliveThread(const InternalRegistryPrx& registry, const NodeIPtr& node) :
- _registry(registry),
- _node(node),
- _shutdown(false)
+ SessionKeepAliveThread<NodeSessionPrx, InternalRegistryPrx>(registry),
+ _node(node)
{
- string name = _registry->ice_getIdentity().name;
+ string name = registry->ice_getIdentity().name;
const string prefix("InternalRegistry-");
string::size_type pos = name.find(prefix);
if(pos != string::npos)
@@ -29,158 +28,100 @@ NodeSessionKeepAliveThread::NodeSessionKeepAliveThread(const InternalRegistryPrx
name = name.substr(prefix.size());
}
const_cast<string&>(_name) = name;
- const_cast<InternalRegistryPrx&>(_registry) =
- InternalRegistryPrx::uncheckedCast(
- _registry->ice_getCommunicator()->stringToProxy(
- _registry->ice_getCommunicator()->identityToString(_registry->ice_getIdentity())));
}
-void
-NodeSessionKeepAliveThread::run()
+NodeSessionPrx
+NodeSessionKeepAliveThread::createSession(const InternalRegistryPrx& registry, IceUtil::Time& timeout) const
{
- //
- // Keep alive the session.
- //
- NodeSessionPrx session;
- IceUtil::Time timeout = IceUtil::Time::seconds(5);
- TraceLevelsPtr traceLevels = _node->getTraceLevels();
- while(true)
- {
- //
- // Send a keep alive message to the session.
- //
- if(session)
+ try
+ {
+ if(_node->getTraceLevels() && _node->getTraceLevels()->replica > 1)
{
- if(traceLevels && traceLevels->replica > 2)
- {
- Ice::Trace out(traceLevels->logger, traceLevels->replicaCat);
- out << "sending keep alive message to replica `" << _name << "'";
- }
-
- try
- {
- session->keepAlive(_node->getPlatformInfo().getLoadInfo());
- }
- catch(const Ice::LocalException& ex)
- {
- if(traceLevels && traceLevels->replica > 0)
- {
- Ice::Trace out(traceLevels->logger, traceLevels->replicaCat);
- out << "lost session with replica `" << _name << "':\n" << ex;
- }
- session = 0;
- }
+ Ice::Trace out(_node->getTraceLevels()->logger, _node->getTraceLevels()->replicaCat);
+ out << "trying to establish session with replica `" << _name << "'";
}
- //
- // If the session isn't established yet, try to create a new
- // session.
- //
- if(!session)
- {
- try
- {
- if(traceLevels && traceLevels->replica > 1)
- {
- Ice::Trace out(traceLevels->logger, traceLevels->replicaCat);
- out << "trying to establish session with replica `" << _name << "'";
- }
+ NodeSessionPrx session = _node->registerWithRegistry(registry);
- session = _node->registerWithRegistry(_registry);
- {
- Lock sync(*this);
- _session = session;
- notifyAll();
- }
-
- int t = session->getTimeout();
- if(t > 0)
- {
- timeout = IceUtil::Time::seconds(t);
- }
-
- if(traceLevels && traceLevels->replica > 0)
- {
- Ice::Trace out(traceLevels->logger, traceLevels->replicaCat);
- out << "established session with replica `" << _name << "'";
- }
- }
- catch(const NodeActiveException&)
- {
- if(traceLevels)
- {
- traceLevels->logger->error("a node with the same name is already registered and active");
- }
- }
- catch(const Ice::LocalException& ex)
- {
- if(traceLevels && traceLevels->replica > 1)
- {
- Ice::Trace out(traceLevels->logger, traceLevels->replicaCat);
- out << "failed to establish session with replica `" << _name << "':\n" << ex;
- }
- }
+ int t = session->getTimeout();
+ if(t > 0)
+ {
+ timeout = IceUtil::Time::seconds(t);
}
-
- //
- // Wait for the configured timeout duration.
- //
+
+ if(_node->getTraceLevels() && _node->getTraceLevels()->replica > 0)
{
- Lock sync(*this);
- if(!_shutdown)
- {
- timedWait(timeout);
- }
- if(_shutdown)
- {
- break;
- }
+ Ice::Trace out(_node->getTraceLevels()->logger, _node->getTraceLevels()->replicaCat);
+ out << "established session with replica `" << _name << "'";
}
+
+ return session;
}
-
- //
- // Destroy the session.
- //
- if(session)
+ catch(const NodeActiveException&)
{
- try
+ if(_node->getTraceLevels())
{
- session->destroy();
-
- if(traceLevels && traceLevels->replica > 0)
- {
- Ice::Trace out(traceLevels->logger, traceLevels->replicaCat);
- out << "destroyed replica `" << _name << "' session";
- }
+ _node->getTraceLevels()->logger->error("a node with the same name is already registered and active");
}
- catch(const Ice::LocalException& ex)
+ return 0;
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ if(_node->getTraceLevels() && _node->getTraceLevels()->replica > 1)
{
- if(traceLevels && traceLevels->replica > 1)
- {
- Ice::Trace out(traceLevels->logger, traceLevels->replicaCat);
- out << "couldn't destroy replica `" << _name << "' session:\n" << ex;
- }
+ Ice::Trace out(_node->getTraceLevels()->logger, _node->getTraceLevels()->replicaCat);
+ out << "failed to establish session with replica `" << _name << "':\n" << ex;
}
+ return 0;
}
}
-bool
-NodeSessionKeepAliveThread::waitForCreate()
+void
+NodeSessionKeepAliveThread::destroySession(const NodeSessionPrx& session) const
{
- Lock sync(*this);
- while(!_session && !_shutdown)
+ try
{
- wait();
+ session->destroy();
+
+ if(_node->getTraceLevels() && _node->getTraceLevels()->replica > 0)
+ {
+ Ice::Trace out(_node->getTraceLevels()->logger, _node->getTraceLevels()->replicaCat);
+ out << "destroyed replica `" << _name << "' session";
+ }
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ if(_node->getTraceLevels() && _node->getTraceLevels()->replica > 1)
+ {
+ Ice::Trace out(_node->getTraceLevels()->logger, _node->getTraceLevels()->replicaCat);
+ out << "couldn't destroy replica `" << _name << "' session:\n" << ex;
+ }
}
- return !_shutdown;
}
-void
-NodeSessionKeepAliveThread::terminate()
+bool
+NodeSessionKeepAliveThread::keepAlive(const NodeSessionPrx& session) const
{
- Lock sync(*this);
- _shutdown = true;
- notifyAll();
+ if(_node->getTraceLevels() && _node->getTraceLevels()->replica > 2)
+ {
+ Ice::Trace out(_node->getTraceLevels()->logger, _node->getTraceLevels()->replicaCat);
+ out << "sending keep alive message to replica `" << _name << "'";
+ }
+
+ try
+ {
+ session->keepAlive(_node->getPlatformInfo().getLoadInfo());
+ return true;
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ if(_node->getTraceLevels() && _node->getTraceLevels()->replica > 0)
+ {
+ Ice::Trace out(_node->getTraceLevels()->logger, _node->getTraceLevels()->replicaCat);
+ out << "lost session with replica `" << _name << "':\n" << ex;
+ }
+ return false;
+ }
}
NodeSessionManager::NodeSessionManager() :
@@ -206,181 +147,24 @@ NodeSessionManager::create(const NodeIPtr& node)
id.name = "InternalRegistry";
_master = InternalRegistryPrx::uncheckedCast(communicator->stringToProxy(communicator->identityToString(id)));
- _thread = new Thread(*this);
+ _thread = new Thread(*this, _master);
_thread->start();
}
void
-NodeSessionManager::run()
+NodeSessionManager::create(const InternalRegistryPrx& replica)
{
- NodeSessionPrx session;
- TraceLevelsPtr traceLevels = _node->getTraceLevels();
- IceUtil::Time timeout = IceUtil::Time::seconds(5);
- while(true)
+ assert(_thread);
+ if(replica->ice_getIdentity() == _master->ice_getIdentity())
{
- if(session)
- {
- if(traceLevels && traceLevels->replica > 2)
- {
- Ice::Trace out(traceLevels->logger, traceLevels->replicaCat);
- out << "sending keep alive message to master replica";
- }
-
- try
- {
- session->keepAlive(_node->getPlatformInfo().getLoadInfo());
- }
- catch(const Ice::LocalException& ex)
- {
- if(traceLevels && traceLevels->replica > 0)
- {
- Ice::Trace out(traceLevels->logger, traceLevels->replicaCat);
- out << "lost session with master replica:\n" << ex;
- }
- session = 0;
- _node->setObserver(0);
- {
- Lock sync(*this);
- _masterSession = session;
- notifyAll();
- }
- }
- }
-
- if(!session)
- {
- //
- // Establish a session with the master IceGrid registry.
- //
- try
- {
- if(traceLevels && traceLevels->replica > 1)
- {
- Ice::Trace out(traceLevels->logger, traceLevels->replicaCat);
- out << "trying to establish session with master replica";
- }
-
- session = _node->registerWithRegistry(_master);
-
- _node->setObserver(session->getObserver());
-
- //
- // We only check consistency with the master registry.
- //
- _node->checkConsistency(session);
-
- {
- Lock sync(*this);
- _masterSession = session;
- notifyAll();
- }
-
- int t = session->getTimeout();
- if(t > 0)
- {
- timeout = IceUtil::Time::seconds(t);
- }
-
- if(traceLevels && traceLevels->replica > 0)
- {
- Ice::Trace out(traceLevels->logger, traceLevels->replicaCat);
- out << "established session with master replica";
- }
- }
- catch(const NodeActiveException&)
- {
- if(traceLevels)
- {
- traceLevels->logger->error("a node with the same name is already registered and active");
- }
- }
- catch(const Ice::LocalException& ex)
- {
- if(traceLevels && traceLevels->replica > 1)
- {
- Ice::Trace out(traceLevels->logger, traceLevels->replicaCat);
- out << "failed to establish session with master replica:\n" << ex;
- }
- }
-
- //
- // Get the list of replicas (either with the master
- // session or the IceGrid::Query interface) and make sure
- // we have sessions opened to these replicas.
- //
- try
- {
- unsigned long serial = 0;
- InternalRegistryPrxSeq replicas;
- while(true)
- {
- {
- Lock sync(*this);
- if(serial == _serial)
- {
- _serial = 1;
- syncReplicas(replicas);
- break;
- }
- serial = _serial;
- }
-
- if(session)
- {
- replicas = session->getReplicas();
- }
- else
- {
- replicas.clear();
- Ice::ObjectProxySeq proxies = _query->findAllObjectsByType(InternalRegistry::ice_staticId());
- for(Ice::ObjectProxySeq::const_iterator p = proxies.begin(); p != proxies.end(); ++p)
- {
- replicas.push_back(InternalRegistryPrx::uncheckedCast(*p));
- }
- }
- }
- }
- catch(const Ice::LocalException&)
- {
- // IGNORE
- }
- }
-
- {
- Lock sync(*this);
- if(!_destroyed)
- {
- timedWait(timeout);
- }
- if(_destroyed)
- {
- break;
- }
- }
+ _thread->tryCreateSession(replica);
}
-
- //
- // Destroy the session.
- //
- if(session)
+ else
{
- try
- {
- session->destroy();
-
- if(traceLevels && traceLevels->replica > 0)
- {
- Ice::Trace out(traceLevels->logger, traceLevels->replicaCat);
- out << "destroyed master replica session";
- }
- }
- catch(const Ice::LocalException& ex)
+ NodeSessionKeepAliveThreadPtr thread = replicaAdded(replica);
+ if(thread)
{
- if(traceLevels && traceLevels->replica > 1)
- {
- Ice::Trace out(traceLevels->logger, traceLevels->replicaCat);
- out << "couldn't destroy master replica session:\n" << ex;
- }
+ thread->tryCreateSession(replica);
}
}
}
@@ -388,34 +172,8 @@ NodeSessionManager::run()
void
NodeSessionManager::waitForCreate()
{
- //
- // Wait for the node to establish the session with the master or
- // at least one replica registry.
- //
- while(true)
- {
- NodeSessionKeepAliveThreadPtr thread;
- {
- Lock sync(*this);
- while(!_masterSession && _sessions.empty() && !_destroyed)
- {
- wait();
- }
-
- if(_masterSession || _destroyed)
- {
- return;
- }
- else
- {
- thread = _sessions.begin()->second;
- }
- }
- if(thread->waitForCreate())
- {
- break;
- }
- }
+ assert(_thread);
+ _thread->waitForCreate();
}
void
@@ -429,6 +187,7 @@ NodeSessionManager::destroy()
notifyAll();
}
+ _thread->terminate();
NodeSessionMap::const_iterator p;
for(p = sessions.begin(); p != sessions.end(); ++p)
{
@@ -442,24 +201,31 @@ NodeSessionManager::destroy()
}
}
-void
+NodeSessionKeepAliveThreadPtr
NodeSessionManager::replicaAdded(const InternalRegistryPrx& replica)
{
Lock sync(*this);
if(_destroyed)
{
- return;
+ return 0;
}
++_serial;
- if(_sessions.find(replica->ice_getIdentity()) != _sessions.end())
+ NodeSessionMap::const_iterator p = _sessions.find(replica->ice_getIdentity());
+ if(p != _sessions.end())
{
- return;
+ return p->second;
}
- NodeSessionKeepAliveThreadPtr thread = new NodeSessionKeepAliveThread(replica, _node);
+ InternalRegistryPrx registry =
+ InternalRegistryPrx::uncheckedCast(
+ replica->ice_getCommunicator()->stringToProxy(
+ replica->ice_getCommunicator()->identityToString(replica->ice_getIdentity())));
+
+ NodeSessionKeepAliveThreadPtr thread = new NodeSessionKeepAliveThread(registry, _node);
thread->start();
_sessions.insert(make_pair(replica->ice_getIdentity(), thread));
+ return thread;
}
void
@@ -526,3 +292,153 @@ NodeSessionManager::syncReplicas(const InternalRegistryPrxSeq& replicas)
}
}
+NodeSessionPrx
+NodeSessionManager::createSession(const InternalRegistryPrx& registry, IceUtil::Time& timeout) const
+{
+ //
+ // Establish a session with the master IceGrid registry.
+ //
+ NodeSessionPrx session;
+ try
+ {
+ if(_node->getTraceLevels() && _node->getTraceLevels()->replica > 1)
+ {
+ Ice::Trace out(_node->getTraceLevels()->logger, _node->getTraceLevels()->replicaCat);
+ out << "trying to establish session with master replica";
+ }
+
+ session = _node->registerWithRegistry(registry);
+
+ int t = session->getTimeout();
+ if(t > 0)
+ {
+ timeout = IceUtil::Time::seconds(t);
+ }
+
+ if(_node->getTraceLevels() && _node->getTraceLevels()->replica > 0)
+ {
+ Ice::Trace out(_node->getTraceLevels()->logger, _node->getTraceLevels()->replicaCat);
+ out << "established session with master replica";
+ }
+ }
+ catch(const NodeActiveException&)
+ {
+ if(_node->getTraceLevels())
+ {
+ _node->getTraceLevels()->logger->error("a node with the same name is already registered and active");
+ }
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ if(_node->getTraceLevels() && _node->getTraceLevels()->replica > 1)
+ {
+ Ice::Trace out(_node->getTraceLevels()->logger, _node->getTraceLevels()->replicaCat);
+ out << "failed to establish session with master replica:\n" << ex;
+ }
+ }
+
+ if(session)
+ {
+ _node->setObserver(session->getObserver());
+ _node->checkConsistency(session);
+ }
+ else
+ {
+ _node->setObserver(0);
+ }
+
+ //
+ // Get the list of replicas (either with the master session or the
+ // IceGrid::Query interface) and make sure we have sessions opened
+ // to these replicas.
+ //
+ try
+ {
+ unsigned long serial = 0;
+ InternalRegistryPrxSeq replicas;
+ while(true)
+ {
+ {
+ Lock sync(*this);
+ if(serial == _serial)
+ {
+ NodeSessionManager& self = const_cast<NodeSessionManager&>(*this);
+ self._serial = 1;
+ self.syncReplicas(replicas);
+ break;
+ }
+ serial = _serial;
+ }
+
+ if(session)
+ {
+ replicas = registry->getReplicas();
+ }
+ else
+ {
+ replicas.clear();
+ Ice::ObjectProxySeq proxies = _query->findAllObjectsByType(InternalRegistry::ice_staticId());
+ for(Ice::ObjectProxySeq::const_iterator p = proxies.begin(); p != proxies.end(); ++p)
+ {
+ replicas.push_back(InternalRegistryPrx::uncheckedCast(*p));
+ }
+ }
+ }
+ }
+ catch(const Ice::LocalException&)
+ {
+ // IGNORE
+ }
+
+ return session;
+}
+
+bool
+NodeSessionManager::keepAlive(const NodeSessionPrx& session) const
+{
+ if(_node->getTraceLevels() && _node->getTraceLevels()->replica > 2)
+ {
+ Ice::Trace out(_node->getTraceLevels()->logger, _node->getTraceLevels()->replicaCat);
+ out << "sending keep alive message to master replica";
+ }
+
+ try
+ {
+ session->keepAlive(_node->getPlatformInfo().getLoadInfo());
+ return true;
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ if(_node->getTraceLevels() && _node->getTraceLevels()->replica > 0)
+ {
+ Ice::Trace out(_node->getTraceLevels()->logger, _node->getTraceLevels()->replicaCat);
+ out << "lost session with master replica:\n" << ex;
+ }
+ _node->setObserver(0);
+ return false;
+ }
+}
+
+void
+NodeSessionManager::destroySession(const NodeSessionPrx& session) const
+{
+ try
+ {
+ session->destroy();
+
+ if(_node->getTraceLevels() && _node->getTraceLevels()->replica > 0)
+ {
+ Ice::Trace out(_node->getTraceLevels()->logger, _node->getTraceLevels()->replicaCat);
+ out << "destroyed master replica session";
+ }
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ if(_node->getTraceLevels() && _node->getTraceLevels()->replica > 1)
+ {
+ Ice::Trace out(_node->getTraceLevels()->logger, _node->getTraceLevels()->replicaCat);
+ out << "couldn't destroy master replica session:\n" << ex;
+ }
+ }
+}
+