summaryrefslogtreecommitdiff
path: root/cpp/src/IceGrid/NodeSessionManager.cpp
diff options
context:
space:
mode:
authorBernard Normier <bernard@zeroc.com>2007-02-01 17:09:49 +0000
committerBernard Normier <bernard@zeroc.com>2007-02-01 17:09:49 +0000
commitabada90e3f84dc703b8ddc9efcbed8a946fadead (patch)
tree2c6f9dccd510ea97cb927a7bd635422efaae547a /cpp/src/IceGrid/NodeSessionManager.cpp
parentremoving trace message (diff)
downloadice-abada90e3f84dc703b8ddc9efcbed8a946fadead.tar.bz2
ice-abada90e3f84dc703b8ddc9efcbed8a946fadead.tar.xz
ice-abada90e3f84dc703b8ddc9efcbed8a946fadead.zip
Expanded tabs into spaces
Diffstat (limited to 'cpp/src/IceGrid/NodeSessionManager.cpp')
-rw-r--r--cpp/src/IceGrid/NodeSessionManager.cpp458
1 files changed, 229 insertions, 229 deletions
diff --git a/cpp/src/IceGrid/NodeSessionManager.cpp b/cpp/src/IceGrid/NodeSessionManager.cpp
index ff522a1a58e..f723134a0a5 100644
--- a/cpp/src/IceGrid/NodeSessionManager.cpp
+++ b/cpp/src/IceGrid/NodeSessionManager.cpp
@@ -17,8 +17,8 @@ using namespace std;
using namespace IceGrid;
NodeSessionKeepAliveThread::NodeSessionKeepAliveThread(const InternalRegistryPrx& registry,
- const NodeIPtr& node,
- const vector<QueryPrx>& queryObjects) :
+ const NodeIPtr& node,
+ const vector<QueryPrx>& queryObjects) :
SessionKeepAliveThread<NodeSessionPrx>(registry),
_node(node),
_queryObjects(queryObjects)
@@ -29,7 +29,7 @@ NodeSessionKeepAliveThread::NodeSessionKeepAliveThread(const InternalRegistryPrx
string::size_type pos = name.find(prefix);
if(pos != string::npos)
{
- name = name.substr(prefix.size());
+ name = name.substr(prefix.size());
}
const_cast<string&>(_name) = name;
}
@@ -42,90 +42,90 @@ NodeSessionKeepAliveThread::createSession(InternalRegistryPrx& registry, IceUtil
TraceLevelsPtr traceLevels = _node->getTraceLevels();
try
{
- if(traceLevels && traceLevels->replica > 1)
- {
- Ice::Trace out(traceLevels->logger, traceLevels->replicaCat);
- out << "trying to establish session with replica `" << _name << "'";
- }
-
- set<InternalRegistryPrx> used;
- if(!registry->ice_getEndpoints().empty())
- {
- try
- {
- session = createSessionImpl(registry, timeout);
- }
- catch(const Ice::LocalException& ex)
- {
- exception.reset(ex.ice_clone());
- used.insert(registry);
- registry = InternalRegistryPrx::uncheckedCast(registry->ice_endpoints(Ice::EndpointSeq()));
- }
- }
-
- if(!session)
- {
- for(vector<QueryPrx>::const_iterator p = _queryObjects.begin(); p != _queryObjects.end(); ++p)
- {
- InternalRegistryPrx newRegistry;
- try
- {
- Ice::ObjectPrx obj = (*p)->findObjectById(registry->ice_getIdentity());
- newRegistry = InternalRegistryPrx::uncheckedCast(obj);
- if(newRegistry && used.find(newRegistry) == used.end())
- {
- session = createSessionImpl(newRegistry, timeout);
- registry = newRegistry;
- break;
- }
- }
- catch(const Ice::LocalException& ex)
- {
- exception.reset(ex.ice_clone());
- if(newRegistry)
- {
- used.insert(newRegistry);
- }
- }
- }
- }
+ if(traceLevels && traceLevels->replica > 1)
+ {
+ Ice::Trace out(traceLevels->logger, traceLevels->replicaCat);
+ out << "trying to establish session with replica `" << _name << "'";
+ }
+
+ set<InternalRegistryPrx> used;
+ if(!registry->ice_getEndpoints().empty())
+ {
+ try
+ {
+ session = createSessionImpl(registry, timeout);
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ exception.reset(ex.ice_clone());
+ used.insert(registry);
+ registry = InternalRegistryPrx::uncheckedCast(registry->ice_endpoints(Ice::EndpointSeq()));
+ }
+ }
+
+ if(!session)
+ {
+ for(vector<QueryPrx>::const_iterator p = _queryObjects.begin(); p != _queryObjects.end(); ++p)
+ {
+ InternalRegistryPrx newRegistry;
+ try
+ {
+ Ice::ObjectPrx obj = (*p)->findObjectById(registry->ice_getIdentity());
+ newRegistry = InternalRegistryPrx::uncheckedCast(obj);
+ if(newRegistry && used.find(newRegistry) == used.end())
+ {
+ session = createSessionImpl(newRegistry, timeout);
+ registry = newRegistry;
+ break;
+ }
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ exception.reset(ex.ice_clone());
+ if(newRegistry)
+ {
+ used.insert(newRegistry);
+ }
+ }
+ }
+ }
}
catch(const NodeActiveException& ex)
{
- if(traceLevels)
- {
- traceLevels->logger->error("a node with the same name is already active with the replica `" + _name + "'");
- }
- exception.reset(ex.ice_clone());
+ if(traceLevels)
+ {
+ traceLevels->logger->error("a node with the same name is already active with the replica `" + _name + "'");
+ }
+ exception.reset(ex.ice_clone());
}
catch(const Ice::Exception& ex)
{
- exception.reset(ex.ice_clone());
+ exception.reset(ex.ice_clone());
}
if(session)
{
- if(traceLevels && traceLevels->replica > 0)
- {
- Ice::Trace out(traceLevels->logger, traceLevels->replicaCat);
- out << "established session with replica `" << _name << "'";
- }
+ if(traceLevels && traceLevels->replica > 0)
+ {
+ Ice::Trace out(traceLevels->logger, traceLevels->replicaCat);
+ out << "established session with replica `" << _name << "'";
+ }
}
else
{
- if(traceLevels && traceLevels->replica > 1)
- {
- Ice::Trace out(traceLevels->logger, traceLevels->replicaCat);
- out << "failed to establish session with replica `" << _name << "':\n";
- if(exception.get())
- {
- out << *exception.get();
- }
- else
- {
- out << "failed to get replica proxy";
- }
- }
+ if(traceLevels && traceLevels->replica > 1)
+ {
+ Ice::Trace out(traceLevels->logger, traceLevels->replicaCat);
+ out << "failed to establish session with replica `" << _name << "':\n";
+ if(exception.get())
+ {
+ out << *exception.get();
+ }
+ else
+ {
+ out << "failed to get replica proxy";
+ }
+ }
}
return session;
@@ -138,7 +138,7 @@ NodeSessionKeepAliveThread::createSessionImpl(const InternalRegistryPrx& registr
int t = session->getTimeout();
if(t > 0)
{
- timeout = IceUtil::Time::seconds(t / 2);
+ timeout = IceUtil::Time::seconds(t / 2);
}
_node->addObserver(session, session->getObserver());
return session;
@@ -151,21 +151,21 @@ NodeSessionKeepAliveThread::destroySession(const NodeSessionPrx& session)
try
{
- session->destroy();
+ session->destroy();
- if(_node->getTraceLevels() && _node->getTraceLevels()->replica > 0)
- {
- Ice::Trace out(_node->getTraceLevels()->logger, _node->getTraceLevels()->replicaCat);
- out << "destroyed replica `" << _name << "' session";
- }
+ if(_node->getTraceLevels() && _node->getTraceLevels()->replica > 0)
+ {
+ Ice::Trace out(_node->getTraceLevels()->logger, _node->getTraceLevels()->replicaCat);
+ out << "destroyed replica `" << _name << "' session";
+ }
}
catch(const Ice::LocalException& ex)
{
- if(_node->getTraceLevels() && _node->getTraceLevels()->replica > 1)
- {
- Ice::Trace out(_node->getTraceLevels()->logger, _node->getTraceLevels()->replicaCat);
- out << "couldn't destroy replica `" << _name << "' session:\n" << ex;
- }
+ if(_node->getTraceLevels() && _node->getTraceLevels()->replica > 1)
+ {
+ Ice::Trace out(_node->getTraceLevels()->logger, _node->getTraceLevels()->replicaCat);
+ out << "couldn't destroy replica `" << _name << "' session:\n" << ex;
+ }
}
}
@@ -174,24 +174,24 @@ NodeSessionKeepAliveThread::keepAlive(const NodeSessionPrx& session)
{
if(_node->getTraceLevels() && _node->getTraceLevels()->replica > 2)
{
- Ice::Trace out(_node->getTraceLevels()->logger, _node->getTraceLevels()->replicaCat);
- out << "sending keep alive message to replica `" << _name << "'";
+ Ice::Trace out(_node->getTraceLevels()->logger, _node->getTraceLevels()->replicaCat);
+ out << "sending keep alive message to replica `" << _name << "'";
}
try
{
- session->keepAlive(_node->getPlatformInfo().getLoadInfo());
- return true;
+ session->keepAlive(_node->getPlatformInfo().getLoadInfo());
+ return true;
}
catch(const Ice::LocalException& ex)
{
- _node->removeObserver(session);
- if(_node->getTraceLevels() && _node->getTraceLevels()->replica > 0)
- {
- Ice::Trace out(_node->getTraceLevels()->logger, _node->getTraceLevels()->replicaCat);
- out << "lost session with replica `" << _name << "':\n" << ex;
- }
- return false;
+ _node->removeObserver(session);
+ if(_node->getTraceLevels() && _node->getTraceLevels()->replica > 0)
+ {
+ Ice::Trace out(_node->getTraceLevels()->logger, _node->getTraceLevels()->replicaCat);
+ out << "lost session with replica `" << _name << "':\n" << ex;
+ }
+ return false;
}
}
@@ -225,9 +225,9 @@ NodeSessionManager::create(const NodeIPtr& node)
QueryPrx query = QueryPrx::uncheckedCast(communicator->stringToProxy(communicator->identityToString(id)));
for(Ice::EndpointSeq::const_iterator p = endpoints.begin(); p != endpoints.end(); ++p)
{
- Ice::EndpointSeq singleEndpoint;
- singleEndpoint.push_back(*p);
- _queryObjects.push_back(QueryPrx::uncheckedCast(query->ice_endpoints(singleEndpoint)));
+ Ice::EndpointSeq singleEndpoint;
+ singleEndpoint.push_back(*p);
+ _queryObjects.push_back(QueryPrx::uncheckedCast(query->ice_endpoints(singleEndpoint)));
}
id.name = "InternalRegistry-Master";
@@ -252,17 +252,17 @@ NodeSessionManager::create(const InternalRegistryPrx& replica)
NodeSessionKeepAliveThreadPtr thread;
if(replica->ice_getIdentity() == _master->ice_getIdentity())
{
- thread = _thread;
- thread->setRegistry(replica);
+ thread = _thread;
+ thread->setRegistry(replica);
}
else
{
- thread = addReplicaSession(replica);
+ thread = addReplicaSession(replica);
}
if(thread)
{
- thread->tryCreateSession();
+ thread->tryCreateSession();
}
}
@@ -270,8 +270,8 @@ void
NodeSessionManager::activate()
{
{
- Lock sync(*this);
- _activated = true;
+ Lock sync(*this);
+ _activated = true;
}
//
@@ -282,19 +282,19 @@ NodeSessionManager::activate()
NodeSessionPrx session = _thread->getSession();
if(!session)
{
- _thread->tryCreateSession(true);
- session = _thread->getSession();
+ _thread->tryCreateSession(true);
+ session = _thread->getSession();
}
if(session)
{
- try
- {
- session->setReplicaObserver(_node->getProxy());
- syncServers(session);
- }
- catch(const Ice::LocalException&)
- {
- }
+ try
+ {
+ session->setReplicaObserver(_node->getProxy());
+ syncServers(session);
+ }
+ catch(const Ice::LocalException&)
+ {
+ }
}
}
@@ -317,23 +317,23 @@ NodeSessionManager::destroy()
{
NodeSessionMap sessions;
{
- Lock sync(*this);
- _destroyed = true;
- _sessions.swap(sessions);
- notifyAll();
+ Lock sync(*this);
+ _destroyed = true;
+ _sessions.swap(sessions);
+ notifyAll();
}
_thread->terminate();
NodeSessionMap::const_iterator p;
for(p = sessions.begin(); p != sessions.end(); ++p)
{
- p->second->terminate();
+ p->second->terminate();
}
_thread->getThreadControl().join();
for(p = sessions.begin(); p != sessions.end(); ++p)
{
- p->second->getThreadControl().join();
+ p->second->getThreadControl().join();
}
}
@@ -343,7 +343,7 @@ NodeSessionManager::replicaInit(const InternalRegistryPrxSeq& replicas)
Lock sync(*this);
if(_destroyed)
{
- return;
+ return;
}
//
@@ -352,8 +352,8 @@ NodeSessionManager::replicaInit(const InternalRegistryPrxSeq& replicas)
_replicas.clear();
for(InternalRegistryPrxSeq::const_iterator p = replicas.begin(); p != replicas.end(); ++p)
{
- _replicas.insert((*p)->ice_getIdentity());
- addReplicaSession(*p)->tryCreateSession(false);
+ _replicas.insert((*p)->ice_getIdentity());
+ addReplicaSession(*p)->tryCreateSession(false);
}
}
@@ -363,7 +363,7 @@ NodeSessionManager::replicaAdded(const InternalRegistryPrx& replica)
Lock sync(*this);
if(_destroyed)
{
- return;
+ return;
}
_replicas.insert(replica->ice_getIdentity());
addReplicaSession(replica)->tryCreateSession(false);
@@ -373,12 +373,12 @@ void
NodeSessionManager::replicaRemoved(const InternalRegistryPrx& replica)
{
{
- Lock sync(*this);
- if(_destroyed)
- {
- return;
- }
- _replicas.erase(replica->ice_getIdentity());
+ Lock sync(*this);
+ if(_destroyed)
+ {
+ return;
+ }
+ _replicas.erase(replica->ice_getIdentity());
}
//
@@ -396,14 +396,14 @@ NodeSessionManager::addReplicaSession(const InternalRegistryPrx& replica)
NodeSessionKeepAliveThreadPtr thread;
if(p != _sessions.end())
{
- thread = p->second;
- thread->setRegistry(replica);
+ thread = p->second;
+ thread->setRegistry(replica);
}
else
{
- thread = new NodeSessionKeepAliveThread(replica, _node, _queryObjects);
- _sessions.insert(make_pair(replica->ice_getIdentity(), thread));
- thread->start();
+ thread = new NodeSessionKeepAliveThread(replica, _node, _queryObjects);
+ _sessions.insert(make_pair(replica->ice_getIdentity(), thread));
+ thread->start();
}
return thread;
}
@@ -413,31 +413,31 @@ NodeSessionManager::reapReplicas()
{
vector<NodeSessionKeepAliveThreadPtr> reap;
{
- Lock sync(*this);
- if(_destroyed)
- {
- return;
- }
-
- NodeSessionMap::iterator q = _sessions.begin();
- while(q != _sessions.end())
- {
- if(_replicas.find(q->first) == _replicas.end() && q->second->terminateIfDisconnected())
- {
- _node->removeObserver(q->second->getSession());
- reap.push_back(q->second);
- _sessions.erase(q++);
- }
- else
- {
- ++q;
- }
- }
+ Lock sync(*this);
+ if(_destroyed)
+ {
+ return;
+ }
+
+ NodeSessionMap::iterator q = _sessions.begin();
+ while(q != _sessions.end())
+ {
+ if(_replicas.find(q->first) == _replicas.end() && q->second->terminateIfDisconnected())
+ {
+ _node->removeObserver(q->second->getSession());
+ reap.push_back(q->second);
+ _sessions.erase(q++);
+ }
+ else
+ {
+ ++q;
+ }
+ }
}
for(vector<NodeSessionKeepAliveThreadPtr>::const_iterator p = reap.begin(); p != reap.end(); ++p)
{
- (*p)->getThreadControl().join();
+ (*p)->getThreadControl().join();
}
}
@@ -465,8 +465,8 @@ NodeSessionManager::createdSession(const NodeSessionPrx& session)
{
bool activated;
{
- Lock sync(*this);
- activated = _activated;
+ Lock sync(*this);
+ activated = _activated;
}
//
@@ -479,15 +479,15 @@ NodeSessionManager::createdSession(const NodeSessionPrx& session)
//
if(session && activated)
{
- try
- {
- session->setReplicaObserver(_node->getProxy());
- syncServers(session);
- }
- catch(const Ice::LocalException&)
- {
- }
- return;
+ try
+ {
+ session->setReplicaObserver(_node->getProxy());
+ syncServers(session);
+ }
+ catch(const Ice::LocalException&)
+ {
+ }
+ return;
}
//
@@ -501,73 +501,73 @@ NodeSessionManager::createdSession(const NodeSessionPrx& session)
InternalRegistryPrxSeq replicas;
if(session)
{
- assert(!activated); // The node adapter isn't activated yet so
- // we're not subscribed yet to the replica
+ assert(!activated); // The node adapter isn't activated yet so
+ // we're not subscribed yet to the replica
// observer topic.
- try
- {
- replicas = _thread->getRegistry()->getReplicas();
- }
- catch(const Ice::LocalException&)
- {
- }
+ try
+ {
+ replicas = _thread->getRegistry()->getReplicas();
+ }
+ catch(const Ice::LocalException&)
+ {
+ }
}
else
{
- map<Ice::Identity, Ice::ObjectPrx> proxies;
- for(vector<QueryPrx>::const_iterator p = _queryObjects.begin(); p != _queryObjects.end(); ++p)
- {
- try
- {
- Ice::ObjectProxySeq prxs = (*p)->findAllObjectsByType(InternalRegistry::ice_staticId());
- for(Ice::ObjectProxySeq::const_iterator q = prxs.begin(); q != prxs.end(); ++q)
- {
- //
- // NOTE: We might override a good proxy here! We could improve this to make
- // sure that we don't override the proxy for replica N if that proxy was
- // obtained from replica N.
- //
- proxies[(*q)->ice_getIdentity()] = *q;
- }
- }
- catch(const Ice::LocalException&)
- {
- // IGNORE
- }
- }
-
- for(map<Ice::Identity, Ice::ObjectPrx>::const_iterator q = proxies.begin(); q != proxies.end(); ++q)
- {
- replicas.push_back(InternalRegistryPrx::uncheckedCast(q->second));
- }
+ map<Ice::Identity, Ice::ObjectPrx> proxies;
+ for(vector<QueryPrx>::const_iterator p = _queryObjects.begin(); p != _queryObjects.end(); ++p)
+ {
+ try
+ {
+ Ice::ObjectProxySeq prxs = (*p)->findAllObjectsByType(InternalRegistry::ice_staticId());
+ for(Ice::ObjectProxySeq::const_iterator q = prxs.begin(); q != prxs.end(); ++q)
+ {
+ //
+ // NOTE: We might override a good proxy here! We could improve this to make
+ // sure that we don't override the proxy for replica N if that proxy was
+ // obtained from replica N.
+ //
+ proxies[(*q)->ice_getIdentity()] = *q;
+ }
+ }
+ catch(const Ice::LocalException&)
+ {
+ // IGNORE
+ }
+ }
+
+ for(map<Ice::Identity, Ice::ObjectPrx>::const_iterator q = proxies.begin(); q != proxies.end(); ++q)
+ {
+ replicas.push_back(InternalRegistryPrx::uncheckedCast(q->second));
+ }
}
vector<NodeSessionKeepAliveThreadPtr> sessions;
{
- Lock sync(*this);
- if(_destroyed)
- {
- return;
- }
-
- //
- // If the node adapter was activated since we last check, we don't need
- // to initialize the replicas here, it will be done by replicaInit().
- //
- if(!session || !_activated)
- {
- _replicas.clear();
- for(InternalRegistryPrxSeq::const_iterator p = replicas.begin(); p != replicas.end(); ++p)
- {
- if((*p)->ice_getIdentity() != _master->ice_getIdentity())
- {
- _replicas.insert((*p)->ice_getIdentity());
- NodeSessionKeepAliveThreadPtr session = addReplicaSession(*p);
- session->tryCreateSession(false);
- sessions.push_back(session);
- }
- }
- }
+ Lock sync(*this);
+ if(_destroyed)
+ {
+ return;
+ }
+
+ //
+ // If the node adapter was activated since we last check, we don't need
+ // to initialize the replicas here, it will be done by replicaInit().
+ //
+ if(!session || !_activated)
+ {
+ _replicas.clear();
+ for(InternalRegistryPrxSeq::const_iterator p = replicas.begin(); p != replicas.end(); ++p)
+ {
+ if((*p)->ice_getIdentity() != _master->ice_getIdentity())
+ {
+ _replicas.insert((*p)->ice_getIdentity());
+ NodeSessionKeepAliveThreadPtr session = addReplicaSession(*p);
+ session->tryCreateSession(false);
+ sessions.push_back(session);
+ }
+ }
+ }
}
//
@@ -577,7 +577,7 @@ NodeSessionManager::createdSession(const NodeSessionPrx& session)
//
for(vector<NodeSessionKeepAliveThreadPtr>::const_iterator p = sessions.begin(); p != sessions.end(); ++p)
{
- (*p)->tryCreateSession(true);
+ (*p)->tryCreateSession(true);
}
}