diff options
Diffstat (limited to 'cpp/src/IceGrid/LocatorI.cpp')
-rw-r--r-- | cpp/src/IceGrid/LocatorI.cpp | 574 |
1 files changed, 242 insertions, 332 deletions
diff --git a/cpp/src/IceGrid/LocatorI.cpp b/cpp/src/IceGrid/LocatorI.cpp index ae21fa49175..e6da5a12384 100644 --- a/cpp/src/IceGrid/LocatorI.cpp +++ b/cpp/src/IceGrid/LocatorI.cpp @@ -8,153 +8,26 @@ #include <IceGrid/WellKnownObjectsManager.h> #include <IceGrid/SessionI.h> #include <IceGrid/Util.h> +#include <IceGrid/Internal.h> using namespace std; +using namespace std::chrono; using namespace IceGrid; -namespace IceGrid +namespace { -// -// Callback from asynchronous call to adapter->getDirectProxy() invoked in LocatorI::findAdapterById_async(). -// -class AdapterGetDirectProxyCallback : public virtual IceUtil::Shared -{ -public: - - AdapterGetDirectProxyCallback(const LocatorIPtr& locator, const LocatorAdapterInfo& adapter) : - _locator(locator), _adapter(adapter) - { - } - - virtual void response(const ::Ice::ObjectPrx& obj) - { - assert(obj); - _locator->getDirectProxyResponse(_adapter, obj); - } - - virtual void exception(const ::Ice::Exception& e) - { - _locator->getDirectProxyException(_adapter, e); - } - -private: - - const LocatorIPtr _locator; - const LocatorAdapterInfo _adapter; -}; - -class AdapterActivateCallback : public virtual IceUtil::Shared -{ -public: - - AdapterActivateCallback(const LocatorIPtr& locator, const LocatorAdapterInfo& adapter) : - _locator(locator), _adapter(adapter) - { - } - - virtual void response(const ::Ice::ObjectPrx& obj) - { - _locator->getDirectProxyResponse(_adapter, obj); - } - - virtual void exception(const ::Ice::Exception& ex) - { - _locator->getDirectProxyException(_adapter, ex); - } - -private: - - const LocatorIPtr _locator; - const LocatorAdapterInfo _adapter; -}; - -// -// Callback from asynchrnous call to LocatorI::findAdapterById_async() -// invoked in LocatorI::findObjectById_async(). -// -class AMD_Locator_findAdapterByIdI : public Ice::AMD_Locator_findAdapterById +class AdapterRequest final : public LocatorI::Request { public: - AMD_Locator_findAdapterByIdI(const Ice::AMD_Locator_findObjectByIdPtr& cb, const Ice::ObjectPrx& obj) : - _cb(cb), - _obj(obj) - { - } - - virtual void ice_response(const ::Ice::ObjectPrx& obj) - { - // - // If the adapter dummy direct proxy is not null, return a - // proxy containing the identity we were looking for and the - // endpoints of the adapter. - // - // If null, return the proxy registered with the object - // registry. - // - if(obj) - { - _cb->ice_response(obj->ice_identity(_obj->ice_getIdentity())); - } - else - { - _cb->ice_response(_obj); - } - } - - virtual void ice_exception(const ::Ice::Exception& ex) - { - try - { - ex.ice_throw(); - } - catch(const Ice::AdapterNotFoundException&) - { - // - // We couldn't find the adapter, we ignore and return the - // original proxy containing the adapter id. - // - _cb->ice_response(_obj); - return; - } - catch(const Ice::Exception& e) - { - // - // Rethrow unexpected exception. - // - _cb->ice_exception(e); - return; - } - - assert(false); - } - - virtual void ice_exception(const std::exception& ex) - { - _cb->ice_exception(ex); - } - - virtual void ice_exception() - { - _cb->ice_exception(); - } - -private: - - const Ice::AMD_Locator_findObjectByIdPtr _cb; - const Ice::ObjectPrx _obj; -}; - -class AdapterRequest : public LocatorI::Request -{ -public: - - AdapterRequest(const Ice::AMD_Locator_findAdapterByIdPtr& amdCB, - const LocatorIPtr& locator, + AdapterRequest(function<void(const shared_ptr<Ice::ObjectPrx>&)> response, + function<void(exception_ptr)> exception, + const shared_ptr<LocatorI>& locator, const Ice::EncodingVersion& encoding, const LocatorAdapterInfo& adapter) : - _amdCB(amdCB), + _response(move(response)), + _exception(move(exception)), _locator(locator), _encoding(encoding), _adapter(adapter), @@ -163,20 +36,20 @@ public: assert(_adapter.proxy); } - virtual void - execute() + void + execute() override { - _locator->getDirectProxy(_adapter, this); + _locator->getDirectProxy(_adapter, shared_from_this()); } - virtual void - activating(const string&) + void + activating(const string&) override { // Nothing to do. } - virtual void - response(const std::string& id, const Ice::ObjectPrx& proxy) + void + response(const std::string& id, const shared_ptr<Ice::ObjectPrx>& proxy) override { assert(proxy); @@ -185,49 +58,52 @@ public: // if(!IceInternal::isSupported(_encoding, proxy->ice_getEncodingVersion())) { - exception(id, Ice::UnsupportedEncodingException(__FILE__, - __LINE__, - "server doesn't support requested encoding", - _encoding, - proxy->ice_getEncodingVersion())); + exception(id, + make_exception_ptr(Ice::UnsupportedEncodingException(__FILE__, __LINE__, + "server doesn't support requested encoding", + _encoding, + proxy->ice_getEncodingVersion()))); return; } - _amdCB->ice_response(proxy->ice_identity(Ice::stringToIdentity("dummy"))); + _response(proxy->ice_identity(Ice::stringToIdentity("dummy"))); } - virtual void - exception(const std::string&, const Ice::Exception& ex) + void + exception(const std::string&, exception_ptr ex) override { if(_traceLevels->locator > 0) { Ice::Trace out(_traceLevels->logger, _traceLevels->locatorCat); out << "couldn't resolve adapter`" << _adapter.id << "' endpoints:\n" << toString(ex); } - _amdCB->ice_response(0); + _response(nullptr); } private: - const Ice::AMD_Locator_findAdapterByIdPtr _amdCB; - const LocatorIPtr _locator; + const function<void(const shared_ptr<Ice::ObjectPrx>&)> _response; + const function<void(exception_ptr)> _exception; + const shared_ptr<LocatorI> _locator; const Ice::EncodingVersion _encoding; const LocatorAdapterInfo _adapter; - const TraceLevelsPtr _traceLevels; + const shared_ptr<TraceLevels> _traceLevels; }; -class ReplicaGroupRequest : public LocatorI::Request, public IceUtil::Mutex +class ReplicaGroupRequest final : public LocatorI::Request { public: - ReplicaGroupRequest(const Ice::AMD_Locator_findAdapterByIdPtr& amdCB, - const LocatorIPtr& locator, + ReplicaGroupRequest(function<void(const shared_ptr<Ice::ObjectPrx>&)> response, + function<void(exception_ptr)> exception, + const shared_ptr<LocatorI>& locator, const string& id, const Ice::EncodingVersion& encoding, const LocatorAdapterInfoSeq& adapters, int count, - Ice::ObjectPrx firstProxy) : - _amdCB(amdCB), + shared_ptr<Ice::ObjectPrx> firstProxy) : + _response(move(response)), + _exception(move(exception)), _locator(locator), _id(id), _encoding(encoding), @@ -254,15 +130,15 @@ public: } } - virtual void - execute() + void + execute() override { // // Otherwise, request as many adapters as required. // LocatorAdapterInfoSeq adapters; { - Lock sync(*this); + lock_guard lock(_mutex); for(unsigned int i = static_cast<unsigned int>(_proxies.size()); i < _count; ++i) { if(_lastAdapter == _adapters.end()) @@ -286,17 +162,17 @@ public: } } - for(LocatorAdapterInfoSeq::const_iterator p = adapters.begin(); p != adapters.end(); ++p) + for(const auto& adapter : adapters) { - if(_locator->getDirectProxy(*p, this)) + if(_locator->getDirectProxy(adapter, shared_from_this())) { - activating(p->id); + activating(adapter.id); } } } - virtual void - activating(const string&) + void + activating(const string&) override { // // An adapter is being activated. Don't wait for the activation to complete. Instead, @@ -305,7 +181,7 @@ public: LocatorAdapterInfo adapter; do { - Lock sync(*this); + lock_guard lock(_mutex); if(_lastAdapter == _adapters.end()) { break; @@ -313,23 +189,23 @@ public: adapter = *_lastAdapter; ++_lastAdapter; } - while(_locator->getDirectProxy(adapter, this)); + while(_locator->getDirectProxy(adapter, shared_from_this())); } - virtual void - exception(const string& /*id*/, const Ice::Exception& ex) + void + exception(const string&, exception_ptr exptr) override { LocatorAdapterInfo adapter; { - Lock sync(*this); + lock_guard lock(_mutex); if(_proxies.size() == _count) // Nothing to do if we already sent the response. { return; } - if(!_exception.get()) + if(!_exptr) { - _exception.reset(ex.ice_clone()); + _exptr = exptr; } if(_lastAdapter == _adapters.end()) @@ -354,30 +230,31 @@ public: if(adapter.proxy) { - if(_locator->getDirectProxy(adapter, this)) + if(_locator->getDirectProxy(adapter, shared_from_this())) { activating(adapter.id); } } } - virtual void - response(const string& id, const Ice::ObjectPrx& proxy) + void + response(const string& id, const shared_ptr<Ice::ObjectPrx>& proxy) override { // // Ensure the server supports the request encoding. // if(!IceInternal::isSupported(_encoding, proxy->ice_getEncodingVersion())) { - exception(id, Ice::UnsupportedEncodingException(__FILE__, - __LINE__, - "server doesn't support requested encoding", - _encoding, - proxy->ice_getEncodingVersion())); + exception(id, + make_exception_ptr(Ice::UnsupportedEncodingException(__FILE__, + __LINE__, + "server doesn't support requested encoding", + _encoding, + proxy->ice_getEncodingVersion()))); return; } - Lock sync(*this); + lock_guard lock(_mutex); assert(proxy); if(_proxies.size() == _count) // Nothing to do if we already sent the response. { @@ -403,7 +280,7 @@ private: { if(_proxies.size() == 1) { - _amdCB->ice_response(_proxies.begin()->second); + _response(_proxies.begin()->second); } else if(_proxies.empty()) { @@ -411,58 +288,63 @@ private: // If there's no proxies, it's either because we couldn't contact the adapters or // because the replica group has no members. // - assert(_exception.get() || _adapters.empty()); + assert(_exptr || _adapters.empty()); if(_traceLevels->locator > 0) { Ice::Trace out(_traceLevels->logger, _traceLevels->locatorCat); out << "couldn't resolve replica group `" << _id << "' endpoints:\n"; - out << (_exception.get() ? toString(*_exception) : string("replica group is empty")); + out << (_exptr ? toString(_exptr) : string("replica group is empty")); } - _amdCB->ice_response(0); + _response(nullptr); } else if(_proxies.size() > 1) { Ice::EndpointSeq endpoints; endpoints.reserve(_proxies.size()); - for(LocatorAdapterInfoSeq::const_iterator p = _adapters.begin(); p != _adapters.end(); ++p) + for(const auto& adapter : _adapters) { - map<string, Ice::ObjectPrx>::const_iterator q = _proxies.find(p->id); + auto q = _proxies.find(adapter.id); if(q != _proxies.end()) { - Ice::EndpointSeq edpts = q->second->ice_getEndpoints(); + auto edpts = q->second->ice_getEndpoints(); endpoints.insert(endpoints.end(), edpts.begin(), edpts.end()); + } } - Ice::ObjectPrx proxy = _locator->getCommunicator()->stringToProxy("dummy:default"); - _amdCB->ice_response(proxy->ice_endpoints(endpoints)); + auto proxy = _locator->getCommunicator()->stringToProxy("dummy:default"); + _response(proxy->ice_endpoints(endpoints)); } } - const Ice::AMD_Locator_findAdapterByIdPtr _amdCB; - const LocatorIPtr _locator; + const function<void(const shared_ptr<Ice::ObjectPrx>&)> _response; + const function<void(exception_ptr)> _exception; + const shared_ptr<LocatorI> _locator; const std::string _id; const Ice::EncodingVersion _encoding; LocatorAdapterInfoSeq _adapters; - const TraceLevelsPtr _traceLevels; + const shared_ptr<TraceLevels> _traceLevels; unsigned int _count; LocatorAdapterInfoSeq::const_iterator _lastAdapter; - std::map<std::string, Ice::ObjectPrx> _proxies; - IceInternal::UniquePtr<Ice::Exception> _exception; + std::map<std::string, shared_ptr<Ice::ObjectPrx>> _proxies; + exception_ptr _exptr; + std::mutex _mutex; }; -class RoundRobinRequest : public LocatorI::Request, SynchronizationCallback, public IceUtil::Mutex +class RoundRobinRequest final : public LocatorI::Request, SynchronizationCallback { public: - RoundRobinRequest(const Ice::AMD_Locator_findAdapterByIdPtr& amdCB, - const LocatorIPtr& locator, - const DatabasePtr database, + RoundRobinRequest(function<void(const shared_ptr<Ice::ObjectPrx>&)> response, + function<void(exception_ptr)> exception, + const shared_ptr<LocatorI>& locator, + const shared_ptr<Database> database, const string& id, const Ice::Current& current, const LocatorAdapterInfoSeq& adapters, int count) : - _amdCB(amdCB), + _response(move(response)), + _exception(move(exception)), _locator(locator), _database(database), _id(id), @@ -477,8 +359,8 @@ public: assert(_adapters.empty() || _count > 0); } - virtual void - execute() + void + execute() override { if(_adapters.empty()) { @@ -487,26 +369,26 @@ public: Ice::Trace out(_traceLevels->logger, _traceLevels->locatorCat); out << "couldn't resolve replica group `" << _id << "' endpoints:\nreplica group is empty"; } - _amdCB->ice_response(0); + _response(nullptr); return; } LocatorAdapterInfo adapter = _adapters[0]; assert(adapter.proxy); - if(_locator->getDirectProxy(adapter, this)) + if(_locator->getDirectProxy(adapter, shared_from_this())) { activating(adapter.id); } } - virtual void - activating(const string& id) + void + activating(const string& id) override { LocatorAdapterInfo adapter; adapter.id = id; do { - Lock sync(*this); + lock_guard lock(_mutex); if(_adapters.empty() || _waitForActivation) { return; @@ -514,26 +396,27 @@ public: _activatingOrFailed.insert(adapter.id); adapter = nextAdapter(); } - while(adapter.proxy && _locator->getDirectProxy(adapter, this)); + while(adapter.proxy && _locator->getDirectProxy(adapter, shared_from_this())); } - virtual void - response(const std::string& id, const Ice::ObjectPrx& proxy) + void + response(const std::string& id, const shared_ptr<Ice::ObjectPrx>& proxy) override { // // Ensure the server supports the request encoding. // if(!IceInternal::isSupported(_encoding, proxy->ice_getEncodingVersion())) { - exception(id, Ice::UnsupportedEncodingException(__FILE__, - __LINE__, - "server doesn't support requested encoding", - _encoding, - proxy->ice_getEncodingVersion())); + exception(id, + make_exception_ptr(Ice::UnsupportedEncodingException(__FILE__, + __LINE__, + "server doesn't support requested encoding", + _encoding, + proxy->ice_getEncodingVersion()))); return; } - Lock sync(*this); + lock_guard lock(_mutex); assert(proxy); if(_adapters.empty() || id != _adapters[0].id) { @@ -542,30 +425,30 @@ public: if(_count > 1) { - Ice::ObjectPrx p = proxy->ice_identity(Ice::stringToIdentity("dummy")); - LocatorI::RequestPtr request = - new ReplicaGroupRequest(_amdCB, _locator, _id, _encoding, _adapters, _count, p); + auto p = proxy->ice_identity(Ice::stringToIdentity("dummy")); + shared_ptr<LocatorI::Request> request = + make_shared<ReplicaGroupRequest>(_response, _exception, _locator, _id, _encoding, _adapters, _count, p); request->execute(); } else { - _amdCB->ice_response(proxy->ice_identity(Ice::stringToIdentity("dummy"))); + _response(proxy->ice_identity(Ice::stringToIdentity("dummy"))); } _adapters.clear(); } - virtual void - exception(const std::string& id, const Ice::Exception& ex) + void + exception(const std::string& id, exception_ptr ex) override { LocatorAdapterInfo adapter; { - Lock sync(*this); + lock_guard<std::mutex> lock(_mutex); _failed.insert(id); _activatingOrFailed.insert(id); - if(!_exception.get()) + if(!_exptr) { - _exception.reset(ex.ice_clone()); + _exptr = ex; } if(_adapters.empty() || id != _adapters[0].id) @@ -576,34 +459,34 @@ public: adapter = nextAdapter(); } - if(adapter.proxy && _locator->getDirectProxy(adapter, this)) + if(adapter.proxy && _locator->getDirectProxy(adapter, shared_from_this())) { activating(adapter.id); } } void - synchronized() + synchronized() override { LocatorAdapterInfo adapter; { - Lock sync(*this); + lock_guard lock(_mutex); assert(_adapters.empty()); adapter = nextAdapter(); } - if(adapter.proxy && _locator->getDirectProxy(adapter, this)) + if(adapter.proxy && _locator->getDirectProxy(adapter, shared_from_this())) { activating(adapter.id); } } void - synchronized(const Ice::Exception& ex) + synchronized(exception_ptr ex) override { LocatorAdapterInfo adapter; { - Lock sync(*this); + lock_guard lock(_mutex); assert(_adapters.empty()); if(_activatingOrFailed.size() > _failed.size()) @@ -618,12 +501,12 @@ public: Ice::Trace out(_traceLevels->logger, _traceLevels->locatorCat); out << "couldn't resolve replica group `" << _id << "' endpoints:\n" << toString(ex); } - _amdCB->ice_response(0); + _response(nullptr); return; } } - if(adapter.proxy && _locator->getDirectProxy(adapter, this)) + if(adapter.proxy && _locator->getDirectProxy(adapter, shared_from_this())) { activating(adapter.id); } @@ -667,13 +550,15 @@ private: { assert(_adapters.empty()); bool callback; + auto self = dynamic_pointer_cast<SynchronizationCallback>(shared_from_this()); + assert(self); if(!_waitForActivation) { - callback = _database->addAdapterSyncCallback(_id, this, _activatingOrFailed); + callback = _database->addAdapterSyncCallback(_id, move(self), _activatingOrFailed); } else { - callback = _database->addAdapterSyncCallback(_id, this, _failed); + callback = _database->addAdapterSyncCallback(_id, move(self), _failed); } if(callback) { @@ -688,11 +573,11 @@ private: { Ice::Current current; current.encoding = _encoding; - _locator->findAdapterById_async(_amdCB, _id, current); + _locator->findAdapterByIdAsync( _id, _response, _exception, current); } - catch(const Ice::Exception& ex) + catch(const Ice::Exception&) { - _amdCB->ice_exception(ex); + _exception(current_exception()); } _adapters.clear(); return LocatorAdapterInfo(); @@ -708,108 +593,117 @@ private: { Ice::Trace out(_traceLevels->logger, _traceLevels->locatorCat); out << "couldn't resolve replica group `" << _id << "' endpoints:\n"; - out << (_exception.get() ? toString(*_exception) : string("replica group is empty")); + out << (_exptr ? toString(_exptr) : string("replica group is empty")); } - _amdCB->ice_response(0); + _response(nullptr); return LocatorAdapterInfo(); } } catch(const AdapterNotExistException&) { assert(_adapters.empty()); - _amdCB->ice_exception(Ice::AdapterNotFoundException()); + _exception(make_exception_ptr(Ice::AdapterNotFoundException())); return LocatorAdapterInfo(); } - catch(const Ice::Exception& ex) + catch(const Ice::Exception&) { assert(_adapters.empty()); if(_traceLevels->locator > 0) { Ice::Trace out(_traceLevels->logger, _traceLevels->locatorCat); - out << "couldn't resolve replica group `" << _id << "' endpoints:\n" << toString(ex); + out << "couldn't resolve replica group `" << _id << "' endpoints:\n" << toString(current_exception()); } - _amdCB->ice_response(0); + _response(nullptr); return LocatorAdapterInfo(); } } - const Ice::AMD_Locator_findAdapterByIdPtr _amdCB; - const LocatorIPtr _locator; - const DatabasePtr _database; - const std::string _id; + const function<void(const shared_ptr<Ice::ObjectPrx>&)> _response; + const function<void(exception_ptr)> _exception; + const shared_ptr<LocatorI> _locator; + const shared_ptr<Database> _database; + const string _id; const Ice::EncodingVersion _encoding; - const Ice::ConnectionPtr _connection; + const shared_ptr<Ice::Connection> _connection; const Ice::Context _context; LocatorAdapterInfoSeq _adapters; - const TraceLevelsPtr _traceLevels; + const shared_ptr<TraceLevels> _traceLevels; int _count; bool _waitForActivation; set<string> _failed; set<string> _activatingOrFailed; - IceInternal::UniquePtr<Ice::Exception> _exception; + exception_ptr _exptr; + mutex _mutex; }; -class FindAdapterByIdCallback : public SynchronizationCallback +class FindAdapterByIdCallback final : public SynchronizationCallback { public: - FindAdapterByIdCallback(const LocatorIPtr& locator, - const Ice::AMD_Locator_findAdapterByIdPtr& cb, + FindAdapterByIdCallback(const shared_ptr<LocatorI>& locator, + function<void(const shared_ptr<Ice::ObjectPrx>&)> response, + function<void(exception_ptr)> exception, const string& id, - const Ice::Current& current) : _locator(locator), _cb(cb), _id(id), _current(current) + const Ice::Current& current) : + _locator(locator), + _response(move(response)), + _exception(move(exception)), + _id(id), + _current(current) { } - virtual void - synchronized() + void + synchronized() override { try { - _locator->findAdapterById_async(_cb, _id, _current); + _locator->findAdapterByIdAsync(_id, _response, _exception, _current); } - catch(const Ice::Exception& ex) + catch(const Ice::Exception&) { - _cb->ice_exception(ex); + _exception(current_exception()); } } - virtual void - synchronized(const Ice::Exception& sex) + void + synchronized(exception_ptr exptr) override { try { - sex.ice_throw(); + rethrow_exception(exptr); } catch(const AdapterNotExistException&) { } - catch(const Ice::Exception& ex) + catch(const Ice::Exception&) { - const TraceLevelsPtr traceLevels = _locator->getTraceLevels(); + const shared_ptr<TraceLevels> traceLevels = _locator->getTraceLevels(); if(traceLevels->locator > 0) { Ice::Trace out(traceLevels->logger, traceLevels->locatorCat); - out << "couldn't resolve adapter `" << _id << "' endpoints:\n" << toString(ex); + out << "couldn't resolve adapter `" << _id << "' endpoints:\n" << toString(exptr); } } - _cb->ice_response(0); + _response(nullptr); } private: - const LocatorIPtr _locator; - const Ice::AMD_Locator_findAdapterByIdPtr _cb; + const shared_ptr<LocatorI> _locator; + const function<void(const shared_ptr<Ice::ObjectPrx>&)> _response; + const function<void(exception_ptr)> _exception; const string _id; const Ice::Current _current; }; }; -LocatorI::LocatorI(const Ice::CommunicatorPtr& communicator, - const DatabasePtr& database, - const WellKnownObjectsManagerPtr& wellKnownObjects, - const RegistryPrx& registry, - const QueryPrx& query) : +LocatorI::LocatorI(const shared_ptr<Ice::Communicator>& communicator, + const shared_ptr<Database>& database, + const shared_ptr<WellKnownObjectsManager>& wellKnownObjects, + const shared_ptr<RegistryPrx>& registry, + const shared_ptr<QueryPrx>& query) : _communicator(communicator), _database(database), _wellKnownObjects(wellKnownObjects), @@ -823,13 +717,14 @@ LocatorI::LocatorI(const Ice::CommunicatorPtr& communicator, // registry. // void -LocatorI::findObjectById_async(const Ice::AMD_Locator_findObjectByIdPtr& cb, - const Ice::Identity& id, - const Ice::Current&) const +LocatorI::findObjectByIdAsync(Ice::Identity id, + function<void(const shared_ptr<Ice::ObjectPrx>&)> response, + function<void(exception_ptr)>, + const Ice::Current&) const { try { - cb->ice_response(_database->getObjectProxy(id)); + response(_database->getObjectProxy(id)); } catch(const ObjectNotRegisteredException&) { @@ -842,11 +737,13 @@ LocatorI::findObjectById_async(const Ice::AMD_Locator_findObjectByIdPtr& cb, // registry. If found, we try to get its direct proxy. // void -LocatorI::findAdapterById_async(const Ice::AMD_Locator_findAdapterByIdPtr& cb, - const string& id, - const Ice::Current& current) const +LocatorI::findAdapterByIdAsync(string id, + function<void(const shared_ptr<Ice::ObjectPrx>&)> response, + function<void(exception_ptr)> exception, + const Ice::Current& current) const { - LocatorIPtr self = const_cast<LocatorI*>(this); + auto self = const_pointer_cast<LocatorI>(shared_from_this()); + bool replicaGroup = false; try { @@ -868,26 +765,28 @@ LocatorI::findAdapterById_async(const Ice::AMD_Locator_findAdapterByIdPtr& cb, } catch(const SynchronizationException&) { - if(_database->addAdapterSyncCallback(id, new FindAdapterByIdCallback(self, cb, id, current))) + if(_database->addAdapterSyncCallback(id, make_shared<FindAdapterByIdCallback>(self, response, exception, id, current))) { return; } } } - RequestPtr request; + shared_ptr<Request> request; if(roundRobin) { - request = new RoundRobinRequest(cb, self, _database, id, current, adapters, count); + request = make_shared<RoundRobinRequest>(response, exception, self, _database, id, current, adapters, + count); } else if(replicaGroup) { - request = new ReplicaGroupRequest(cb, self, id, current.encoding, adapters, count, 0); + request = make_shared<ReplicaGroupRequest>(response, exception, self, id, current.encoding, adapters, count, + nullptr); } else { assert(adapters.size() == 1); - request = new AdapterRequest(cb, self, current.encoding, adapters[0]); + request = make_shared<AdapterRequest>(response, exception, self, current.encoding, adapters[0]); } request->execute(); return; @@ -895,95 +794,100 @@ LocatorI::findAdapterById_async(const Ice::AMD_Locator_findAdapterByIdPtr& cb, catch(const AdapterNotExistException&) { } - catch(const Ice::Exception& ex) + catch(const Ice::Exception&) { - const TraceLevelsPtr traceLevels = _database->getTraceLevels(); + auto traceLevels = _database->getTraceLevels(); if(traceLevels->locator > 0) { Ice::Trace out(traceLevels->logger, traceLevels->locatorCat); if(replicaGroup) { - out << "couldn't resolve replica group `" << id << "' endpoints:\n" << toString(ex); + out << "couldn't resolve replica group `" << id << "' endpoints:\n" << toString(current_exception()); } else { - out << "couldn't resolve adapter `" << id << "' endpoints:\n" << toString(ex); + out << "couldn't resolve adapter `" << id << "' endpoints:\n" << toString(current_exception()); } } - cb->ice_response(0); + response(nullptr); return; } try { - cb->ice_response(_database->getAdapterDirectProxy(id, current.encoding, current.con, current.ctx)); + response(_database->getAdapterDirectProxy(id, current.encoding, current.con, current.ctx)); } catch(const AdapterNotExistException&) { - cb->ice_exception(Ice::AdapterNotFoundException()); + exception(make_exception_ptr(Ice::AdapterNotFoundException())); } } -Ice::LocatorRegistryPrx +shared_ptr<Ice::LocatorRegistryPrx> LocatorI::getRegistry(const Ice::Current&) const { return _wellKnownObjects->getLocatorRegistry(); } -RegistryPrx +shared_ptr<RegistryPrx> LocatorI::getLocalRegistry(const Ice::Current&) const { return _localRegistry; } -QueryPrx +shared_ptr<QueryPrx> LocatorI::getLocalQuery(const Ice::Current&) const { return _localQuery; } -const Ice::CommunicatorPtr& +const shared_ptr<Ice::Communicator>& LocatorI::getCommunicator() const { return _communicator; } -const TraceLevelsPtr& +const shared_ptr<TraceLevels>& LocatorI::getTraceLevels() const { return _database->getTraceLevels(); } bool -LocatorI::getDirectProxy(const LocatorAdapterInfo& adapter, const RequestPtr& request) +LocatorI::getDirectProxy(const LocatorAdapterInfo& adapter, const shared_ptr<Request>& request) { { - Lock sync(*this); - PendingRequestsMap::iterator p = _pendingRequests.find(adapter.id); + lock_guard lock(_mutex); + auto p = _pendingRequests.find(adapter.id); if(p != _pendingRequests.end()) { p->second.push_back(request); return _activating.find(adapter.id) != _activating.end(); } - PendingRequests requests; - requests.push_back(request); - _pendingRequests.insert(make_pair(adapter.id, requests)); + _pendingRequests.insert({ adapter.id, { request } }); } - adapter.proxy->begin_getDirectProxy(newCallback_Adapter_getDirectProxy( - new AdapterGetDirectProxyCallback(this, adapter), - &AdapterGetDirectProxyCallback::response, - &AdapterGetDirectProxyCallback::exception)); + + auto self = shared_from_this(); + adapter.proxy->getDirectProxyAsync([self, adapter] (auto obj) + { + assert(obj); + self->getDirectProxyResponse(adapter, move(obj)); + }, + [self, adapter] (exception_ptr ex) + { + self->getDirectProxyException(adapter, ex); + }); return false; } void -LocatorI::getDirectProxyResponse(const LocatorAdapterInfo& adapter, const Ice::ObjectPrx& proxy) +LocatorI::getDirectProxyResponse(const LocatorAdapterInfo& adapter, const shared_ptr<Ice::ObjectPrx>& proxy) { PendingRequests requests; { - Lock sync(*this); - PendingRequestsMap::iterator p = _pendingRequests.find(adapter.id); + lock_guard lock(_mutex); + auto p = _pendingRequests.find(adapter.id); assert(p != _pendingRequests.end()); requests.swap(p->second); _pendingRequests.erase(p); @@ -992,27 +896,27 @@ LocatorI::getDirectProxyResponse(const LocatorAdapterInfo& adapter, const Ice::O if(proxy) { - for(PendingRequests::iterator q = requests.begin(); q != requests.end(); ++q) + for(const auto& request : requests) { - (*q)->response(adapter.id, proxy); + request->response(adapter.id, proxy); } } else { - for(PendingRequests::iterator q = requests.begin(); q != requests.end(); ++q) + for(const auto& request : requests) { - (*q)->exception(adapter.id, AdapterNotActiveException()); + request->exception(adapter.id, make_exception_ptr(AdapterNotActiveException())); } } } void -LocatorI::getDirectProxyException(const LocatorAdapterInfo& adapter, const Ice::Exception& ex) +LocatorI::getDirectProxyException(const LocatorAdapterInfo& adapter, exception_ptr ex) { bool activate = false; try { - ex.ice_throw(); + rethrow_exception(ex); } catch(const AdapterNotActiveException& e) { @@ -1024,8 +928,8 @@ LocatorI::getDirectProxyException(const LocatorAdapterInfo& adapter, const Ice:: PendingRequests requests; { - Lock sync(*this); - PendingRequestsMap::iterator p = _pendingRequests.find(adapter.id); + lock_guard lock(_mutex); + auto p = _pendingRequests.find(adapter.id); assert(p != _pendingRequests.end()); if(activate) { @@ -1042,22 +946,28 @@ LocatorI::getDirectProxyException(const LocatorAdapterInfo& adapter, const Ice:: if(activate) { - for(PendingRequests::iterator q = requests.begin(); q != requests.end(); ++q) + for(const auto& request : requests) { - (*q)->activating(adapter.id); + request->activating(adapter.id); } - int timeout = adapter.activationTimeout + adapter.deactivationTimeout; - AdapterPrx::uncheckedCast(adapter.proxy->ice_invocationTimeout(timeout * 1000))->begin_activate( - newCallback_Adapter_activate(new AdapterActivateCallback(this, adapter), - &AdapterActivateCallback::response, - &AdapterActivateCallback::exception)); + int timeout = secondsToInt(adapter.activationTimeout + adapter.deactivationTimeout) * 1000; + auto self = shared_from_this(); + Ice::uncheckedCast<AdapterPrx>(adapter.proxy->ice_invocationTimeout(timeout))->activateAsync( + [self, adapter] (auto obj) + { + self->getDirectProxyResponse(adapter, move(obj)); + }, + [self, adapter] (auto e) + { + self->getDirectProxyException(adapter, e); + }); } else { - for(PendingRequests::iterator q = requests.begin(); q != requests.end(); ++q) + for(const auto& request : requests) { - (*q)->exception(adapter.id, ex); + request->exception(adapter.id, ex); } } } |