diff options
Diffstat (limited to 'cpp/src/IceGrid/NodeSessionManager.cpp')
-rw-r--r-- | cpp/src/IceGrid/NodeSessionManager.cpp | 224 |
1 files changed, 115 insertions, 109 deletions
diff --git a/cpp/src/IceGrid/NodeSessionManager.cpp b/cpp/src/IceGrid/NodeSessionManager.cpp index ab9ef518313..f90a242a430 100644 --- a/cpp/src/IceGrid/NodeSessionManager.cpp +++ b/cpp/src/IceGrid/NodeSessionManager.cpp @@ -10,8 +10,8 @@ using namespace std; using namespace IceGrid; -NodeSessionKeepAliveThread::NodeSessionKeepAliveThread(const InternalRegistryPrx& registry, - const NodeIPtr& node, +NodeSessionKeepAliveThread::NodeSessionKeepAliveThread(const shared_ptr<InternalRegistryPrx>& registry, + const shared_ptr<NodeI>& node, NodeSessionManager& manager) : SessionKeepAliveThread<NodeSessionPrx>(registry, node->getTraceLevels()->logger), _node(node), _manager(manager) { @@ -26,12 +26,12 @@ NodeSessionKeepAliveThread::NodeSessionKeepAliveThread(const InternalRegistryPrx const_cast<string&>(_replicaName) = name; } -NodeSessionPrx -NodeSessionKeepAliveThread::createSession(InternalRegistryPrx& registry, IceUtil::Time& timeout) +shared_ptr<NodeSessionPrx> +NodeSessionKeepAliveThread::createSession(shared_ptr<InternalRegistryPrx>& registry, chrono::seconds& timeout) { - NodeSessionPrx session; - IceInternal::UniquePtr<Ice::Exception> exception; - TraceLevelsPtr traceLevels = _node->getTraceLevels(); + shared_ptr<NodeSessionPrx> session; + string exceptionMessage; + auto traceLevels = _node->getTraceLevels(); try { if(traceLevels && traceLevels->replica > 1) @@ -40,7 +40,7 @@ NodeSessionKeepAliveThread::createSession(InternalRegistryPrx& registry, IceUtil out << "trying to establish session with replica `" << _replicaName << "'"; } - set<InternalRegistryPrx> used; + set<shared_ptr<InternalRegistryPrx>> used; if(!registry->ice_getEndpoints().empty()) { try @@ -49,34 +49,33 @@ NodeSessionKeepAliveThread::createSession(InternalRegistryPrx& registry, IceUtil } catch(const Ice::LocalException& ex) { - exception.reset(ex.ice_clone()); + exceptionMessage = ex.what(); used.insert(registry); - registry = InternalRegistryPrx::uncheckedCast(registry->ice_endpoints(Ice::EndpointSeq())); + registry = Ice::uncheckedCast<InternalRegistryPrx>(registry->ice_endpoints(Ice::EndpointSeq())); } } if(!session) { - vector<Ice::AsyncResultPtr> results; - vector<QueryPrx> queryObjects = _manager.getQueryObjects(); - for(vector<QueryPrx>::const_iterator q = queryObjects.begin(); q != queryObjects.end(); ++q) + vector<future<shared_ptr<Ice::ObjectPrx>>> results; + auto queryObjects = _manager.getQueryObjects(); + for(const auto& object : queryObjects) { - results.push_back((*q)->begin_findObjectById(registry->ice_getIdentity())); + results.push_back(object->findObjectByIdAsync(registry->ice_getIdentity())); } - for(vector<Ice::AsyncResultPtr>::const_iterator p = results.begin(); p != results.end(); ++p) + for(auto& result : results) { - QueryPrx query = QueryPrx::uncheckedCast((*p)->getProxy()); if(isDestroyed()) { break; } - InternalRegistryPrx newRegistry; + shared_ptr<InternalRegistryPrx> newRegistry; try { - Ice::ObjectPrx obj = query->end_findObjectById(*p); - newRegistry = InternalRegistryPrx::uncheckedCast(obj); + auto obj = result.get(); + newRegistry = Ice::uncheckedCast<InternalRegistryPrx>(obj); if(newRegistry && used.find(newRegistry) == used.end()) { session = createSessionImpl(newRegistry, timeout); @@ -86,7 +85,7 @@ NodeSessionKeepAliveThread::createSession(InternalRegistryPrx& registry, IceUtil } catch(const Ice::LocalException& ex) { - exception.reset(ex.ice_clone()); + exceptionMessage = ex.what(); if(newRegistry) { used.insert(newRegistry); @@ -102,7 +101,7 @@ NodeSessionKeepAliveThread::createSession(InternalRegistryPrx& registry, IceUtil traceLevels->logger->error("a node with the same name is already active with the replica `" + _replicaName + "'"); } - exception.reset(ex.ice_clone()); + exceptionMessage = ex.what(); } catch(const PermissionDeniedException& ex) { @@ -110,11 +109,11 @@ NodeSessionKeepAliveThread::createSession(InternalRegistryPrx& registry, IceUtil { traceLevels->logger->error("connection to the registry `" + _replicaName + "' was denied:\n" + ex.reason); } - exception.reset(ex.ice_clone()); + exceptionMessage = ex.what(); } catch(const Ice::Exception& ex) { - exception.reset(ex.ice_clone()); + exceptionMessage = ex.what(); } if(session) @@ -131,9 +130,9 @@ NodeSessionKeepAliveThread::createSession(InternalRegistryPrx& registry, IceUtil { Ice::Trace out(traceLevels->logger, traceLevels->replicaCat); out << "failed to establish session with replica `" << _replicaName << "':\n"; - if(exception.get()) + if(!exceptionMessage.empty()) { - out << *exception.get(); + out << exceptionMessage; } else { @@ -145,17 +144,19 @@ NodeSessionKeepAliveThread::createSession(InternalRegistryPrx& registry, IceUtil return session; } -NodeSessionPrx -NodeSessionKeepAliveThread::createSessionImpl(const InternalRegistryPrx& registry, IceUtil::Time& timeout) +shared_ptr<NodeSessionPrx> +NodeSessionKeepAliveThread::createSessionImpl(const shared_ptr<InternalRegistryPrx>& registry, chrono::seconds& timeout) { - NodeSessionPrx session; + shared_ptr<NodeSessionPrx> session; try { session = _node->registerWithRegistry(registry); - int t = session->getTimeout(); + auto t = session->getTimeout(); if(t > 0) { - timeout = IceUtil::Time::seconds(t / 2); + // Timeout is used to configure the delay to wait between two keep alives + // If we used t directly, a delayed keep alive could kill the session + timeout = chrono::seconds(t / 2); } _node->addObserver(session, session->getObserver()); return session; @@ -168,7 +169,7 @@ NodeSessionKeepAliveThread::createSessionImpl(const InternalRegistryPrx& registr } void -NodeSessionKeepAliveThread::destroySession(const NodeSessionPrx& session) +NodeSessionKeepAliveThread::destroySession(const shared_ptr<NodeSessionPrx>& session) { _node->removeObserver(session); @@ -196,7 +197,7 @@ NodeSessionKeepAliveThread::destroySession(const NodeSessionPrx& session) } bool -NodeSessionKeepAliveThread::keepAlive(const NodeSessionPrx& session) +NodeSessionKeepAliveThread::keepAlive(const shared_ptr<NodeSessionPrx>& session) { if(_node->getTraceLevels() && _node->getTraceLevels()->replica > 2) { @@ -221,7 +222,7 @@ NodeSessionKeepAliveThread::keepAlive(const NodeSessionPrx& session) } } -NodeSessionManager::NodeSessionManager(const Ice::CommunicatorPtr& communicator, const string& instanceName) : +NodeSessionManager::NodeSessionManager(const shared_ptr<Ice::Communicator>& communicator, const string& instanceName) : SessionManager(communicator, instanceName), _destroyed(false), _activated(false) @@ -229,14 +230,13 @@ NodeSessionManager::NodeSessionManager(const Ice::CommunicatorPtr& communicator, } void -NodeSessionManager::create(const NodeIPtr& node) +NodeSessionManager::create(const shared_ptr<NodeI>& node) { { - Lock sync(*this); + lock_guard lock(_mutex); assert(!_node); - const_cast<NodeIPtr&>(_node) = node; - _thread = new Thread(*this); - _thread->start(); + const_cast<shared_ptr<NodeI>&>(_node) = node; + _thread = make_shared<Thread>(*this); } // @@ -246,14 +246,14 @@ NodeSessionManager::create(const NodeIPtr& node) // before the node is activated. // _thread->tryCreateSession(); - _thread->waitTryCreateSession(IceUtil::Time::seconds(3)); + _thread->waitTryCreateSession(3s); } void -NodeSessionManager::create(const InternalRegistryPrx& replica) +NodeSessionManager::create(const shared_ptr<InternalRegistryPrx>& replica) { assert(_thread); - NodeSessionKeepAliveThreadPtr thread; + shared_ptr<NodeSessionKeepAliveThread> thread; if(replica->ice_getIdentity() == _master->ice_getIdentity()) { thread = _thread; @@ -261,7 +261,7 @@ NodeSessionManager::create(const InternalRegistryPrx& replica) } else { - Lock sync(*this); + lock_guard lock(_mutex); thread = addReplicaSession(replica); } @@ -276,7 +276,7 @@ void NodeSessionManager::activate() { { - Lock sync(*this); + lock_guard lock(_mutex); _activated = true; } @@ -285,7 +285,7 @@ NodeSessionManager::activate() // again and make sure that the servers are synchronized and the // replica observer is set on the session. // - NodeSessionPrx session = _thread->getSession(); + auto session = _thread->getSession(); if(session) { try @@ -319,6 +319,7 @@ NodeSessionManager::terminate() { assert(_thread); _thread->terminate(); + _thread->join(); } void @@ -326,14 +327,14 @@ NodeSessionManager::destroy() { NodeSessionMap sessions; { - Lock sync(*this); + lock_guard lock(_mutex); if(_destroyed) { return; } _destroyed = true; _sessions.swap(sessions); - notifyAll(); + _condVar.notify_all(); } if(_thread) @@ -341,25 +342,27 @@ NodeSessionManager::destroy() _thread->terminate(); } - for(NodeSessionMap::const_iterator p = sessions.begin(); p != sessions.end(); ++p) + for(const auto& session : sessions) { - p->second->terminate(); + session.second->terminate(); } if(_thread) { - _thread->getThreadControl().join(); + _thread->join(); } - for(NodeSessionMap::const_iterator p = sessions.begin(); p != sessions.end(); ++p) + + for(const auto& session : sessions) { - p->second->getThreadControl().join(); + session.second->join(); } + } void NodeSessionManager::replicaInit(const InternalRegistryPrxSeq& replicas) { - Lock sync(*this); + lock_guard lock(_mutex); if(_destroyed) { return; @@ -369,17 +372,17 @@ NodeSessionManager::replicaInit(const InternalRegistryPrxSeq& replicas) // Initialize the set of replicas known by the master. // _replicas.clear(); - for(InternalRegistryPrxSeq::const_iterator p = replicas.begin(); p != replicas.end(); ++p) + for(const auto& replica : replicas) { - _replicas.insert((*p)->ice_getIdentity()); - addReplicaSession(*p)->tryCreateSession(); + _replicas.insert(replica->ice_getIdentity()); + addReplicaSession(replica)->tryCreateSession(); } } void -NodeSessionManager::replicaAdded(const InternalRegistryPrx& replica) +NodeSessionManager::replicaAdded(const shared_ptr<InternalRegistryPrx>& replica) { - Lock sync(*this); + lock_guard lock(_mutex); if(_destroyed) { return; @@ -389,10 +392,10 @@ NodeSessionManager::replicaAdded(const InternalRegistryPrx& replica) } void -NodeSessionManager::replicaRemoved(const InternalRegistryPrx& replica) +NodeSessionManager::replicaRemoved(const shared_ptr<InternalRegistryPrx>& replica) { { - Lock sync(*this); + lock_guard lock(_mutex); if(_destroyed) { return; @@ -406,12 +409,12 @@ NodeSessionManager::replicaRemoved(const InternalRegistryPrx& replica) // } -NodeSessionKeepAliveThreadPtr -NodeSessionManager::addReplicaSession(const InternalRegistryPrx& replica) +shared_ptr<NodeSessionKeepAliveThread> +NodeSessionManager::addReplicaSession(const shared_ptr<InternalRegistryPrx>& replica) { assert(!_destroyed); - NodeSessionMap::const_iterator p = _sessions.find(replica->ice_getIdentity()); - NodeSessionKeepAliveThreadPtr thread; + auto p = _sessions.find(replica->ice_getIdentity()); + shared_ptr<NodeSessionKeepAliveThread> thread; if(p != _sessions.end()) { thread = p->second; @@ -419,9 +422,8 @@ NodeSessionManager::addReplicaSession(const InternalRegistryPrx& replica) } else { - thread = new NodeSessionKeepAliveThread(replica, _node, *this); + thread = make_shared<NodeSessionKeepAliveThread>(replica, _node, *this); _sessions.insert(make_pair(replica->ice_getIdentity(), thread)); - thread->start(); } return thread; } @@ -429,15 +431,17 @@ NodeSessionManager::addReplicaSession(const InternalRegistryPrx& replica) void NodeSessionManager::reapReplicas() { - vector<NodeSessionKeepAliveThreadPtr> reap; + // NodeSessionKeepAliveThread's destructor will join its thread on destruction + // Keep this vector do the destruction outside the lock + vector<shared_ptr<NodeSessionKeepAliveThread>> reap; { - Lock sync(*this); + lock_guard lock(_mutex); if(_destroyed) { return; } - NodeSessionMap::iterator q = _sessions.begin(); + auto q = _sessions.begin(); while(q != _sessions.end()) { if(_replicas.find(q->first) == _replicas.end() && q->second->terminateIfDisconnected()) @@ -453,14 +457,14 @@ NodeSessionManager::reapReplicas() } } - for(vector<NodeSessionKeepAliveThreadPtr>::const_iterator p = reap.begin(); p != reap.end(); ++p) + for(const auto& r : reap) { - (*p)->getThreadControl().join(); + r->join(); } } void -NodeSessionManager::syncServers(const NodeSessionPrx& session) +NodeSessionManager::syncServers(const shared_ptr<NodeSessionPrx>& session) { // // Ask the session to load the servers on the node. Once this is @@ -479,11 +483,11 @@ NodeSessionManager::syncServers(const NodeSessionPrx& session) } void -NodeSessionManager::createdSession(const NodeSessionPrx& session) +NodeSessionManager::createdSession(const shared_ptr<NodeSessionPrx>& session) { bool activated; { - Lock sync(*this); + lock_guard lock(_mutex); activated = _activated; } @@ -534,9 +538,10 @@ NodeSessionManager::createdSession(const NodeSessionPrx& session) } else { - vector<Ice::AsyncResultPtr> results1; - vector<Ice::AsyncResultPtr> results2; - vector<QueryPrx> queryObjects = findAllQueryObjects(false); + + vector<future<Ice::ObjectProxySeq>> results1; + vector<future<Ice::ObjectProxySeq>> results2; + auto queryObjects = findAllQueryObjects(false); // // Below we try to retrieve internal registry proxies either @@ -549,19 +554,19 @@ NodeSessionManager::createdSession(const NodeSessionPrx& session) // more reliable. // - for(vector<QueryPrx>::const_iterator q = queryObjects.begin(); q != queryObjects.end(); ++q) + for(const auto& object : queryObjects) { - results1.push_back((*q)->begin_findAllObjectsByType(InternalRegistry::ice_staticId())); + results1.push_back(object->findAllObjectsByTypeAsync(InternalRegistry::ice_staticId())); } - for(vector<QueryPrx>::const_iterator q = queryObjects.begin(); q != queryObjects.end(); ++q) + for(const auto& object : queryObjects) { - results2.push_back((*q)->begin_findAllObjectsByType(Registry::ice_staticId())); + results2.push_back(object->findAllObjectsByTypeAsync(Registry::ice_staticId())); } - map<Ice::Identity, Ice::ObjectPrx> proxies; - for(vector<Ice::AsyncResultPtr>::const_iterator p = results1.begin(); p != results1.end(); ++p) + map<Ice::Identity, shared_ptr<Ice::ObjectPrx>> proxies; + + for(auto& result : results1) { - QueryPrx query = QueryPrx::uncheckedCast((*p)->getProxy()); if(isDestroyed()) { return; @@ -569,10 +574,10 @@ NodeSessionManager::createdSession(const NodeSessionPrx& session) try { - Ice::ObjectProxySeq prxs = query->end_findAllObjectsByType(*p); - for(Ice::ObjectProxySeq::const_iterator q = prxs.begin(); q != prxs.end(); ++q) + auto prxs = result.get(); + for(const auto& prx : prxs) { - proxies[(*q)->ice_getIdentity()] = *q; + proxies[prx->ice_getIdentity()] = prx; } } catch(const Ice::LocalException&) @@ -580,9 +585,8 @@ NodeSessionManager::createdSession(const NodeSessionPrx& session) // Ignore. } } - for(vector<Ice::AsyncResultPtr>::const_iterator p = results2.begin(); p != results2.end(); ++p) + for(auto& result : results2) { - QueryPrx query = QueryPrx::uncheckedCast((*p)->getProxy()); if(isDestroyed()) { return; @@ -590,22 +594,24 @@ NodeSessionManager::createdSession(const NodeSessionPrx& session) try { - Ice::ObjectProxySeq prxs = query->end_findAllObjectsByType(*p); - for(Ice::ObjectProxySeq::const_iterator q = prxs.begin(); q != prxs.end(); ++q) + auto prxs = result.get(); + for(auto prx : prxs) { - Ice::Identity id = (*q)->ice_getIdentity(); + Ice::Identity id = prx->ice_getIdentity(); const string prefix("Registry-"); string::size_type pos = id.name.find(prefix); if(pos == string::npos) { continue; // Ignore the master registry proxy. } + id.name = "InternalRegistry-" + id.name.substr(prefix.size()); + prx = prx->ice_identity(id)->ice_endpoints(Ice::EndpointSeq()); - Ice::ObjectPrx prx = (*q)->ice_identity(id)->ice_endpoints(Ice::EndpointSeq()); id.name = "Locator"; - prx = prx->ice_locator(Ice::LocatorPrx::uncheckedCast((*q)->ice_identity(id))); - proxies[id] = prx; + prx = prx->ice_locator(Ice::uncheckedCast<Ice::LocatorPrx>(prx->ice_identity(id))); + + proxies[id] = move(prx); } } catch(const Ice::LocalException&) @@ -614,15 +620,15 @@ NodeSessionManager::createdSession(const NodeSessionPrx& session) } } - for(map<Ice::Identity, Ice::ObjectPrx>::const_iterator q = proxies.begin(); q != proxies.end(); ++q) + for(const auto& prx : proxies) { - replicas.push_back(InternalRegistryPrx::uncheckedCast(q->second)); + replicas.push_back(Ice::uncheckedCast<InternalRegistryPrx>(prx.second)); } } - vector<NodeSessionKeepAliveThreadPtr> sessions; + vector<shared_ptr<NodeSessionKeepAliveThread>> sessions; { - Lock sync(*this); + lock_guard lock(_mutex); if(_destroyed) { return; @@ -635,21 +641,21 @@ NodeSessionManager::createdSession(const NodeSessionPrx& session) if(!session || !_activated) { _replicas.clear(); - for(InternalRegistryPrxSeq::const_iterator p = replicas.begin(); p != replicas.end(); ++p) + for(const auto& replica : replicas) { - if((*p)->ice_getIdentity() != _master->ice_getIdentity()) + if(replica->ice_getIdentity() != _master->ice_getIdentity()) { - _replicas.insert((*p)->ice_getIdentity()); + _replicas.insert(replica->ice_getIdentity()); - if(_sessions.find((*p)->ice_getIdentity()) == _sessions.end()) + if(_sessions.find(replica->ice_getIdentity()) == _sessions.end()) { - NodeSessionKeepAliveThreadPtr thread = addReplicaSession(*p); + auto thread = addReplicaSession(replica); thread->tryCreateSession(); sessions.push_back(thread); } else { - addReplicaSession(*p); // Update the session + addReplicaSession(replica); // Update the session } } } @@ -661,18 +667,18 @@ NodeSessionManager::createdSession(const NodeSessionPrx& session) // the replica sessions are created before the node adapter is // activated. // - IceUtil::Time before = IceUtil::Time::now(); - for(vector<NodeSessionKeepAliveThreadPtr>::const_iterator p = sessions.begin(); p != sessions.end(); ++p) + auto before = chrono::system_clock::now(); + for(const auto& s : sessions) { if(isDestroyed()) { return; } - IceUtil::Time timeout = IceUtil::Time::seconds(5) - (IceUtil::Time::now() - before); - if(timeout <= IceUtil::Time()) + auto timeout = 5s - (chrono::system_clock::now() - before); + if(timeout <= 0s) { break; } - (*p)->waitTryCreateSession(timeout); + s->waitTryCreateSession(chrono::duration_cast<chrono::seconds>(timeout)); } } |