summaryrefslogtreecommitdiff
path: root/cpp/src/IceGrid/NodeSessionManager.cpp
diff options
context:
space:
mode:
authorJoe George <joe@zeroc.com>2021-01-28 16:26:44 -0500
committerJoe George <joe@zeroc.com>2021-02-01 16:59:30 -0500
commit92a6531e409f2691d82591e185a92299d415fc0f (patch)
tree60c79e2a8f327b8f0b6ebc06b06f48a2e8086f6a /cpp/src/IceGrid/NodeSessionManager.cpp
parentPort Glacier2, IceBox, IceBridge, IceDB, IceXML, icegriddb (diff)
downloadice-92a6531e409f2691d82591e185a92299d415fc0f.tar.bz2
ice-92a6531e409f2691d82591e185a92299d415fc0f.tar.xz
ice-92a6531e409f2691d82591e185a92299d415fc0f.zip
IceGrid and IceStorm
Diffstat (limited to 'cpp/src/IceGrid/NodeSessionManager.cpp')
-rw-r--r--cpp/src/IceGrid/NodeSessionManager.cpp224
1 files changed, 115 insertions, 109 deletions
diff --git a/cpp/src/IceGrid/NodeSessionManager.cpp b/cpp/src/IceGrid/NodeSessionManager.cpp
index ab9ef518313..f90a242a430 100644
--- a/cpp/src/IceGrid/NodeSessionManager.cpp
+++ b/cpp/src/IceGrid/NodeSessionManager.cpp
@@ -10,8 +10,8 @@
using namespace std;
using namespace IceGrid;
-NodeSessionKeepAliveThread::NodeSessionKeepAliveThread(const InternalRegistryPrx& registry,
- const NodeIPtr& node,
+NodeSessionKeepAliveThread::NodeSessionKeepAliveThread(const shared_ptr<InternalRegistryPrx>& registry,
+ const shared_ptr<NodeI>& node,
NodeSessionManager& manager) :
SessionKeepAliveThread<NodeSessionPrx>(registry, node->getTraceLevels()->logger), _node(node), _manager(manager)
{
@@ -26,12 +26,12 @@ NodeSessionKeepAliveThread::NodeSessionKeepAliveThread(const InternalRegistryPrx
const_cast<string&>(_replicaName) = name;
}
-NodeSessionPrx
-NodeSessionKeepAliveThread::createSession(InternalRegistryPrx& registry, IceUtil::Time& timeout)
+shared_ptr<NodeSessionPrx>
+NodeSessionKeepAliveThread::createSession(shared_ptr<InternalRegistryPrx>& registry, chrono::seconds& timeout)
{
- NodeSessionPrx session;
- IceInternal::UniquePtr<Ice::Exception> exception;
- TraceLevelsPtr traceLevels = _node->getTraceLevels();
+ shared_ptr<NodeSessionPrx> session;
+ string exceptionMessage;
+ auto traceLevels = _node->getTraceLevels();
try
{
if(traceLevels && traceLevels->replica > 1)
@@ -40,7 +40,7 @@ NodeSessionKeepAliveThread::createSession(InternalRegistryPrx& registry, IceUtil
out << "trying to establish session with replica `" << _replicaName << "'";
}
- set<InternalRegistryPrx> used;
+ set<shared_ptr<InternalRegistryPrx>> used;
if(!registry->ice_getEndpoints().empty())
{
try
@@ -49,34 +49,33 @@ NodeSessionKeepAliveThread::createSession(InternalRegistryPrx& registry, IceUtil
}
catch(const Ice::LocalException& ex)
{
- exception.reset(ex.ice_clone());
+ exceptionMessage = ex.what();
used.insert(registry);
- registry = InternalRegistryPrx::uncheckedCast(registry->ice_endpoints(Ice::EndpointSeq()));
+ registry = Ice::uncheckedCast<InternalRegistryPrx>(registry->ice_endpoints(Ice::EndpointSeq()));
}
}
if(!session)
{
- vector<Ice::AsyncResultPtr> results;
- vector<QueryPrx> queryObjects = _manager.getQueryObjects();
- for(vector<QueryPrx>::const_iterator q = queryObjects.begin(); q != queryObjects.end(); ++q)
+ vector<future<shared_ptr<Ice::ObjectPrx>>> results;
+ auto queryObjects = _manager.getQueryObjects();
+ for(const auto& object : queryObjects)
{
- results.push_back((*q)->begin_findObjectById(registry->ice_getIdentity()));
+ results.push_back(object->findObjectByIdAsync(registry->ice_getIdentity()));
}
- for(vector<Ice::AsyncResultPtr>::const_iterator p = results.begin(); p != results.end(); ++p)
+ for(auto& result : results)
{
- QueryPrx query = QueryPrx::uncheckedCast((*p)->getProxy());
if(isDestroyed())
{
break;
}
- InternalRegistryPrx newRegistry;
+ shared_ptr<InternalRegistryPrx> newRegistry;
try
{
- Ice::ObjectPrx obj = query->end_findObjectById(*p);
- newRegistry = InternalRegistryPrx::uncheckedCast(obj);
+ auto obj = result.get();
+ newRegistry = Ice::uncheckedCast<InternalRegistryPrx>(obj);
if(newRegistry && used.find(newRegistry) == used.end())
{
session = createSessionImpl(newRegistry, timeout);
@@ -86,7 +85,7 @@ NodeSessionKeepAliveThread::createSession(InternalRegistryPrx& registry, IceUtil
}
catch(const Ice::LocalException& ex)
{
- exception.reset(ex.ice_clone());
+ exceptionMessage = ex.what();
if(newRegistry)
{
used.insert(newRegistry);
@@ -102,7 +101,7 @@ NodeSessionKeepAliveThread::createSession(InternalRegistryPrx& registry, IceUtil
traceLevels->logger->error("a node with the same name is already active with the replica `" +
_replicaName + "'");
}
- exception.reset(ex.ice_clone());
+ exceptionMessage = ex.what();
}
catch(const PermissionDeniedException& ex)
{
@@ -110,11 +109,11 @@ NodeSessionKeepAliveThread::createSession(InternalRegistryPrx& registry, IceUtil
{
traceLevels->logger->error("connection to the registry `" + _replicaName + "' was denied:\n" + ex.reason);
}
- exception.reset(ex.ice_clone());
+ exceptionMessage = ex.what();
}
catch(const Ice::Exception& ex)
{
- exception.reset(ex.ice_clone());
+ exceptionMessage = ex.what();
}
if(session)
@@ -131,9 +130,9 @@ NodeSessionKeepAliveThread::createSession(InternalRegistryPrx& registry, IceUtil
{
Ice::Trace out(traceLevels->logger, traceLevels->replicaCat);
out << "failed to establish session with replica `" << _replicaName << "':\n";
- if(exception.get())
+ if(!exceptionMessage.empty())
{
- out << *exception.get();
+ out << exceptionMessage;
}
else
{
@@ -145,17 +144,19 @@ NodeSessionKeepAliveThread::createSession(InternalRegistryPrx& registry, IceUtil
return session;
}
-NodeSessionPrx
-NodeSessionKeepAliveThread::createSessionImpl(const InternalRegistryPrx& registry, IceUtil::Time& timeout)
+shared_ptr<NodeSessionPrx>
+NodeSessionKeepAliveThread::createSessionImpl(const shared_ptr<InternalRegistryPrx>& registry, chrono::seconds& timeout)
{
- NodeSessionPrx session;
+ shared_ptr<NodeSessionPrx> session;
try
{
session = _node->registerWithRegistry(registry);
- int t = session->getTimeout();
+ auto t = session->getTimeout();
if(t > 0)
{
- timeout = IceUtil::Time::seconds(t / 2);
+ // Timeout is used to configure the delay to wait between two keep alives
+ // If we used t directly, a delayed keep alive could kill the session
+ timeout = chrono::seconds(t / 2);
}
_node->addObserver(session, session->getObserver());
return session;
@@ -168,7 +169,7 @@ NodeSessionKeepAliveThread::createSessionImpl(const InternalRegistryPrx& registr
}
void
-NodeSessionKeepAliveThread::destroySession(const NodeSessionPrx& session)
+NodeSessionKeepAliveThread::destroySession(const shared_ptr<NodeSessionPrx>& session)
{
_node->removeObserver(session);
@@ -196,7 +197,7 @@ NodeSessionKeepAliveThread::destroySession(const NodeSessionPrx& session)
}
bool
-NodeSessionKeepAliveThread::keepAlive(const NodeSessionPrx& session)
+NodeSessionKeepAliveThread::keepAlive(const shared_ptr<NodeSessionPrx>& session)
{
if(_node->getTraceLevels() && _node->getTraceLevels()->replica > 2)
{
@@ -221,7 +222,7 @@ NodeSessionKeepAliveThread::keepAlive(const NodeSessionPrx& session)
}
}
-NodeSessionManager::NodeSessionManager(const Ice::CommunicatorPtr& communicator, const string& instanceName) :
+NodeSessionManager::NodeSessionManager(const shared_ptr<Ice::Communicator>& communicator, const string& instanceName) :
SessionManager(communicator, instanceName),
_destroyed(false),
_activated(false)
@@ -229,14 +230,13 @@ NodeSessionManager::NodeSessionManager(const Ice::CommunicatorPtr& communicator,
}
void
-NodeSessionManager::create(const NodeIPtr& node)
+NodeSessionManager::create(const shared_ptr<NodeI>& node)
{
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
assert(!_node);
- const_cast<NodeIPtr&>(_node) = node;
- _thread = new Thread(*this);
- _thread->start();
+ const_cast<shared_ptr<NodeI>&>(_node) = node;
+ _thread = make_shared<Thread>(*this);
}
//
@@ -246,14 +246,14 @@ NodeSessionManager::create(const NodeIPtr& node)
// before the node is activated.
//
_thread->tryCreateSession();
- _thread->waitTryCreateSession(IceUtil::Time::seconds(3));
+ _thread->waitTryCreateSession(3s);
}
void
-NodeSessionManager::create(const InternalRegistryPrx& replica)
+NodeSessionManager::create(const shared_ptr<InternalRegistryPrx>& replica)
{
assert(_thread);
- NodeSessionKeepAliveThreadPtr thread;
+ shared_ptr<NodeSessionKeepAliveThread> thread;
if(replica->ice_getIdentity() == _master->ice_getIdentity())
{
thread = _thread;
@@ -261,7 +261,7 @@ NodeSessionManager::create(const InternalRegistryPrx& replica)
}
else
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
thread = addReplicaSession(replica);
}
@@ -276,7 +276,7 @@ void
NodeSessionManager::activate()
{
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
_activated = true;
}
@@ -285,7 +285,7 @@ NodeSessionManager::activate()
// again and make sure that the servers are synchronized and the
// replica observer is set on the session.
//
- NodeSessionPrx session = _thread->getSession();
+ auto session = _thread->getSession();
if(session)
{
try
@@ -319,6 +319,7 @@ NodeSessionManager::terminate()
{
assert(_thread);
_thread->terminate();
+ _thread->join();
}
void
@@ -326,14 +327,14 @@ NodeSessionManager::destroy()
{
NodeSessionMap sessions;
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
if(_destroyed)
{
return;
}
_destroyed = true;
_sessions.swap(sessions);
- notifyAll();
+ _condVar.notify_all();
}
if(_thread)
@@ -341,25 +342,27 @@ NodeSessionManager::destroy()
_thread->terminate();
}
- for(NodeSessionMap::const_iterator p = sessions.begin(); p != sessions.end(); ++p)
+ for(const auto& session : sessions)
{
- p->second->terminate();
+ session.second->terminate();
}
if(_thread)
{
- _thread->getThreadControl().join();
+ _thread->join();
}
- for(NodeSessionMap::const_iterator p = sessions.begin(); p != sessions.end(); ++p)
+
+ for(const auto& session : sessions)
{
- p->second->getThreadControl().join();
+ session.second->join();
}
+
}
void
NodeSessionManager::replicaInit(const InternalRegistryPrxSeq& replicas)
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
if(_destroyed)
{
return;
@@ -369,17 +372,17 @@ NodeSessionManager::replicaInit(const InternalRegistryPrxSeq& replicas)
// Initialize the set of replicas known by the master.
//
_replicas.clear();
- for(InternalRegistryPrxSeq::const_iterator p = replicas.begin(); p != replicas.end(); ++p)
+ for(const auto& replica : replicas)
{
- _replicas.insert((*p)->ice_getIdentity());
- addReplicaSession(*p)->tryCreateSession();
+ _replicas.insert(replica->ice_getIdentity());
+ addReplicaSession(replica)->tryCreateSession();
}
}
void
-NodeSessionManager::replicaAdded(const InternalRegistryPrx& replica)
+NodeSessionManager::replicaAdded(const shared_ptr<InternalRegistryPrx>& replica)
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
if(_destroyed)
{
return;
@@ -389,10 +392,10 @@ NodeSessionManager::replicaAdded(const InternalRegistryPrx& replica)
}
void
-NodeSessionManager::replicaRemoved(const InternalRegistryPrx& replica)
+NodeSessionManager::replicaRemoved(const shared_ptr<InternalRegistryPrx>& replica)
{
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
if(_destroyed)
{
return;
@@ -406,12 +409,12 @@ NodeSessionManager::replicaRemoved(const InternalRegistryPrx& replica)
//
}
-NodeSessionKeepAliveThreadPtr
-NodeSessionManager::addReplicaSession(const InternalRegistryPrx& replica)
+shared_ptr<NodeSessionKeepAliveThread>
+NodeSessionManager::addReplicaSession(const shared_ptr<InternalRegistryPrx>& replica)
{
assert(!_destroyed);
- NodeSessionMap::const_iterator p = _sessions.find(replica->ice_getIdentity());
- NodeSessionKeepAliveThreadPtr thread;
+ auto p = _sessions.find(replica->ice_getIdentity());
+ shared_ptr<NodeSessionKeepAliveThread> thread;
if(p != _sessions.end())
{
thread = p->second;
@@ -419,9 +422,8 @@ NodeSessionManager::addReplicaSession(const InternalRegistryPrx& replica)
}
else
{
- thread = new NodeSessionKeepAliveThread(replica, _node, *this);
+ thread = make_shared<NodeSessionKeepAliveThread>(replica, _node, *this);
_sessions.insert(make_pair(replica->ice_getIdentity(), thread));
- thread->start();
}
return thread;
}
@@ -429,15 +431,17 @@ NodeSessionManager::addReplicaSession(const InternalRegistryPrx& replica)
void
NodeSessionManager::reapReplicas()
{
- vector<NodeSessionKeepAliveThreadPtr> reap;
+ // NodeSessionKeepAliveThread's destructor will join its thread on destruction
+ // Keep this vector do the destruction outside the lock
+ vector<shared_ptr<NodeSessionKeepAliveThread>> reap;
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
if(_destroyed)
{
return;
}
- NodeSessionMap::iterator q = _sessions.begin();
+ auto q = _sessions.begin();
while(q != _sessions.end())
{
if(_replicas.find(q->first) == _replicas.end() && q->second->terminateIfDisconnected())
@@ -453,14 +457,14 @@ NodeSessionManager::reapReplicas()
}
}
- for(vector<NodeSessionKeepAliveThreadPtr>::const_iterator p = reap.begin(); p != reap.end(); ++p)
+ for(const auto& r : reap)
{
- (*p)->getThreadControl().join();
+ r->join();
}
}
void
-NodeSessionManager::syncServers(const NodeSessionPrx& session)
+NodeSessionManager::syncServers(const shared_ptr<NodeSessionPrx>& session)
{
//
// Ask the session to load the servers on the node. Once this is
@@ -479,11 +483,11 @@ NodeSessionManager::syncServers(const NodeSessionPrx& session)
}
void
-NodeSessionManager::createdSession(const NodeSessionPrx& session)
+NodeSessionManager::createdSession(const shared_ptr<NodeSessionPrx>& session)
{
bool activated;
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
activated = _activated;
}
@@ -534,9 +538,10 @@ NodeSessionManager::createdSession(const NodeSessionPrx& session)
}
else
{
- vector<Ice::AsyncResultPtr> results1;
- vector<Ice::AsyncResultPtr> results2;
- vector<QueryPrx> queryObjects = findAllQueryObjects(false);
+
+ vector<future<Ice::ObjectProxySeq>> results1;
+ vector<future<Ice::ObjectProxySeq>> results2;
+ auto queryObjects = findAllQueryObjects(false);
//
// Below we try to retrieve internal registry proxies either
@@ -549,19 +554,19 @@ NodeSessionManager::createdSession(const NodeSessionPrx& session)
// more reliable.
//
- for(vector<QueryPrx>::const_iterator q = queryObjects.begin(); q != queryObjects.end(); ++q)
+ for(const auto& object : queryObjects)
{
- results1.push_back((*q)->begin_findAllObjectsByType(InternalRegistry::ice_staticId()));
+ results1.push_back(object->findAllObjectsByTypeAsync(InternalRegistry::ice_staticId()));
}
- for(vector<QueryPrx>::const_iterator q = queryObjects.begin(); q != queryObjects.end(); ++q)
+ for(const auto& object : queryObjects)
{
- results2.push_back((*q)->begin_findAllObjectsByType(Registry::ice_staticId()));
+ results2.push_back(object->findAllObjectsByTypeAsync(Registry::ice_staticId()));
}
- map<Ice::Identity, Ice::ObjectPrx> proxies;
- for(vector<Ice::AsyncResultPtr>::const_iterator p = results1.begin(); p != results1.end(); ++p)
+ map<Ice::Identity, shared_ptr<Ice::ObjectPrx>> proxies;
+
+ for(auto& result : results1)
{
- QueryPrx query = QueryPrx::uncheckedCast((*p)->getProxy());
if(isDestroyed())
{
return;
@@ -569,10 +574,10 @@ NodeSessionManager::createdSession(const NodeSessionPrx& session)
try
{
- Ice::ObjectProxySeq prxs = query->end_findAllObjectsByType(*p);
- for(Ice::ObjectProxySeq::const_iterator q = prxs.begin(); q != prxs.end(); ++q)
+ auto prxs = result.get();
+ for(const auto& prx : prxs)
{
- proxies[(*q)->ice_getIdentity()] = *q;
+ proxies[prx->ice_getIdentity()] = prx;
}
}
catch(const Ice::LocalException&)
@@ -580,9 +585,8 @@ NodeSessionManager::createdSession(const NodeSessionPrx& session)
// Ignore.
}
}
- for(vector<Ice::AsyncResultPtr>::const_iterator p = results2.begin(); p != results2.end(); ++p)
+ for(auto& result : results2)
{
- QueryPrx query = QueryPrx::uncheckedCast((*p)->getProxy());
if(isDestroyed())
{
return;
@@ -590,22 +594,24 @@ NodeSessionManager::createdSession(const NodeSessionPrx& session)
try
{
- Ice::ObjectProxySeq prxs = query->end_findAllObjectsByType(*p);
- for(Ice::ObjectProxySeq::const_iterator q = prxs.begin(); q != prxs.end(); ++q)
+ auto prxs = result.get();
+ for(auto prx : prxs)
{
- Ice::Identity id = (*q)->ice_getIdentity();
+ Ice::Identity id = prx->ice_getIdentity();
const string prefix("Registry-");
string::size_type pos = id.name.find(prefix);
if(pos == string::npos)
{
continue; // Ignore the master registry proxy.
}
+
id.name = "InternalRegistry-" + id.name.substr(prefix.size());
+ prx = prx->ice_identity(id)->ice_endpoints(Ice::EndpointSeq());
- Ice::ObjectPrx prx = (*q)->ice_identity(id)->ice_endpoints(Ice::EndpointSeq());
id.name = "Locator";
- prx = prx->ice_locator(Ice::LocatorPrx::uncheckedCast((*q)->ice_identity(id)));
- proxies[id] = prx;
+ prx = prx->ice_locator(Ice::uncheckedCast<Ice::LocatorPrx>(prx->ice_identity(id)));
+
+ proxies[id] = move(prx);
}
}
catch(const Ice::LocalException&)
@@ -614,15 +620,15 @@ NodeSessionManager::createdSession(const NodeSessionPrx& session)
}
}
- for(map<Ice::Identity, Ice::ObjectPrx>::const_iterator q = proxies.begin(); q != proxies.end(); ++q)
+ for(const auto& prx : proxies)
{
- replicas.push_back(InternalRegistryPrx::uncheckedCast(q->second));
+ replicas.push_back(Ice::uncheckedCast<InternalRegistryPrx>(prx.second));
}
}
- vector<NodeSessionKeepAliveThreadPtr> sessions;
+ vector<shared_ptr<NodeSessionKeepAliveThread>> sessions;
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
if(_destroyed)
{
return;
@@ -635,21 +641,21 @@ NodeSessionManager::createdSession(const NodeSessionPrx& session)
if(!session || !_activated)
{
_replicas.clear();
- for(InternalRegistryPrxSeq::const_iterator p = replicas.begin(); p != replicas.end(); ++p)
+ for(const auto& replica : replicas)
{
- if((*p)->ice_getIdentity() != _master->ice_getIdentity())
+ if(replica->ice_getIdentity() != _master->ice_getIdentity())
{
- _replicas.insert((*p)->ice_getIdentity());
+ _replicas.insert(replica->ice_getIdentity());
- if(_sessions.find((*p)->ice_getIdentity()) == _sessions.end())
+ if(_sessions.find(replica->ice_getIdentity()) == _sessions.end())
{
- NodeSessionKeepAliveThreadPtr thread = addReplicaSession(*p);
+ auto thread = addReplicaSession(replica);
thread->tryCreateSession();
sessions.push_back(thread);
}
else
{
- addReplicaSession(*p); // Update the session
+ addReplicaSession(replica); // Update the session
}
}
}
@@ -661,18 +667,18 @@ NodeSessionManager::createdSession(const NodeSessionPrx& session)
// the replica sessions are created before the node adapter is
// activated.
//
- IceUtil::Time before = IceUtil::Time::now();
- for(vector<NodeSessionKeepAliveThreadPtr>::const_iterator p = sessions.begin(); p != sessions.end(); ++p)
+ auto before = chrono::system_clock::now();
+ for(const auto& s : sessions)
{
if(isDestroyed())
{
return;
}
- IceUtil::Time timeout = IceUtil::Time::seconds(5) - (IceUtil::Time::now() - before);
- if(timeout <= IceUtil::Time())
+ auto timeout = 5s - (chrono::system_clock::now() - before);
+ if(timeout <= 0s)
{
break;
}
- (*p)->waitTryCreateSession(timeout);
+ s->waitTryCreateSession(chrono::duration_cast<chrono::seconds>(timeout));
}
}