summaryrefslogtreecommitdiff
path: root/cpp/src/IceLocatorDiscovery/PluginI.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceLocatorDiscovery/PluginI.cpp')
-rw-r--r--cpp/src/IceLocatorDiscovery/PluginI.cpp271
1 files changed, 236 insertions, 35 deletions
diff --git a/cpp/src/IceLocatorDiscovery/PluginI.cpp b/cpp/src/IceLocatorDiscovery/PluginI.cpp
index bf30aa4cf8e..5207a096c9c 100644
--- a/cpp/src/IceLocatorDiscovery/PluginI.cpp
+++ b/cpp/src/IceLocatorDiscovery/PluginI.cpp
@@ -54,10 +54,26 @@ namespace
class LocatorI; // Forward declaration
-class Request : public IceUtil::Shared
+class Request : public Ice::EnableSharedFromThis<Request>
{
public:
+#ifdef ICE_CPP11_MAPPING
+ Request(LocatorI* locator,
+ const string& operation,
+ Ice::OperationMode mode,
+ const pair<const Ice::Byte*, const Ice::Byte*>& inParams,
+ const Ice::Context& ctx,
+ function<void (bool, const pair<const Ice::Byte*, const Ice::Byte*>&)> responseCB,
+ function<void (exception_ptr)> exceptionCB) :
+ _locator(locator),
+ _operation(operation),
+ _mode(mode),
+ _context(ctx),
+ _inParams(inParams.first, inParams.second),
+ _responseCB(move(responseCB)),
+ _exceptionCB(move(exceptionCB))
+#else
Request(LocatorI* locator,
const string& operation,
Ice::OperationMode mode,
@@ -70,10 +86,11 @@ public:
_context(ctx),
_inParams(inParams.first, inParams.second),
_amdCB(amdCB)
+#endif
{
}
- void invoke(const Ice::LocatorPrx&);
+ void invoke(const Ice::LocatorPrxPtr&);
void response(const bool, const pair<const Ice::Byte*, const Ice::Byte*>&);
void exception(const Ice::Exception&);
@@ -84,31 +101,47 @@ protected:
const Ice::OperationMode _mode;
const Ice::Context _context;
const Ice::ByteSeq _inParams;
+#ifdef ICE_CPP11_MAPPING
+ function<void (bool, const pair<const Ice::Byte*, const Ice::Byte*>&)> _responseCB;
+ function<void (exception_ptr)> _exceptionCB;
+ exception_ptr _exception;
+#else
const Ice::AMD_Object_ice_invokePtr _amdCB;
-
- Ice::LocatorPrx _locatorPrx;
IceUtil::UniquePtr<Ice::Exception> _exception;
+#endif
+
+ Ice::LocatorPrxPtr _locatorPrx;
};
-typedef IceUtil::Handle<Request> RequestPtr;
+ICE_DEFINE_PTR(RequestPtr, Request);
-class LocatorI : public Ice::BlobjectArrayAsync, private IceUtil::TimerTask, private IceUtil::Monitor<IceUtil::Mutex>
+class LocatorI : public Ice::BlobjectArrayAsync,
+ public IceUtil::TimerTask,
+ private IceUtil::Monitor<IceUtil::Mutex>,
+ public Ice::EnableSharedFromThis<LocatorI>
{
public:
- LocatorI(const LookupPrx&, const Ice::PropertiesPtr&, const string&, const Ice::LocatorPrx&);
- void setLookupReply(const LookupReplyPrx&);
+ LocatorI(const LookupPrxPtr&, const Ice::PropertiesPtr&, const string&, const Ice::LocatorPrxPtr&);
+ void setLookupReply(const LookupReplyPrxPtr&);
+#ifdef ICE_CPP11_MAPPING
+ virtual void ice_invoke_async(pair<const Ice::Byte*, const Ice::Byte*>,
+ function<void (bool, const pair<const Ice::Byte*, const Ice::Byte*>&)>,
+ function<void (exception_ptr)>,
+ const Ice::Current&);
+#else
virtual void ice_invoke_async(const Ice::AMD_Object_ice_invokePtr&, const pair<const Ice::Byte*, const Ice::Byte*>&,
const Ice::Current&);
+#endif
- void foundLocator(const Ice::LocatorPrx&);
- void invoke(const Ice::LocatorPrx&, const RequestPtr&);
+ void foundLocator(const Ice::LocatorPrxPtr&);
+ void invoke(const Ice::LocatorPrxPtr&, const RequestPtr&);
private:
virtual void runTimerTask();
- const LookupPrx _lookup;
+ const LookupPrxPtr _lookup;
const IceUtil::Time _timeout;
const int _retryCount;
const IceUtil::Time _retryDelay;
@@ -116,15 +149,15 @@ private:
string _instanceName;
bool _warned;
- LookupReplyPrx _lookupReply;
- Ice::LocatorPrx _locator;
- Ice::LocatorPrx _voidLocator;
+ LookupReplyPrxPtr _lookupReply;
+ Ice::LocatorPrxPtr _locator;
+ Ice::LocatorPrxPtr _voidLocator;
IceUtil::Time _nextRetry;
int _pendingRetryCount;
vector<RequestPtr> _pendingRequests;
};
-typedef IceUtil::Handle<LocatorI> LocatorIPtr;
+ICE_DEFINE_PTR(LocatorIPtr, LocatorI);
class LookupReplyI : public LookupReply
{
@@ -134,7 +167,7 @@ public:
{
}
- virtual void foundLocator(const Ice::LocatorPrx&, const Ice::Current&);
+ virtual void foundLocator(ICE_IN(Ice::LocatorPrxPtr), const Ice::Current&);
private:
@@ -155,6 +188,25 @@ class VoidLocatorI : public Ice::Locator
{
public:
+#ifdef ICE_CPP11_MAPPING
+ virtual void
+ findObjectById_async(::Ice::Identity,
+ function<void (const shared_ptr<::Ice::ObjectPrx>&)> response,
+ function<void (exception_ptr)>,
+ const Ice::Current&) const
+ {
+ response(nullptr);
+ }
+
+ virtual void
+ findAdapterById_async(string,
+ function<void (const shared_ptr<::Ice::ObjectPrx>&)> response,
+ function<void (exception_ptr)>,
+ const Ice::Current&) const
+ {
+ response(nullptr);
+ }
+#else
virtual void
findObjectById_async(const Ice::AMD_Locator_findObjectByIdPtr& amdCB,
const Ice::Identity&,
@@ -170,11 +222,12 @@ public:
{
amdCB->ice_response(0);
}
+#endif
- virtual Ice::LocatorRegistryPrx
+ virtual Ice::LocatorRegistryPrxPtr
getRegistry(const Ice::Current&) const
{
- return 0;
+ return ICE_NULLPTR;
}
};
@@ -237,17 +290,40 @@ PluginI::initialize()
lookupEndpoints = os.str();
}
- Ice::ObjectPrx lookupPrx = _communicator->stringToProxy("IceLocatorDiscovery/Lookup -d:" + lookupEndpoints);
+ Ice::ObjectPrxPtr lookupPrx = _communicator->stringToProxy("IceLocatorDiscovery/Lookup -d:" + lookupEndpoints);
lookupPrx = lookupPrx->ice_collocationOptimized(false); // No collocation optimization for the multicast proxy!
try
{
// Ensure we can establish a connection to the multicast proxy
// but don't block.
+#ifdef ICE_CPP11_MAPPING
+ promise<bool> sent;
+ promise<void> completed;
+
+ lookupPrx->ice_getConnection_async(
+ [&](shared_ptr<Ice::Connection>)
+ {
+ completed.set_value();
+ },
+ [&](exception_ptr ex)
+ {
+ completed.set_exception(ex);
+ },
+ [&](bool sentSynchronously)
+ {
+ sent.set_value(sentSynchronously);
+ });
+ if(sent.get_future().get())
+ {
+ completed.get_future().get();
+ }
+#else
Ice::AsyncResultPtr result = lookupPrx->begin_ice_getConnection();
if(result->sentSynchronously())
{
lookupPrx->end_ice_getConnection(result);
}
+#endif
}
catch(const Ice::LocalException& ex)
{
@@ -258,17 +334,17 @@ PluginI::initialize()
throw Ice::PluginInitializationException(__FILE__, __LINE__, os.str());
}
- Ice::LocatorPrx voidLocator = Ice::LocatorPrx::uncheckedCast(_locatorAdapter->addWithUUID(new VoidLocatorI()));
+ Ice::LocatorPrxPtr voidLocator = ICE_UNCHECKED_CAST(Ice::LocatorPrx, _locatorAdapter->addWithUUID(ICE_MAKE_SHARED(VoidLocatorI)));
string instanceName = properties->getProperty("IceLocatorDiscovery.InstanceName");
Ice::Identity id;
id.name = "Locator";
id.category = !instanceName.empty() ? instanceName : IceUtil::generateUUID();
- LocatorIPtr locator = new LocatorI(LookupPrx::uncheckedCast(lookupPrx), properties, instanceName, voidLocator);
- _communicator->setDefaultLocator(Ice::LocatorPrx::uncheckedCast(_locatorAdapter->add(locator, id)));
+ LocatorIPtr locator = ICE_MAKE_SHARED(LocatorI, ICE_UNCHECKED_CAST(LookupPrx, lookupPrx), properties, instanceName, voidLocator);
+ _communicator->setDefaultLocator(ICE_UNCHECKED_CAST(Ice::LocatorPrx, _locatorAdapter->add(locator, id)));
- Ice::ObjectPrx lookupReply = _replyAdapter->addWithUUID(new LookupReplyI(locator))->ice_datagram();
- locator->setLookupReply(LookupReplyPrx::uncheckedCast(lookupReply));
+ Ice::ObjectPrxPtr lookupReply = _replyAdapter->addWithUUID(ICE_MAKE_SHARED(LookupReplyI, locator))->ice_datagram();
+ locator->setLookupReply(ICE_UNCHECKED_CAST(LookupReplyPrx, lookupReply));
_replyAdapter->activate();
_locatorAdapter->activate();
@@ -282,8 +358,55 @@ PluginI::destroy()
}
void
-Request::invoke(const Ice::LocatorPrx& l)
+Request::invoke(const Ice::LocatorPrxPtr& l)
{
+#ifdef ICE_CPP11_MAPPING
+ if(l != _locatorPrx)
+ {
+ _locatorPrx = l;
+ try
+ {
+ l->ice_invoke_async(_operation, _mode, _inParams,
+ [self = shared_from_this()](bool ok, vector<Ice::Byte> outParams)
+ {
+ pair<const Ice::Byte*, const Ice::Byte*> outPair;
+ if(outParams.empty())
+ {
+ outPair.first = outPair.second = 0;
+ }
+ else
+ {
+ outPair.first = &outParams[0];
+ outPair.second = outPair.first + outParams.size();
+ }
+ self->response(ok, outPair);
+ },
+ [self = shared_from_this()](exception_ptr e)
+ {
+ try
+ {
+ rethrow_exception(e);
+ }
+ catch(const Ice::Exception& ex)
+ {
+ self->exception(ex);
+ }
+ },
+ nullptr,
+ _context);
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ exception(ex);
+ }
+ }
+ else
+ {
+ assert(_exception); // Don't retry if the proxy didn't change
+ _exceptionCB(_exception);
+ }
+
+#else
if(l != _locatorPrx)
{
_locatorPrx = l;
@@ -302,17 +425,74 @@ Request::invoke(const Ice::LocatorPrx& l)
assert(_exception.get()); // Don't retry if the proxy didn't change
_amdCB->ice_exception(*_exception.get());
}
+#endif
}
void
Request::response(bool ok, const pair<const Ice::Byte*, const Ice::Byte*>& outParams)
{
+#ifdef ICE_CPP11_MAPPING
+ _responseCB(ok, outParams);
+#else
_amdCB->ice_response(ok, outParams);
+#endif
}
void
Request::exception(const Ice::Exception& ex)
{
+#ifdef ICE_CPP11_MAPPING
+ try
+ {
+ ex.ice_throw();
+ }
+ catch(const Ice::RequestFailedException&)
+ {
+ _exceptionCB(current_exception());
+ }
+ catch(const Ice::UnknownException&)
+ {
+ _exceptionCB(current_exception());
+ }
+ catch(const Ice::NoEndpointException&)
+ {
+ try
+ {
+ throw Ice::ObjectNotExistException(__FILE__, __LINE__);
+ }
+ catch(...)
+ {
+ _exceptionCB(current_exception());
+ }
+ }
+ catch(const Ice::CommunicatorDestroyedException&)
+ {
+ try
+ {
+ throw Ice::ObjectNotExistException(__FILE__, __LINE__);
+ }
+ catch(...)
+ {
+ _exceptionCB(current_exception());
+ }
+ }
+ catch(const Ice::ObjectAdapterDeactivatedException&)
+ {
+ try
+ {
+ throw Ice::ObjectNotExistException(__FILE__, __LINE__);
+ }
+ catch(...)
+ {
+ _exceptionCB(current_exception());
+ }
+ }
+ catch(const Ice::Exception&)
+ {
+ _exception = current_exception();
+ _locator->invoke(_locatorPrx, shared_from_this()); // Retry with new locator proxy
+ }
+#else
try
{
ex.ice_throw();
@@ -340,14 +520,15 @@ Request::exception(const Ice::Exception& ex)
catch(const Ice::Exception&)
{
_exception.reset(ex.ice_clone());
- _locator->invoke(_locatorPrx, this); // Retry with new locator proxy
+ _locator->invoke(_locatorPrx, shared_from_this()); // Retry with new locator proxy
}
+#endif
}
-LocatorI::LocatorI(const LookupPrx& lookup,
+LocatorI::LocatorI(const LookupPrxPtr& lookup,
const Ice::PropertiesPtr& p,
const string& instanceName,
- const Ice::LocatorPrx& voidLocator) :
+ const Ice::LocatorPrxPtr& voidLocator) :
_lookup(lookup),
_timeout(IceUtil::Time::milliSeconds(p->getPropertyAsIntWithDefault("IceLocatorDiscovery.Timeout", 300))),
_retryCount(p->getPropertyAsIntWithDefault("IceLocatorDiscovery.RetryCount", 3)),
@@ -362,11 +543,22 @@ LocatorI::LocatorI(const LookupPrx& lookup,
}
void
-LocatorI::setLookupReply(const LookupReplyPrx& lookupReply)
+LocatorI::setLookupReply(const LookupReplyPrxPtr& lookupReply)
{
_lookupReply = lookupReply;
}
+#ifdef ICE_CPP11_MAPPING
+void
+LocatorI::ice_invoke_async(pair<const Ice::Byte*, const Ice::Byte*> inParams,
+ function<void (bool, const pair<const Ice::Byte*, const Ice::Byte*>&)> responseCB,
+ function<void (exception_ptr)> exceptionCB,
+ const Ice::Current& current)
+{
+ invoke(nullptr, make_shared<Request>(this, current.operation, current.mode, inParams, current.ctx,
+ move(responseCB), move(exceptionCB)));
+}
+#else
void
LocatorI::ice_invoke_async(const Ice::AMD_Object_ice_invokePtr& amdCB,
const pair<const Ice::Byte*, const Ice::Byte*>& inParams,
@@ -374,9 +566,10 @@ LocatorI::ice_invoke_async(const Ice::AMD_Object_ice_invokePtr& amdCB,
{
invoke(0, new Request(this, current.operation, current.mode, inParams, current.ctx, amdCB));
}
+#endif
void
-LocatorI::foundLocator(const Ice::LocatorPrx& locator)
+LocatorI::foundLocator(const Ice::LocatorPrxPtr& locator)
{
Lock sync(*this);
if(!locator || (!_instanceName.empty() && locator->ice_getIdentity().category != _instanceName))
@@ -407,7 +600,7 @@ LocatorI::foundLocator(const Ice::LocatorPrx& locator)
if(_pendingRetryCount > 0) // No need to retry, we found a locator.
{
- _timer->cancel(this);
+ _timer->cancel(shared_from_this());
_pendingRetryCount = 0;
}
@@ -460,7 +653,7 @@ LocatorI::foundLocator(const Ice::LocatorPrx& locator)
}
void
-LocatorI::invoke(const Ice::LocatorPrx& locator, const RequestPtr& request)
+LocatorI::invoke(const Ice::LocatorPrxPtr& locator, const RequestPtr& request)
{
Lock sync(*this);
if(_locator && _locator != locator)
@@ -482,8 +675,12 @@ LocatorI::invoke(const Ice::LocatorPrx& locator, const RequestPtr& request)
_pendingRetryCount = _retryCount;
try
{
+#ifdef ICE_CPP11_MAPPING
+ _lookup->findLocator_async(_instanceName, _lookupReply); // Send multicast request.
+#else
_lookup->begin_findLocator(_instanceName, _lookupReply); // Send multicast request.
- _timer->schedule(this, _timeout);
+#endif
+ _timer->schedule(shared_from_this(), _timeout);
}
catch(const Ice::LocalException&)
{
@@ -506,8 +703,12 @@ LocatorI::runTimerTask()
{
try
{
+#ifdef ICE_CPP11_MAPPING
+ _lookup->findLocator_async(_instanceName, _lookupReply); // Send multicast request.
+#else
_lookup->begin_findLocator(_instanceName, _lookupReply); // Send multicast request.
- _timer->schedule(this, _timeout);
+#endif
+ _timer->schedule(shared_from_this(), _timeout);
return;
}
catch(const Ice::LocalException&)
@@ -525,7 +726,7 @@ LocatorI::runTimerTask()
}
void
-LookupReplyI::foundLocator(const Ice::LocatorPrx& locator, const Ice::Current&)
+LookupReplyI::foundLocator(ICE_IN(Ice::LocatorPrxPtr) locator, const Ice::Current&)
{
_locator->foundLocator(locator);
}