diff options
Diffstat (limited to 'cpp/src/IceLocatorDiscovery/PluginI.cpp')
-rw-r--r-- | cpp/src/IceLocatorDiscovery/PluginI.cpp | 271 |
1 files changed, 236 insertions, 35 deletions
diff --git a/cpp/src/IceLocatorDiscovery/PluginI.cpp b/cpp/src/IceLocatorDiscovery/PluginI.cpp index bf30aa4cf8e..5207a096c9c 100644 --- a/cpp/src/IceLocatorDiscovery/PluginI.cpp +++ b/cpp/src/IceLocatorDiscovery/PluginI.cpp @@ -54,10 +54,26 @@ namespace class LocatorI; // Forward declaration -class Request : public IceUtil::Shared +class Request : public Ice::EnableSharedFromThis<Request> { public: +#ifdef ICE_CPP11_MAPPING + Request(LocatorI* locator, + const string& operation, + Ice::OperationMode mode, + const pair<const Ice::Byte*, const Ice::Byte*>& inParams, + const Ice::Context& ctx, + function<void (bool, const pair<const Ice::Byte*, const Ice::Byte*>&)> responseCB, + function<void (exception_ptr)> exceptionCB) : + _locator(locator), + _operation(operation), + _mode(mode), + _context(ctx), + _inParams(inParams.first, inParams.second), + _responseCB(move(responseCB)), + _exceptionCB(move(exceptionCB)) +#else Request(LocatorI* locator, const string& operation, Ice::OperationMode mode, @@ -70,10 +86,11 @@ public: _context(ctx), _inParams(inParams.first, inParams.second), _amdCB(amdCB) +#endif { } - void invoke(const Ice::LocatorPrx&); + void invoke(const Ice::LocatorPrxPtr&); void response(const bool, const pair<const Ice::Byte*, const Ice::Byte*>&); void exception(const Ice::Exception&); @@ -84,31 +101,47 @@ protected: const Ice::OperationMode _mode; const Ice::Context _context; const Ice::ByteSeq _inParams; +#ifdef ICE_CPP11_MAPPING + function<void (bool, const pair<const Ice::Byte*, const Ice::Byte*>&)> _responseCB; + function<void (exception_ptr)> _exceptionCB; + exception_ptr _exception; +#else const Ice::AMD_Object_ice_invokePtr _amdCB; - - Ice::LocatorPrx _locatorPrx; IceUtil::UniquePtr<Ice::Exception> _exception; +#endif + + Ice::LocatorPrxPtr _locatorPrx; }; -typedef IceUtil::Handle<Request> RequestPtr; +ICE_DEFINE_PTR(RequestPtr, Request); -class LocatorI : public Ice::BlobjectArrayAsync, private IceUtil::TimerTask, private IceUtil::Monitor<IceUtil::Mutex> +class LocatorI : public Ice::BlobjectArrayAsync, + public IceUtil::TimerTask, + private IceUtil::Monitor<IceUtil::Mutex>, + public Ice::EnableSharedFromThis<LocatorI> { public: - LocatorI(const LookupPrx&, const Ice::PropertiesPtr&, const string&, const Ice::LocatorPrx&); - void setLookupReply(const LookupReplyPrx&); + LocatorI(const LookupPrxPtr&, const Ice::PropertiesPtr&, const string&, const Ice::LocatorPrxPtr&); + void setLookupReply(const LookupReplyPrxPtr&); +#ifdef ICE_CPP11_MAPPING + virtual void ice_invoke_async(pair<const Ice::Byte*, const Ice::Byte*>, + function<void (bool, const pair<const Ice::Byte*, const Ice::Byte*>&)>, + function<void (exception_ptr)>, + const Ice::Current&); +#else virtual void ice_invoke_async(const Ice::AMD_Object_ice_invokePtr&, const pair<const Ice::Byte*, const Ice::Byte*>&, const Ice::Current&); +#endif - void foundLocator(const Ice::LocatorPrx&); - void invoke(const Ice::LocatorPrx&, const RequestPtr&); + void foundLocator(const Ice::LocatorPrxPtr&); + void invoke(const Ice::LocatorPrxPtr&, const RequestPtr&); private: virtual void runTimerTask(); - const LookupPrx _lookup; + const LookupPrxPtr _lookup; const IceUtil::Time _timeout; const int _retryCount; const IceUtil::Time _retryDelay; @@ -116,15 +149,15 @@ private: string _instanceName; bool _warned; - LookupReplyPrx _lookupReply; - Ice::LocatorPrx _locator; - Ice::LocatorPrx _voidLocator; + LookupReplyPrxPtr _lookupReply; + Ice::LocatorPrxPtr _locator; + Ice::LocatorPrxPtr _voidLocator; IceUtil::Time _nextRetry; int _pendingRetryCount; vector<RequestPtr> _pendingRequests; }; -typedef IceUtil::Handle<LocatorI> LocatorIPtr; +ICE_DEFINE_PTR(LocatorIPtr, LocatorI); class LookupReplyI : public LookupReply { @@ -134,7 +167,7 @@ public: { } - virtual void foundLocator(const Ice::LocatorPrx&, const Ice::Current&); + virtual void foundLocator(ICE_IN(Ice::LocatorPrxPtr), const Ice::Current&); private: @@ -155,6 +188,25 @@ class VoidLocatorI : public Ice::Locator { public: +#ifdef ICE_CPP11_MAPPING + virtual void + findObjectById_async(::Ice::Identity, + function<void (const shared_ptr<::Ice::ObjectPrx>&)> response, + function<void (exception_ptr)>, + const Ice::Current&) const + { + response(nullptr); + } + + virtual void + findAdapterById_async(string, + function<void (const shared_ptr<::Ice::ObjectPrx>&)> response, + function<void (exception_ptr)>, + const Ice::Current&) const + { + response(nullptr); + } +#else virtual void findObjectById_async(const Ice::AMD_Locator_findObjectByIdPtr& amdCB, const Ice::Identity&, @@ -170,11 +222,12 @@ public: { amdCB->ice_response(0); } +#endif - virtual Ice::LocatorRegistryPrx + virtual Ice::LocatorRegistryPrxPtr getRegistry(const Ice::Current&) const { - return 0; + return ICE_NULLPTR; } }; @@ -237,17 +290,40 @@ PluginI::initialize() lookupEndpoints = os.str(); } - Ice::ObjectPrx lookupPrx = _communicator->stringToProxy("IceLocatorDiscovery/Lookup -d:" + lookupEndpoints); + Ice::ObjectPrxPtr lookupPrx = _communicator->stringToProxy("IceLocatorDiscovery/Lookup -d:" + lookupEndpoints); lookupPrx = lookupPrx->ice_collocationOptimized(false); // No collocation optimization for the multicast proxy! try { // Ensure we can establish a connection to the multicast proxy // but don't block. +#ifdef ICE_CPP11_MAPPING + promise<bool> sent; + promise<void> completed; + + lookupPrx->ice_getConnection_async( + [&](shared_ptr<Ice::Connection>) + { + completed.set_value(); + }, + [&](exception_ptr ex) + { + completed.set_exception(ex); + }, + [&](bool sentSynchronously) + { + sent.set_value(sentSynchronously); + }); + if(sent.get_future().get()) + { + completed.get_future().get(); + } +#else Ice::AsyncResultPtr result = lookupPrx->begin_ice_getConnection(); if(result->sentSynchronously()) { lookupPrx->end_ice_getConnection(result); } +#endif } catch(const Ice::LocalException& ex) { @@ -258,17 +334,17 @@ PluginI::initialize() throw Ice::PluginInitializationException(__FILE__, __LINE__, os.str()); } - Ice::LocatorPrx voidLocator = Ice::LocatorPrx::uncheckedCast(_locatorAdapter->addWithUUID(new VoidLocatorI())); + Ice::LocatorPrxPtr voidLocator = ICE_UNCHECKED_CAST(Ice::LocatorPrx, _locatorAdapter->addWithUUID(ICE_MAKE_SHARED(VoidLocatorI))); string instanceName = properties->getProperty("IceLocatorDiscovery.InstanceName"); Ice::Identity id; id.name = "Locator"; id.category = !instanceName.empty() ? instanceName : IceUtil::generateUUID(); - LocatorIPtr locator = new LocatorI(LookupPrx::uncheckedCast(lookupPrx), properties, instanceName, voidLocator); - _communicator->setDefaultLocator(Ice::LocatorPrx::uncheckedCast(_locatorAdapter->add(locator, id))); + LocatorIPtr locator = ICE_MAKE_SHARED(LocatorI, ICE_UNCHECKED_CAST(LookupPrx, lookupPrx), properties, instanceName, voidLocator); + _communicator->setDefaultLocator(ICE_UNCHECKED_CAST(Ice::LocatorPrx, _locatorAdapter->add(locator, id))); - Ice::ObjectPrx lookupReply = _replyAdapter->addWithUUID(new LookupReplyI(locator))->ice_datagram(); - locator->setLookupReply(LookupReplyPrx::uncheckedCast(lookupReply)); + Ice::ObjectPrxPtr lookupReply = _replyAdapter->addWithUUID(ICE_MAKE_SHARED(LookupReplyI, locator))->ice_datagram(); + locator->setLookupReply(ICE_UNCHECKED_CAST(LookupReplyPrx, lookupReply)); _replyAdapter->activate(); _locatorAdapter->activate(); @@ -282,8 +358,55 @@ PluginI::destroy() } void -Request::invoke(const Ice::LocatorPrx& l) +Request::invoke(const Ice::LocatorPrxPtr& l) { +#ifdef ICE_CPP11_MAPPING + if(l != _locatorPrx) + { + _locatorPrx = l; + try + { + l->ice_invoke_async(_operation, _mode, _inParams, + [self = shared_from_this()](bool ok, vector<Ice::Byte> outParams) + { + pair<const Ice::Byte*, const Ice::Byte*> outPair; + if(outParams.empty()) + { + outPair.first = outPair.second = 0; + } + else + { + outPair.first = &outParams[0]; + outPair.second = outPair.first + outParams.size(); + } + self->response(ok, outPair); + }, + [self = shared_from_this()](exception_ptr e) + { + try + { + rethrow_exception(e); + } + catch(const Ice::Exception& ex) + { + self->exception(ex); + } + }, + nullptr, + _context); + } + catch(const Ice::LocalException& ex) + { + exception(ex); + } + } + else + { + assert(_exception); // Don't retry if the proxy didn't change + _exceptionCB(_exception); + } + +#else if(l != _locatorPrx) { _locatorPrx = l; @@ -302,17 +425,74 @@ Request::invoke(const Ice::LocatorPrx& l) assert(_exception.get()); // Don't retry if the proxy didn't change _amdCB->ice_exception(*_exception.get()); } +#endif } void Request::response(bool ok, const pair<const Ice::Byte*, const Ice::Byte*>& outParams) { +#ifdef ICE_CPP11_MAPPING + _responseCB(ok, outParams); +#else _amdCB->ice_response(ok, outParams); +#endif } void Request::exception(const Ice::Exception& ex) { +#ifdef ICE_CPP11_MAPPING + try + { + ex.ice_throw(); + } + catch(const Ice::RequestFailedException&) + { + _exceptionCB(current_exception()); + } + catch(const Ice::UnknownException&) + { + _exceptionCB(current_exception()); + } + catch(const Ice::NoEndpointException&) + { + try + { + throw Ice::ObjectNotExistException(__FILE__, __LINE__); + } + catch(...) + { + _exceptionCB(current_exception()); + } + } + catch(const Ice::CommunicatorDestroyedException&) + { + try + { + throw Ice::ObjectNotExistException(__FILE__, __LINE__); + } + catch(...) + { + _exceptionCB(current_exception()); + } + } + catch(const Ice::ObjectAdapterDeactivatedException&) + { + try + { + throw Ice::ObjectNotExistException(__FILE__, __LINE__); + } + catch(...) + { + _exceptionCB(current_exception()); + } + } + catch(const Ice::Exception&) + { + _exception = current_exception(); + _locator->invoke(_locatorPrx, shared_from_this()); // Retry with new locator proxy + } +#else try { ex.ice_throw(); @@ -340,14 +520,15 @@ Request::exception(const Ice::Exception& ex) catch(const Ice::Exception&) { _exception.reset(ex.ice_clone()); - _locator->invoke(_locatorPrx, this); // Retry with new locator proxy + _locator->invoke(_locatorPrx, shared_from_this()); // Retry with new locator proxy } +#endif } -LocatorI::LocatorI(const LookupPrx& lookup, +LocatorI::LocatorI(const LookupPrxPtr& lookup, const Ice::PropertiesPtr& p, const string& instanceName, - const Ice::LocatorPrx& voidLocator) : + const Ice::LocatorPrxPtr& voidLocator) : _lookup(lookup), _timeout(IceUtil::Time::milliSeconds(p->getPropertyAsIntWithDefault("IceLocatorDiscovery.Timeout", 300))), _retryCount(p->getPropertyAsIntWithDefault("IceLocatorDiscovery.RetryCount", 3)), @@ -362,11 +543,22 @@ LocatorI::LocatorI(const LookupPrx& lookup, } void -LocatorI::setLookupReply(const LookupReplyPrx& lookupReply) +LocatorI::setLookupReply(const LookupReplyPrxPtr& lookupReply) { _lookupReply = lookupReply; } +#ifdef ICE_CPP11_MAPPING +void +LocatorI::ice_invoke_async(pair<const Ice::Byte*, const Ice::Byte*> inParams, + function<void (bool, const pair<const Ice::Byte*, const Ice::Byte*>&)> responseCB, + function<void (exception_ptr)> exceptionCB, + const Ice::Current& current) +{ + invoke(nullptr, make_shared<Request>(this, current.operation, current.mode, inParams, current.ctx, + move(responseCB), move(exceptionCB))); +} +#else void LocatorI::ice_invoke_async(const Ice::AMD_Object_ice_invokePtr& amdCB, const pair<const Ice::Byte*, const Ice::Byte*>& inParams, @@ -374,9 +566,10 @@ LocatorI::ice_invoke_async(const Ice::AMD_Object_ice_invokePtr& amdCB, { invoke(0, new Request(this, current.operation, current.mode, inParams, current.ctx, amdCB)); } +#endif void -LocatorI::foundLocator(const Ice::LocatorPrx& locator) +LocatorI::foundLocator(const Ice::LocatorPrxPtr& locator) { Lock sync(*this); if(!locator || (!_instanceName.empty() && locator->ice_getIdentity().category != _instanceName)) @@ -407,7 +600,7 @@ LocatorI::foundLocator(const Ice::LocatorPrx& locator) if(_pendingRetryCount > 0) // No need to retry, we found a locator. { - _timer->cancel(this); + _timer->cancel(shared_from_this()); _pendingRetryCount = 0; } @@ -460,7 +653,7 @@ LocatorI::foundLocator(const Ice::LocatorPrx& locator) } void -LocatorI::invoke(const Ice::LocatorPrx& locator, const RequestPtr& request) +LocatorI::invoke(const Ice::LocatorPrxPtr& locator, const RequestPtr& request) { Lock sync(*this); if(_locator && _locator != locator) @@ -482,8 +675,12 @@ LocatorI::invoke(const Ice::LocatorPrx& locator, const RequestPtr& request) _pendingRetryCount = _retryCount; try { +#ifdef ICE_CPP11_MAPPING + _lookup->findLocator_async(_instanceName, _lookupReply); // Send multicast request. +#else _lookup->begin_findLocator(_instanceName, _lookupReply); // Send multicast request. - _timer->schedule(this, _timeout); +#endif + _timer->schedule(shared_from_this(), _timeout); } catch(const Ice::LocalException&) { @@ -506,8 +703,12 @@ LocatorI::runTimerTask() { try { +#ifdef ICE_CPP11_MAPPING + _lookup->findLocator_async(_instanceName, _lookupReply); // Send multicast request. +#else _lookup->begin_findLocator(_instanceName, _lookupReply); // Send multicast request. - _timer->schedule(this, _timeout); +#endif + _timer->schedule(shared_from_this(), _timeout); return; } catch(const Ice::LocalException&) @@ -525,7 +726,7 @@ LocatorI::runTimerTask() } void -LookupReplyI::foundLocator(const Ice::LocatorPrx& locator, const Ice::Current&) +LookupReplyI::foundLocator(ICE_IN(Ice::LocatorPrxPtr) locator, const Ice::Current&) { _locator->foundLocator(locator); } |