summaryrefslogtreecommitdiff
path: root/cpp/src/IceGrid/LocatorI.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceGrid/LocatorI.cpp')
-rw-r--r--cpp/src/IceGrid/LocatorI.cpp574
1 files changed, 242 insertions, 332 deletions
diff --git a/cpp/src/IceGrid/LocatorI.cpp b/cpp/src/IceGrid/LocatorI.cpp
index ae21fa49175..e6da5a12384 100644
--- a/cpp/src/IceGrid/LocatorI.cpp
+++ b/cpp/src/IceGrid/LocatorI.cpp
@@ -8,153 +8,26 @@
#include <IceGrid/WellKnownObjectsManager.h>
#include <IceGrid/SessionI.h>
#include <IceGrid/Util.h>
+#include <IceGrid/Internal.h>
using namespace std;
+using namespace std::chrono;
using namespace IceGrid;
-namespace IceGrid
+namespace
{
-//
-// Callback from asynchronous call to adapter->getDirectProxy() invoked in LocatorI::findAdapterById_async().
-//
-class AdapterGetDirectProxyCallback : public virtual IceUtil::Shared
-{
-public:
-
- AdapterGetDirectProxyCallback(const LocatorIPtr& locator, const LocatorAdapterInfo& adapter) :
- _locator(locator), _adapter(adapter)
- {
- }
-
- virtual void response(const ::Ice::ObjectPrx& obj)
- {
- assert(obj);
- _locator->getDirectProxyResponse(_adapter, obj);
- }
-
- virtual void exception(const ::Ice::Exception& e)
- {
- _locator->getDirectProxyException(_adapter, e);
- }
-
-private:
-
- const LocatorIPtr _locator;
- const LocatorAdapterInfo _adapter;
-};
-
-class AdapterActivateCallback : public virtual IceUtil::Shared
-{
-public:
-
- AdapterActivateCallback(const LocatorIPtr& locator, const LocatorAdapterInfo& adapter) :
- _locator(locator), _adapter(adapter)
- {
- }
-
- virtual void response(const ::Ice::ObjectPrx& obj)
- {
- _locator->getDirectProxyResponse(_adapter, obj);
- }
-
- virtual void exception(const ::Ice::Exception& ex)
- {
- _locator->getDirectProxyException(_adapter, ex);
- }
-
-private:
-
- const LocatorIPtr _locator;
- const LocatorAdapterInfo _adapter;
-};
-
-//
-// Callback from asynchrnous call to LocatorI::findAdapterById_async()
-// invoked in LocatorI::findObjectById_async().
-//
-class AMD_Locator_findAdapterByIdI : public Ice::AMD_Locator_findAdapterById
+class AdapterRequest final : public LocatorI::Request
{
public:
- AMD_Locator_findAdapterByIdI(const Ice::AMD_Locator_findObjectByIdPtr& cb, const Ice::ObjectPrx& obj) :
- _cb(cb),
- _obj(obj)
- {
- }
-
- virtual void ice_response(const ::Ice::ObjectPrx& obj)
- {
- //
- // If the adapter dummy direct proxy is not null, return a
- // proxy containing the identity we were looking for and the
- // endpoints of the adapter.
- //
- // If null, return the proxy registered with the object
- // registry.
- //
- if(obj)
- {
- _cb->ice_response(obj->ice_identity(_obj->ice_getIdentity()));
- }
- else
- {
- _cb->ice_response(_obj);
- }
- }
-
- virtual void ice_exception(const ::Ice::Exception& ex)
- {
- try
- {
- ex.ice_throw();
- }
- catch(const Ice::AdapterNotFoundException&)
- {
- //
- // We couldn't find the adapter, we ignore and return the
- // original proxy containing the adapter id.
- //
- _cb->ice_response(_obj);
- return;
- }
- catch(const Ice::Exception& e)
- {
- //
- // Rethrow unexpected exception.
- //
- _cb->ice_exception(e);
- return;
- }
-
- assert(false);
- }
-
- virtual void ice_exception(const std::exception& ex)
- {
- _cb->ice_exception(ex);
- }
-
- virtual void ice_exception()
- {
- _cb->ice_exception();
- }
-
-private:
-
- const Ice::AMD_Locator_findObjectByIdPtr _cb;
- const Ice::ObjectPrx _obj;
-};
-
-class AdapterRequest : public LocatorI::Request
-{
-public:
-
- AdapterRequest(const Ice::AMD_Locator_findAdapterByIdPtr& amdCB,
- const LocatorIPtr& locator,
+ AdapterRequest(function<void(const shared_ptr<Ice::ObjectPrx>&)> response,
+ function<void(exception_ptr)> exception,
+ const shared_ptr<LocatorI>& locator,
const Ice::EncodingVersion& encoding,
const LocatorAdapterInfo& adapter) :
- _amdCB(amdCB),
+ _response(move(response)),
+ _exception(move(exception)),
_locator(locator),
_encoding(encoding),
_adapter(adapter),
@@ -163,20 +36,20 @@ public:
assert(_adapter.proxy);
}
- virtual void
- execute()
+ void
+ execute() override
{
- _locator->getDirectProxy(_adapter, this);
+ _locator->getDirectProxy(_adapter, shared_from_this());
}
- virtual void
- activating(const string&)
+ void
+ activating(const string&) override
{
// Nothing to do.
}
- virtual void
- response(const std::string& id, const Ice::ObjectPrx& proxy)
+ void
+ response(const std::string& id, const shared_ptr<Ice::ObjectPrx>& proxy) override
{
assert(proxy);
@@ -185,49 +58,52 @@ public:
//
if(!IceInternal::isSupported(_encoding, proxy->ice_getEncodingVersion()))
{
- exception(id, Ice::UnsupportedEncodingException(__FILE__,
- __LINE__,
- "server doesn't support requested encoding",
- _encoding,
- proxy->ice_getEncodingVersion()));
+ exception(id,
+ make_exception_ptr(Ice::UnsupportedEncodingException(__FILE__, __LINE__,
+ "server doesn't support requested encoding",
+ _encoding,
+ proxy->ice_getEncodingVersion())));
return;
}
- _amdCB->ice_response(proxy->ice_identity(Ice::stringToIdentity("dummy")));
+ _response(proxy->ice_identity(Ice::stringToIdentity("dummy")));
}
- virtual void
- exception(const std::string&, const Ice::Exception& ex)
+ void
+ exception(const std::string&, exception_ptr ex) override
{
if(_traceLevels->locator > 0)
{
Ice::Trace out(_traceLevels->logger, _traceLevels->locatorCat);
out << "couldn't resolve adapter`" << _adapter.id << "' endpoints:\n" << toString(ex);
}
- _amdCB->ice_response(0);
+ _response(nullptr);
}
private:
- const Ice::AMD_Locator_findAdapterByIdPtr _amdCB;
- const LocatorIPtr _locator;
+ const function<void(const shared_ptr<Ice::ObjectPrx>&)> _response;
+ const function<void(exception_ptr)> _exception;
+ const shared_ptr<LocatorI> _locator;
const Ice::EncodingVersion _encoding;
const LocatorAdapterInfo _adapter;
- const TraceLevelsPtr _traceLevels;
+ const shared_ptr<TraceLevels> _traceLevels;
};
-class ReplicaGroupRequest : public LocatorI::Request, public IceUtil::Mutex
+class ReplicaGroupRequest final : public LocatorI::Request
{
public:
- ReplicaGroupRequest(const Ice::AMD_Locator_findAdapterByIdPtr& amdCB,
- const LocatorIPtr& locator,
+ ReplicaGroupRequest(function<void(const shared_ptr<Ice::ObjectPrx>&)> response,
+ function<void(exception_ptr)> exception,
+ const shared_ptr<LocatorI>& locator,
const string& id,
const Ice::EncodingVersion& encoding,
const LocatorAdapterInfoSeq& adapters,
int count,
- Ice::ObjectPrx firstProxy) :
- _amdCB(amdCB),
+ shared_ptr<Ice::ObjectPrx> firstProxy) :
+ _response(move(response)),
+ _exception(move(exception)),
_locator(locator),
_id(id),
_encoding(encoding),
@@ -254,15 +130,15 @@ public:
}
}
- virtual void
- execute()
+ void
+ execute() override
{
//
// Otherwise, request as many adapters as required.
//
LocatorAdapterInfoSeq adapters;
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
for(unsigned int i = static_cast<unsigned int>(_proxies.size()); i < _count; ++i)
{
if(_lastAdapter == _adapters.end())
@@ -286,17 +162,17 @@ public:
}
}
- for(LocatorAdapterInfoSeq::const_iterator p = adapters.begin(); p != adapters.end(); ++p)
+ for(const auto& adapter : adapters)
{
- if(_locator->getDirectProxy(*p, this))
+ if(_locator->getDirectProxy(adapter, shared_from_this()))
{
- activating(p->id);
+ activating(adapter.id);
}
}
}
- virtual void
- activating(const string&)
+ void
+ activating(const string&) override
{
//
// An adapter is being activated. Don't wait for the activation to complete. Instead,
@@ -305,7 +181,7 @@ public:
LocatorAdapterInfo adapter;
do
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
if(_lastAdapter == _adapters.end())
{
break;
@@ -313,23 +189,23 @@ public:
adapter = *_lastAdapter;
++_lastAdapter;
}
- while(_locator->getDirectProxy(adapter, this));
+ while(_locator->getDirectProxy(adapter, shared_from_this()));
}
- virtual void
- exception(const string& /*id*/, const Ice::Exception& ex)
+ void
+ exception(const string&, exception_ptr exptr) override
{
LocatorAdapterInfo adapter;
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
if(_proxies.size() == _count) // Nothing to do if we already sent the response.
{
return;
}
- if(!_exception.get())
+ if(!_exptr)
{
- _exception.reset(ex.ice_clone());
+ _exptr = exptr;
}
if(_lastAdapter == _adapters.end())
@@ -354,30 +230,31 @@ public:
if(adapter.proxy)
{
- if(_locator->getDirectProxy(adapter, this))
+ if(_locator->getDirectProxy(adapter, shared_from_this()))
{
activating(adapter.id);
}
}
}
- virtual void
- response(const string& id, const Ice::ObjectPrx& proxy)
+ void
+ response(const string& id, const shared_ptr<Ice::ObjectPrx>& proxy) override
{
//
// Ensure the server supports the request encoding.
//
if(!IceInternal::isSupported(_encoding, proxy->ice_getEncodingVersion()))
{
- exception(id, Ice::UnsupportedEncodingException(__FILE__,
- __LINE__,
- "server doesn't support requested encoding",
- _encoding,
- proxy->ice_getEncodingVersion()));
+ exception(id,
+ make_exception_ptr(Ice::UnsupportedEncodingException(__FILE__,
+ __LINE__,
+ "server doesn't support requested encoding",
+ _encoding,
+ proxy->ice_getEncodingVersion())));
return;
}
- Lock sync(*this);
+ lock_guard lock(_mutex);
assert(proxy);
if(_proxies.size() == _count) // Nothing to do if we already sent the response.
{
@@ -403,7 +280,7 @@ private:
{
if(_proxies.size() == 1)
{
- _amdCB->ice_response(_proxies.begin()->second);
+ _response(_proxies.begin()->second);
}
else if(_proxies.empty())
{
@@ -411,58 +288,63 @@ private:
// If there's no proxies, it's either because we couldn't contact the adapters or
// because the replica group has no members.
//
- assert(_exception.get() || _adapters.empty());
+ assert(_exptr || _adapters.empty());
if(_traceLevels->locator > 0)
{
Ice::Trace out(_traceLevels->logger, _traceLevels->locatorCat);
out << "couldn't resolve replica group `" << _id << "' endpoints:\n";
- out << (_exception.get() ? toString(*_exception) : string("replica group is empty"));
+ out << (_exptr ? toString(_exptr) : string("replica group is empty"));
}
- _amdCB->ice_response(0);
+ _response(nullptr);
}
else if(_proxies.size() > 1)
{
Ice::EndpointSeq endpoints;
endpoints.reserve(_proxies.size());
- for(LocatorAdapterInfoSeq::const_iterator p = _adapters.begin(); p != _adapters.end(); ++p)
+ for(const auto& adapter : _adapters)
{
- map<string, Ice::ObjectPrx>::const_iterator q = _proxies.find(p->id);
+ auto q = _proxies.find(adapter.id);
if(q != _proxies.end())
{
- Ice::EndpointSeq edpts = q->second->ice_getEndpoints();
+ auto edpts = q->second->ice_getEndpoints();
endpoints.insert(endpoints.end(), edpts.begin(), edpts.end());
+
}
}
- Ice::ObjectPrx proxy = _locator->getCommunicator()->stringToProxy("dummy:default");
- _amdCB->ice_response(proxy->ice_endpoints(endpoints));
+ auto proxy = _locator->getCommunicator()->stringToProxy("dummy:default");
+ _response(proxy->ice_endpoints(endpoints));
}
}
- const Ice::AMD_Locator_findAdapterByIdPtr _amdCB;
- const LocatorIPtr _locator;
+ const function<void(const shared_ptr<Ice::ObjectPrx>&)> _response;
+ const function<void(exception_ptr)> _exception;
+ const shared_ptr<LocatorI> _locator;
const std::string _id;
const Ice::EncodingVersion _encoding;
LocatorAdapterInfoSeq _adapters;
- const TraceLevelsPtr _traceLevels;
+ const shared_ptr<TraceLevels> _traceLevels;
unsigned int _count;
LocatorAdapterInfoSeq::const_iterator _lastAdapter;
- std::map<std::string, Ice::ObjectPrx> _proxies;
- IceInternal::UniquePtr<Ice::Exception> _exception;
+ std::map<std::string, shared_ptr<Ice::ObjectPrx>> _proxies;
+ exception_ptr _exptr;
+ std::mutex _mutex;
};
-class RoundRobinRequest : public LocatorI::Request, SynchronizationCallback, public IceUtil::Mutex
+class RoundRobinRequest final : public LocatorI::Request, SynchronizationCallback
{
public:
- RoundRobinRequest(const Ice::AMD_Locator_findAdapterByIdPtr& amdCB,
- const LocatorIPtr& locator,
- const DatabasePtr database,
+ RoundRobinRequest(function<void(const shared_ptr<Ice::ObjectPrx>&)> response,
+ function<void(exception_ptr)> exception,
+ const shared_ptr<LocatorI>& locator,
+ const shared_ptr<Database> database,
const string& id,
const Ice::Current& current,
const LocatorAdapterInfoSeq& adapters,
int count) :
- _amdCB(amdCB),
+ _response(move(response)),
+ _exception(move(exception)),
_locator(locator),
_database(database),
_id(id),
@@ -477,8 +359,8 @@ public:
assert(_adapters.empty() || _count > 0);
}
- virtual void
- execute()
+ void
+ execute() override
{
if(_adapters.empty())
{
@@ -487,26 +369,26 @@ public:
Ice::Trace out(_traceLevels->logger, _traceLevels->locatorCat);
out << "couldn't resolve replica group `" << _id << "' endpoints:\nreplica group is empty";
}
- _amdCB->ice_response(0);
+ _response(nullptr);
return;
}
LocatorAdapterInfo adapter = _adapters[0];
assert(adapter.proxy);
- if(_locator->getDirectProxy(adapter, this))
+ if(_locator->getDirectProxy(adapter, shared_from_this()))
{
activating(adapter.id);
}
}
- virtual void
- activating(const string& id)
+ void
+ activating(const string& id) override
{
LocatorAdapterInfo adapter;
adapter.id = id;
do
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
if(_adapters.empty() || _waitForActivation)
{
return;
@@ -514,26 +396,27 @@ public:
_activatingOrFailed.insert(adapter.id);
adapter = nextAdapter();
}
- while(adapter.proxy && _locator->getDirectProxy(adapter, this));
+ while(adapter.proxy && _locator->getDirectProxy(adapter, shared_from_this()));
}
- virtual void
- response(const std::string& id, const Ice::ObjectPrx& proxy)
+ void
+ response(const std::string& id, const shared_ptr<Ice::ObjectPrx>& proxy) override
{
//
// Ensure the server supports the request encoding.
//
if(!IceInternal::isSupported(_encoding, proxy->ice_getEncodingVersion()))
{
- exception(id, Ice::UnsupportedEncodingException(__FILE__,
- __LINE__,
- "server doesn't support requested encoding",
- _encoding,
- proxy->ice_getEncodingVersion()));
+ exception(id,
+ make_exception_ptr(Ice::UnsupportedEncodingException(__FILE__,
+ __LINE__,
+ "server doesn't support requested encoding",
+ _encoding,
+ proxy->ice_getEncodingVersion())));
return;
}
- Lock sync(*this);
+ lock_guard lock(_mutex);
assert(proxy);
if(_adapters.empty() || id != _adapters[0].id)
{
@@ -542,30 +425,30 @@ public:
if(_count > 1)
{
- Ice::ObjectPrx p = proxy->ice_identity(Ice::stringToIdentity("dummy"));
- LocatorI::RequestPtr request =
- new ReplicaGroupRequest(_amdCB, _locator, _id, _encoding, _adapters, _count, p);
+ auto p = proxy->ice_identity(Ice::stringToIdentity("dummy"));
+ shared_ptr<LocatorI::Request> request =
+ make_shared<ReplicaGroupRequest>(_response, _exception, _locator, _id, _encoding, _adapters, _count, p);
request->execute();
}
else
{
- _amdCB->ice_response(proxy->ice_identity(Ice::stringToIdentity("dummy")));
+ _response(proxy->ice_identity(Ice::stringToIdentity("dummy")));
}
_adapters.clear();
}
- virtual void
- exception(const std::string& id, const Ice::Exception& ex)
+ void
+ exception(const std::string& id, exception_ptr ex) override
{
LocatorAdapterInfo adapter;
{
- Lock sync(*this);
+ lock_guard<std::mutex> lock(_mutex);
_failed.insert(id);
_activatingOrFailed.insert(id);
- if(!_exception.get())
+ if(!_exptr)
{
- _exception.reset(ex.ice_clone());
+ _exptr = ex;
}
if(_adapters.empty() || id != _adapters[0].id)
@@ -576,34 +459,34 @@ public:
adapter = nextAdapter();
}
- if(adapter.proxy && _locator->getDirectProxy(adapter, this))
+ if(adapter.proxy && _locator->getDirectProxy(adapter, shared_from_this()))
{
activating(adapter.id);
}
}
void
- synchronized()
+ synchronized() override
{
LocatorAdapterInfo adapter;
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
assert(_adapters.empty());
adapter = nextAdapter();
}
- if(adapter.proxy && _locator->getDirectProxy(adapter, this))
+ if(adapter.proxy && _locator->getDirectProxy(adapter, shared_from_this()))
{
activating(adapter.id);
}
}
void
- synchronized(const Ice::Exception& ex)
+ synchronized(exception_ptr ex) override
{
LocatorAdapterInfo adapter;
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
assert(_adapters.empty());
if(_activatingOrFailed.size() > _failed.size())
@@ -618,12 +501,12 @@ public:
Ice::Trace out(_traceLevels->logger, _traceLevels->locatorCat);
out << "couldn't resolve replica group `" << _id << "' endpoints:\n" << toString(ex);
}
- _amdCB->ice_response(0);
+ _response(nullptr);
return;
}
}
- if(adapter.proxy && _locator->getDirectProxy(adapter, this))
+ if(adapter.proxy && _locator->getDirectProxy(adapter, shared_from_this()))
{
activating(adapter.id);
}
@@ -667,13 +550,15 @@ private:
{
assert(_adapters.empty());
bool callback;
+ auto self = dynamic_pointer_cast<SynchronizationCallback>(shared_from_this());
+ assert(self);
if(!_waitForActivation)
{
- callback = _database->addAdapterSyncCallback(_id, this, _activatingOrFailed);
+ callback = _database->addAdapterSyncCallback(_id, move(self), _activatingOrFailed);
}
else
{
- callback = _database->addAdapterSyncCallback(_id, this, _failed);
+ callback = _database->addAdapterSyncCallback(_id, move(self), _failed);
}
if(callback)
{
@@ -688,11 +573,11 @@ private:
{
Ice::Current current;
current.encoding = _encoding;
- _locator->findAdapterById_async(_amdCB, _id, current);
+ _locator->findAdapterByIdAsync( _id, _response, _exception, current);
}
- catch(const Ice::Exception& ex)
+ catch(const Ice::Exception&)
{
- _amdCB->ice_exception(ex);
+ _exception(current_exception());
}
_adapters.clear();
return LocatorAdapterInfo();
@@ -708,108 +593,117 @@ private:
{
Ice::Trace out(_traceLevels->logger, _traceLevels->locatorCat);
out << "couldn't resolve replica group `" << _id << "' endpoints:\n";
- out << (_exception.get() ? toString(*_exception) : string("replica group is empty"));
+ out << (_exptr ? toString(_exptr) : string("replica group is empty"));
}
- _amdCB->ice_response(0);
+ _response(nullptr);
return LocatorAdapterInfo();
}
}
catch(const AdapterNotExistException&)
{
assert(_adapters.empty());
- _amdCB->ice_exception(Ice::AdapterNotFoundException());
+ _exception(make_exception_ptr(Ice::AdapterNotFoundException()));
return LocatorAdapterInfo();
}
- catch(const Ice::Exception& ex)
+ catch(const Ice::Exception&)
{
assert(_adapters.empty());
if(_traceLevels->locator > 0)
{
Ice::Trace out(_traceLevels->logger, _traceLevels->locatorCat);
- out << "couldn't resolve replica group `" << _id << "' endpoints:\n" << toString(ex);
+ out << "couldn't resolve replica group `" << _id << "' endpoints:\n" << toString(current_exception());
}
- _amdCB->ice_response(0);
+ _response(nullptr);
return LocatorAdapterInfo();
}
}
- const Ice::AMD_Locator_findAdapterByIdPtr _amdCB;
- const LocatorIPtr _locator;
- const DatabasePtr _database;
- const std::string _id;
+ const function<void(const shared_ptr<Ice::ObjectPrx>&)> _response;
+ const function<void(exception_ptr)> _exception;
+ const shared_ptr<LocatorI> _locator;
+ const shared_ptr<Database> _database;
+ const string _id;
const Ice::EncodingVersion _encoding;
- const Ice::ConnectionPtr _connection;
+ const shared_ptr<Ice::Connection> _connection;
const Ice::Context _context;
LocatorAdapterInfoSeq _adapters;
- const TraceLevelsPtr _traceLevels;
+ const shared_ptr<TraceLevels> _traceLevels;
int _count;
bool _waitForActivation;
set<string> _failed;
set<string> _activatingOrFailed;
- IceInternal::UniquePtr<Ice::Exception> _exception;
+ exception_ptr _exptr;
+ mutex _mutex;
};
-class FindAdapterByIdCallback : public SynchronizationCallback
+class FindAdapterByIdCallback final : public SynchronizationCallback
{
public:
- FindAdapterByIdCallback(const LocatorIPtr& locator,
- const Ice::AMD_Locator_findAdapterByIdPtr& cb,
+ FindAdapterByIdCallback(const shared_ptr<LocatorI>& locator,
+ function<void(const shared_ptr<Ice::ObjectPrx>&)> response,
+ function<void(exception_ptr)> exception,
const string& id,
- const Ice::Current& current) : _locator(locator), _cb(cb), _id(id), _current(current)
+ const Ice::Current& current) :
+ _locator(locator),
+ _response(move(response)),
+ _exception(move(exception)),
+ _id(id),
+ _current(current)
{
}
- virtual void
- synchronized()
+ void
+ synchronized() override
{
try
{
- _locator->findAdapterById_async(_cb, _id, _current);
+ _locator->findAdapterByIdAsync(_id, _response, _exception, _current);
}
- catch(const Ice::Exception& ex)
+ catch(const Ice::Exception&)
{
- _cb->ice_exception(ex);
+ _exception(current_exception());
}
}
- virtual void
- synchronized(const Ice::Exception& sex)
+ void
+ synchronized(exception_ptr exptr) override
{
try
{
- sex.ice_throw();
+ rethrow_exception(exptr);
}
catch(const AdapterNotExistException&)
{
}
- catch(const Ice::Exception& ex)
+ catch(const Ice::Exception&)
{
- const TraceLevelsPtr traceLevels = _locator->getTraceLevels();
+ const shared_ptr<TraceLevels> traceLevels = _locator->getTraceLevels();
if(traceLevels->locator > 0)
{
Ice::Trace out(traceLevels->logger, traceLevels->locatorCat);
- out << "couldn't resolve adapter `" << _id << "' endpoints:\n" << toString(ex);
+ out << "couldn't resolve adapter `" << _id << "' endpoints:\n" << toString(exptr);
}
}
- _cb->ice_response(0);
+ _response(nullptr);
}
private:
- const LocatorIPtr _locator;
- const Ice::AMD_Locator_findAdapterByIdPtr _cb;
+ const shared_ptr<LocatorI> _locator;
+ const function<void(const shared_ptr<Ice::ObjectPrx>&)> _response;
+ const function<void(exception_ptr)> _exception;
const string _id;
const Ice::Current _current;
};
};
-LocatorI::LocatorI(const Ice::CommunicatorPtr& communicator,
- const DatabasePtr& database,
- const WellKnownObjectsManagerPtr& wellKnownObjects,
- const RegistryPrx& registry,
- const QueryPrx& query) :
+LocatorI::LocatorI(const shared_ptr<Ice::Communicator>& communicator,
+ const shared_ptr<Database>& database,
+ const shared_ptr<WellKnownObjectsManager>& wellKnownObjects,
+ const shared_ptr<RegistryPrx>& registry,
+ const shared_ptr<QueryPrx>& query) :
_communicator(communicator),
_database(database),
_wellKnownObjects(wellKnownObjects),
@@ -823,13 +717,14 @@ LocatorI::LocatorI(const Ice::CommunicatorPtr& communicator,
// registry.
//
void
-LocatorI::findObjectById_async(const Ice::AMD_Locator_findObjectByIdPtr& cb,
- const Ice::Identity& id,
- const Ice::Current&) const
+LocatorI::findObjectByIdAsync(Ice::Identity id,
+ function<void(const shared_ptr<Ice::ObjectPrx>&)> response,
+ function<void(exception_ptr)>,
+ const Ice::Current&) const
{
try
{
- cb->ice_response(_database->getObjectProxy(id));
+ response(_database->getObjectProxy(id));
}
catch(const ObjectNotRegisteredException&)
{
@@ -842,11 +737,13 @@ LocatorI::findObjectById_async(const Ice::AMD_Locator_findObjectByIdPtr& cb,
// registry. If found, we try to get its direct proxy.
//
void
-LocatorI::findAdapterById_async(const Ice::AMD_Locator_findAdapterByIdPtr& cb,
- const string& id,
- const Ice::Current& current) const
+LocatorI::findAdapterByIdAsync(string id,
+ function<void(const shared_ptr<Ice::ObjectPrx>&)> response,
+ function<void(exception_ptr)> exception,
+ const Ice::Current& current) const
{
- LocatorIPtr self = const_cast<LocatorI*>(this);
+ auto self = const_pointer_cast<LocatorI>(shared_from_this());
+
bool replicaGroup = false;
try
{
@@ -868,26 +765,28 @@ LocatorI::findAdapterById_async(const Ice::AMD_Locator_findAdapterByIdPtr& cb,
}
catch(const SynchronizationException&)
{
- if(_database->addAdapterSyncCallback(id, new FindAdapterByIdCallback(self, cb, id, current)))
+ if(_database->addAdapterSyncCallback(id, make_shared<FindAdapterByIdCallback>(self, response, exception, id, current)))
{
return;
}
}
}
- RequestPtr request;
+ shared_ptr<Request> request;
if(roundRobin)
{
- request = new RoundRobinRequest(cb, self, _database, id, current, adapters, count);
+ request = make_shared<RoundRobinRequest>(response, exception, self, _database, id, current, adapters,
+ count);
}
else if(replicaGroup)
{
- request = new ReplicaGroupRequest(cb, self, id, current.encoding, adapters, count, 0);
+ request = make_shared<ReplicaGroupRequest>(response, exception, self, id, current.encoding, adapters, count,
+ nullptr);
}
else
{
assert(adapters.size() == 1);
- request = new AdapterRequest(cb, self, current.encoding, adapters[0]);
+ request = make_shared<AdapterRequest>(response, exception, self, current.encoding, adapters[0]);
}
request->execute();
return;
@@ -895,95 +794,100 @@ LocatorI::findAdapterById_async(const Ice::AMD_Locator_findAdapterByIdPtr& cb,
catch(const AdapterNotExistException&)
{
}
- catch(const Ice::Exception& ex)
+ catch(const Ice::Exception&)
{
- const TraceLevelsPtr traceLevels = _database->getTraceLevels();
+ auto traceLevels = _database->getTraceLevels();
if(traceLevels->locator > 0)
{
Ice::Trace out(traceLevels->logger, traceLevels->locatorCat);
if(replicaGroup)
{
- out << "couldn't resolve replica group `" << id << "' endpoints:\n" << toString(ex);
+ out << "couldn't resolve replica group `" << id << "' endpoints:\n" << toString(current_exception());
}
else
{
- out << "couldn't resolve adapter `" << id << "' endpoints:\n" << toString(ex);
+ out << "couldn't resolve adapter `" << id << "' endpoints:\n" << toString(current_exception());
}
}
- cb->ice_response(0);
+ response(nullptr);
return;
}
try
{
- cb->ice_response(_database->getAdapterDirectProxy(id, current.encoding, current.con, current.ctx));
+ response(_database->getAdapterDirectProxy(id, current.encoding, current.con, current.ctx));
}
catch(const AdapterNotExistException&)
{
- cb->ice_exception(Ice::AdapterNotFoundException());
+ exception(make_exception_ptr(Ice::AdapterNotFoundException()));
}
}
-Ice::LocatorRegistryPrx
+shared_ptr<Ice::LocatorRegistryPrx>
LocatorI::getRegistry(const Ice::Current&) const
{
return _wellKnownObjects->getLocatorRegistry();
}
-RegistryPrx
+shared_ptr<RegistryPrx>
LocatorI::getLocalRegistry(const Ice::Current&) const
{
return _localRegistry;
}
-QueryPrx
+shared_ptr<QueryPrx>
LocatorI::getLocalQuery(const Ice::Current&) const
{
return _localQuery;
}
-const Ice::CommunicatorPtr&
+const shared_ptr<Ice::Communicator>&
LocatorI::getCommunicator() const
{
return _communicator;
}
-const TraceLevelsPtr&
+const shared_ptr<TraceLevels>&
LocatorI::getTraceLevels() const
{
return _database->getTraceLevels();
}
bool
-LocatorI::getDirectProxy(const LocatorAdapterInfo& adapter, const RequestPtr& request)
+LocatorI::getDirectProxy(const LocatorAdapterInfo& adapter, const shared_ptr<Request>& request)
{
{
- Lock sync(*this);
- PendingRequestsMap::iterator p = _pendingRequests.find(adapter.id);
+ lock_guard lock(_mutex);
+ auto p = _pendingRequests.find(adapter.id);
if(p != _pendingRequests.end())
{
p->second.push_back(request);
return _activating.find(adapter.id) != _activating.end();
}
- PendingRequests requests;
- requests.push_back(request);
- _pendingRequests.insert(make_pair(adapter.id, requests));
+ _pendingRequests.insert({ adapter.id, { request } });
}
- adapter.proxy->begin_getDirectProxy(newCallback_Adapter_getDirectProxy(
- new AdapterGetDirectProxyCallback(this, adapter),
- &AdapterGetDirectProxyCallback::response,
- &AdapterGetDirectProxyCallback::exception));
+
+ auto self = shared_from_this();
+ adapter.proxy->getDirectProxyAsync([self, adapter] (auto obj)
+ {
+ assert(obj);
+ self->getDirectProxyResponse(adapter, move(obj));
+ },
+ [self, adapter] (exception_ptr ex)
+ {
+ self->getDirectProxyException(adapter, ex);
+ });
return false;
}
void
-LocatorI::getDirectProxyResponse(const LocatorAdapterInfo& adapter, const Ice::ObjectPrx& proxy)
+LocatorI::getDirectProxyResponse(const LocatorAdapterInfo& adapter, const shared_ptr<Ice::ObjectPrx>& proxy)
{
PendingRequests requests;
{
- Lock sync(*this);
- PendingRequestsMap::iterator p = _pendingRequests.find(adapter.id);
+ lock_guard lock(_mutex);
+ auto p = _pendingRequests.find(adapter.id);
assert(p != _pendingRequests.end());
requests.swap(p->second);
_pendingRequests.erase(p);
@@ -992,27 +896,27 @@ LocatorI::getDirectProxyResponse(const LocatorAdapterInfo& adapter, const Ice::O
if(proxy)
{
- for(PendingRequests::iterator q = requests.begin(); q != requests.end(); ++q)
+ for(const auto& request : requests)
{
- (*q)->response(adapter.id, proxy);
+ request->response(adapter.id, proxy);
}
}
else
{
- for(PendingRequests::iterator q = requests.begin(); q != requests.end(); ++q)
+ for(const auto& request : requests)
{
- (*q)->exception(adapter.id, AdapterNotActiveException());
+ request->exception(adapter.id, make_exception_ptr(AdapterNotActiveException()));
}
}
}
void
-LocatorI::getDirectProxyException(const LocatorAdapterInfo& adapter, const Ice::Exception& ex)
+LocatorI::getDirectProxyException(const LocatorAdapterInfo& adapter, exception_ptr ex)
{
bool activate = false;
try
{
- ex.ice_throw();
+ rethrow_exception(ex);
}
catch(const AdapterNotActiveException& e)
{
@@ -1024,8 +928,8 @@ LocatorI::getDirectProxyException(const LocatorAdapterInfo& adapter, const Ice::
PendingRequests requests;
{
- Lock sync(*this);
- PendingRequestsMap::iterator p = _pendingRequests.find(adapter.id);
+ lock_guard lock(_mutex);
+ auto p = _pendingRequests.find(adapter.id);
assert(p != _pendingRequests.end());
if(activate)
{
@@ -1042,22 +946,28 @@ LocatorI::getDirectProxyException(const LocatorAdapterInfo& adapter, const Ice::
if(activate)
{
- for(PendingRequests::iterator q = requests.begin(); q != requests.end(); ++q)
+ for(const auto& request : requests)
{
- (*q)->activating(adapter.id);
+ request->activating(adapter.id);
}
- int timeout = adapter.activationTimeout + adapter.deactivationTimeout;
- AdapterPrx::uncheckedCast(adapter.proxy->ice_invocationTimeout(timeout * 1000))->begin_activate(
- newCallback_Adapter_activate(new AdapterActivateCallback(this, adapter),
- &AdapterActivateCallback::response,
- &AdapterActivateCallback::exception));
+ int timeout = secondsToInt(adapter.activationTimeout + adapter.deactivationTimeout) * 1000;
+ auto self = shared_from_this();
+ Ice::uncheckedCast<AdapterPrx>(adapter.proxy->ice_invocationTimeout(timeout))->activateAsync(
+ [self, adapter] (auto obj)
+ {
+ self->getDirectProxyResponse(adapter, move(obj));
+ },
+ [self, adapter] (auto e)
+ {
+ self->getDirectProxyException(adapter, e);
+ });
}
else
{
- for(PendingRequests::iterator q = requests.begin(); q != requests.end(); ++q)
+ for(const auto& request : requests)
{
- (*q)->exception(adapter.id, ex);
+ request->exception(adapter.id, ex);
}
}
}