summaryrefslogtreecommitdiff
path: root/cpp/src/IceGrid/NodeSessionManager.cpp
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2006-12-04 19:43:10 +0000
committerBenoit Foucher <benoit@zeroc.com>2006-12-04 19:43:10 +0000
commit996be326b98f2c5bb776ba68dd2b3d8b2861a12a (patch)
tree446bb318a97f7e64b35f299407f03868942c7c0f /cpp/src/IceGrid/NodeSessionManager.cpp
parentRemoved -single_module, it doesn't seem to be neccessary anymore (diff)
downloadice-996be326b98f2c5bb776ba68dd2b3d8b2861a12a.tar.bz2
ice-996be326b98f2c5bb776ba68dd2b3d8b2861a12a.tar.xz
ice-996be326b98f2c5bb776ba68dd2b3d8b2861a12a.zip
More cleanup
Diffstat (limited to 'cpp/src/IceGrid/NodeSessionManager.cpp')
-rw-r--r--cpp/src/IceGrid/NodeSessionManager.cpp346
1 files changed, 194 insertions, 152 deletions
diff --git a/cpp/src/IceGrid/NodeSessionManager.cpp b/cpp/src/IceGrid/NodeSessionManager.cpp
index 6ee09c7fa3c..967a15dae5c 100644
--- a/cpp/src/IceGrid/NodeSessionManager.cpp
+++ b/cpp/src/IceGrid/NodeSessionManager.cpp
@@ -237,10 +237,10 @@ NodeSessionManager::create(const NodeIPtr& node)
_thread->start();
//
- // We can't wait for the session to be created here as the node
- // adapter isn't activated yet and the registry would hang trying
- // to load the servers on the node (when createSession invokes
- // loadServers() on the session).
+ // Try to create the session. It's important that we wait for the
+ // creation of the session as this will also try to create sessions
+ // with replicas (see createdSession below) and this must be done
+ // before the node is activated.
//
_thread->tryCreateSession(true);
}
@@ -258,17 +258,23 @@ NodeSessionManager::create(const InternalRegistryPrx& replica)
}
else
{
- replicaAdded(replica, true);
+ createReplicaSession(replica, true);
}
}
void
-NodeSessionManager::activated()
+NodeSessionManager::activate()
{
{
Lock sync(*this);
_activated = true;
}
+
+ //
+ // Get the master session, if it's not created, try to create it
+ // again and make sure that the servers are synchronized and the
+ // replica observer is set on the session.
+ //
NodeSessionPrx session = _thread->getSession();
if(!session)
{
@@ -277,7 +283,14 @@ NodeSessionManager::activated()
}
if(session)
{
- syncServers(session);
+ try
+ {
+ session->setReplicaObserver(_node->getProxy());
+ syncServers(session);
+ }
+ catch(const Ice::LocalException&)
+ {
+ }
}
}
@@ -321,121 +334,119 @@ NodeSessionManager::destroy()
}
void
-NodeSessionManager::replicaAdded(const InternalRegistryPrx& replica, bool waitTryCreateSession)
+NodeSessionManager::replicaInit(const InternalRegistryPrxSeq& replicas)
{
- Lock sync(*this);
- if(_destroyed)
{
- return;
- }
+ Lock sync(*this);
+ if(_destroyed)
+ {
+ return;
+ }
- ++_serial;
- NodeSessionMap::const_iterator p = _sessions.find(replica->ice_getIdentity());
- NodeSessionKeepAliveThreadPtr thread;
- if(p != _sessions.end())
- {
- thread = p->second;
- thread->setRegistry(replica);
+ //
+ // Initialize the set of replicas known by the master.
+ //
+ _replicas.clear();
+ for(InternalRegistryPrxSeq::const_iterator p = replicas.begin(); p != replicas.end(); ++p)
+ {
+ _replicas.insert((*p)->ice_getIdentity());
+ }
}
- else
+
+ for(InternalRegistryPrxSeq::const_iterator p = replicas.begin(); p != replicas.end(); ++p)
{
- thread = new NodeSessionKeepAliveThread(replica, _node, _queryObjects);
- _sessions.insert(make_pair(replica->ice_getIdentity(), thread));
- thread->start();
+ createReplicaSession(*p, false);
}
- thread->tryCreateSession(waitTryCreateSession);
}
void
-NodeSessionManager::replicaRemoved(const InternalRegistryPrx& replica)
+NodeSessionManager::replicaAdded(const InternalRegistryPrx& replica)
{
- NodeSessionKeepAliveThreadPtr thread;
{
Lock sync(*this);
if(_destroyed)
{
return;
}
+ _replicas.insert(replica->ice_getIdentity());
+ }
+
+ createReplicaSession(replica, false);
+}
- ++_serial;
- NodeSessionMap::iterator p = _sessions.find(replica->ice_getIdentity());
- if(p != _sessions.end())
+void
+NodeSessionManager::replicaRemoved(const InternalRegistryPrx& replica)
+{
+ {
+ Lock sync(*this);
+ if(_destroyed)
{
- thread = p->second;
- _sessions.erase(p);
+ return;
}
+ _replicas.erase(replica->ice_getIdentity());
}
- if(thread)
- {
- _node->removeObserver(thread->getSession()); // Needs to be done here because we don't destroy the session.
- thread->terminate(false); // Don't destroy the session, the replica is being shutdown!
- thread->getThreadControl().join();
- }
+
+ //
+ // We don't remove the session here. It will eventually be reaped
+ // by reapReplicas() if the session is dead.
+ //
}
void
-NodeSessionManager::syncReplicas(const InternalRegistryPrxSeq& replicas)
-{
- NodeSessionMap sessions;
- _sessions.swap(sessions);
+NodeSessionManager::createReplicaSession(const InternalRegistryPrx& replica, bool waitTryCreateSession)
+{
+ Lock sync(*this);
+ if(_destroyed)
+ {
+ return;
+ }
+ NodeSessionMap::const_iterator p = _sessions.find(replica->ice_getIdentity());
NodeSessionKeepAliveThreadPtr thread;
- vector<NodeSessionKeepAliveThreadPtr> newSessions;
- for(InternalRegistryPrxSeq::const_iterator p = replicas.begin(); p != replicas.end(); ++p)
+ if(p != _sessions.end())
{
- if((*p)->ice_getIdentity() == _master->ice_getIdentity())
- {
- continue;
- }
- NodeSessionMap::const_iterator q = sessions.find((*p)->ice_getIdentity());
- if(q != sessions.end())
- {
- thread = q->second;
- sessions.erase((*p)->ice_getIdentity());
- }
- else
- {
- thread = new NodeSessionKeepAliveThread(*p, _node, _queryObjects);
- thread->start();
- thread->tryCreateSession(false);
- newSessions.push_back(thread);
- }
- _sessions.insert(make_pair((*p)->ice_getIdentity(), thread));
+ thread = p->second;
+ thread->setRegistry(replica);
}
+ else
+ {
+ thread = new NodeSessionKeepAliveThread(replica, _node, _queryObjects);
+ _sessions.insert(make_pair(replica->ice_getIdentity(), thread));
+ thread->start();
+ }
+ thread->tryCreateSession(waitTryCreateSession);
+}
- NodeSessionMap::iterator q = sessions.begin();
- while(q != sessions.end())
+void
+NodeSessionManager::reapReplicas()
+{
+ vector<NodeSessionKeepAliveThreadPtr> reap;
{
- if(q->second->getSession()) // Don't destroy sessions which are still alive!
+ Lock sync(*this);
+ if(_destroyed)
{
- _sessions.insert(make_pair(q->first, q->second));
- sessions.erase(q++);
+ return;
}
- else
+
+ NodeSessionMap::iterator q = _sessions.begin();
+ while(q != _sessions.end())
{
- ++q;
+ if(_replicas.find(q->first) == _replicas.end() && q->second->terminateIfDisconnected())
+ {
+ _node->removeObserver(q->second->getSession());
+ reap.push_back(q->second);
+ _sessions.erase(q++);
+ }
+ else
+ {
+ ++q;
+ }
}
}
- for(q = sessions.begin(); q != sessions.end(); ++q)
- {
- q->second->terminate();
- }
- for(q = sessions.begin(); q != sessions.end(); ++q)
- {
- q->second->getThreadControl().join();
- }
- //
- // If the node is being started, we wait for the new sessions to
- // be created. This ensures that once the node is activated all
- // the known replicas are connected.
- //
- if(!_activated)
+ for(vector<NodeSessionKeepAliveThreadPtr>::const_iterator p = reap.begin(); p != reap.end(); ++p)
{
- for(vector<NodeSessionKeepAliveThreadPtr>::const_iterator t = newSessions.begin(); t != newSessions.end(); ++t)
- {
- (*t)->tryCreateSession(true);
- }
+ (*p)->getThreadControl().join();
}
}
@@ -454,99 +465,130 @@ NodeSessionManager::syncServers(const NodeSessionPrx& session)
// established their session with the node.
//
assert(session);
- try
- {
- session->loadServers();
- _node->checkConsistency(session);
- }
- catch(const Ice::LocalException&)
- {
- }
+ session->loadServers();
+ _node->checkConsistency(session);
}
void
NodeSessionManager::createdSession(const NodeSessionPrx& session)
{
+ bool activated;
+ {
+ Lock sync(*this);
+ activated = _activated;
+ }
+
//
- // Get the list of replicas (either with the master session or the
- // IceGrid::Query interface) and make sure we have sessions opened
- // to these replicas.
+ // Synchronize the servers if the session is active and if the
+ // node adapter has been activated (otherwise, the servers will be
+ // synced after the node adapter activation, see activate()).
//
- try
+ // We also set the replica observer to receive notifications of
+ // replica addition/removal.
+ //
+ if(session && activated)
+ {
+ try
+ {
+ session->setReplicaObserver(_node->getProxy());
+ syncServers(session);
+ }
+ catch(const Ice::LocalException&)
+ {
+ }
+ return;
+ }
+
+ //
+ // If there's no master session or if the node adapter isn't
+ // activated yet, we retrieve a list of the replicas either from
+ // the master or from the known replicas (the ones configured with
+ // Ice.Default.Locator) and we try to establish connections to
+ // each of the replicas.
+ //
+
+ InternalRegistryPrxSeq replicas;
+ if(session)
{
- unsigned long serial = 0;
- InternalRegistryPrxSeq replicas;
- while(true)
+ assert(!activated); // The node adapter isn't activated yet so
+ // we're not subscribed yet to the replica
+ // observer topic.
+ try
{
+ replicas = _thread->getRegistry()->getReplicas();
+ }
+ catch(const Ice::LocalException&)
+ {
+ }
+ }
+ else
+ {
+ map<Ice::Identity, Ice::ObjectPrx> proxies;
+ for(vector<QueryPrx>::const_iterator p = _queryObjects.begin(); p != _queryObjects.end(); ++p)
+ {
+ try
{
- Lock sync(*this);
- if(serial == _serial)
+ Ice::ObjectProxySeq prxs = (*p)->findAllObjectsByType(InternalRegistry::ice_staticId());
+ for(Ice::ObjectProxySeq::const_iterator q = prxs.begin(); q != prxs.end(); ++q)
{
- NodeSessionManager& self = const_cast<NodeSessionManager&>(*this);
- self._serial = 1;
- self.syncReplicas(replicas);
- break;
+ //
+ // NOTE: We might override a good proxy here! We could improve this to make
+ // sure that we don't override the proxy for replica N if that proxy was
+ // obtained from replica N.
+ //
+ proxies[(*q)->ice_getIdentity()] = *q;
}
- serial = _serial;
}
-
- if(session)
+ catch(const Ice::LocalException&)
{
- replicas = _thread->getRegistry()->getReplicas();
+ // IGNORE
}
- else
+ }
+
+ for(map<Ice::Identity, Ice::ObjectPrx>::const_iterator q = proxies.begin(); q != proxies.end(); ++q)
+ {
+ replicas.push_back(InternalRegistryPrx::uncheckedCast(q->second));
+ }
+ }
+
+ {
+ Lock sync(*this);
+
+ //
+ // If the node adapter was activated since we last check, we don't need
+ // to initialize the replicas here, it will be done by replicaInit().
+ //
+ if(!session || !_activated)
+ {
+ _replicas.clear();
+ for(InternalRegistryPrxSeq::const_iterator p = replicas.begin(); p != replicas.end(); ++p)
{
- replicas.clear();
- map<Ice::Identity, Ice::ObjectPrx> proxies;
- for(vector<QueryPrx>::const_iterator p = _queryObjects.begin(); p != _queryObjects.end(); ++p)
- {
- try
- {
- Ice::ObjectProxySeq prxs = (*p)->findAllObjectsByType(InternalRegistry::ice_staticId());
- for(Ice::ObjectProxySeq::const_iterator q = prxs.begin(); q != prxs.end(); ++q)
- {
- //
- // NOTE: We might override a good proxy
- // here! We could improve this to make
- // sure that we don't override the proxy
- // for replica N if that proxy was
- // obtained from replica N.
- //
- proxies[(*q)->ice_getIdentity()] = *q;
- }
- }
- catch(const Ice::LocalException&)
- {
- // IGNORE
- }
- }
- for(map<Ice::Identity, Ice::ObjectPrx>::const_iterator q = proxies.begin(); q != proxies.end(); ++q)
+ if((*p)->ice_getIdentity() != _master->ice_getIdentity())
{
- replicas.push_back(InternalRegistryPrx::uncheckedCast(q->second));
+ _replicas.insert((*p)->ice_getIdentity());
}
}
}
}
- catch(const Ice::LocalException&)
- {
- // IGNORE
- }
-
+
//
- // Synchronize the servers if the session is active and if the
- // node adapter has been activated (otherwise, the servers will be
- // synced after the node adapter activation, see activated()).
+ // Create the replica sessions and wait for the creation. It's
+ // important to wait to ensure that the replica sessions are
+ // created before the node adapter is activated.
//
- if(session)
+ InternalRegistryPrxSeq::const_iterator t;
+ for(t = replicas.begin(); t != replicas.end(); ++t)
{
- bool activated;
+ if((*t)->ice_getIdentity() != _master->ice_getIdentity())
{
- Lock sync(*this);
- activated = _activated;
+ createReplicaSession(*t, false);
}
- if(activated)
+ }
+ for(t = replicas.begin(); t != replicas.end(); ++t)
+ {
+ if((*t)->ice_getIdentity() != _master->ice_getIdentity())
{
- syncServers(session);
+ createReplicaSession(*t, true);
}
}
}