diff options
author | Jose <jose@zeroc.com> | 2015-12-01 17:36:19 +0100 |
---|---|---|
committer | Jose <jose@zeroc.com> | 2015-12-01 17:36:19 +0100 |
commit | 2029ff368e49fae489b8ec8fc12f7d126b182dfb (patch) | |
tree | 6075d652b7851831c39e6f22004d8720864f1a60 /cpp/src/Ice/OutgoingAsync.cpp | |
parent | The default LMDB map size for IceGrid and IceStorm is now 10MB (Windows) (diff) | |
download | ice-2029ff368e49fae489b8ec8fc12f7d126b182dfb.tar.bz2 ice-2029ff368e49fae489b8ec8fc12f7d126b182dfb.tar.xz ice-2029ff368e49fae489b8ec8fc12f7d126b182dfb.zip |
C++11 mapping initial commit
Diffstat (limited to 'cpp/src/Ice/OutgoingAsync.cpp')
-rw-r--r-- | cpp/src/Ice/OutgoingAsync.cpp | 397 |
1 files changed, 383 insertions, 14 deletions
diff --git a/cpp/src/Ice/OutgoingAsync.cpp b/cpp/src/Ice/OutgoingAsync.cpp index ee1e6d5a437..cfe5adda62e 100644 --- a/cpp/src/Ice/OutgoingAsync.cpp +++ b/cpp/src/Ice/OutgoingAsync.cpp @@ -22,10 +22,12 @@ using namespace std; using namespace Ice; using namespace IceInternal; +#ifndef ICE_CPP11_MAPPING IceUtil::Shared* IceInternal::upCast(OutgoingAsyncBase* p) { return p; } IceUtil::Shared* IceInternal::upCast(ProxyOutgoingAsyncBase* p) { return p; } IceUtil::Shared* IceInternal::upCast(OutgoingAsync* p) { return p; } IceUtil::Shared* IceInternal::upCast(CommunicatorFlushBatchAsync* p) { return p; } +#endif bool OutgoingAsyncBase::sent() @@ -52,12 +54,20 @@ OutgoingAsyncBase::getIs() return 0; // Must be overriden by request that can handle responses } +#ifdef ICE_CPP11_MAPPING +OutgoingAsyncBase::OutgoingAsyncBase(const CommunicatorPtr& communicator, + const InstancePtr& instance, + const string& operation, + const CallbackBasePtr& delegate) : + AsyncResult(communicator, instance, operation, delegate), +#else OutgoingAsyncBase::OutgoingAsyncBase(const CommunicatorPtr& communicator, const InstancePtr& instance, const string& operation, const CallbackBasePtr& delegate, const LocalObjectPtr& cookie) : AsyncResult(communicator, instance, operation, delegate, cookie), +#endif _os(instance.get(), Ice::currentProtocolEncoding) { } @@ -83,7 +93,7 @@ OutgoingAsyncBase::finished(const Exception& ex) return AsyncResult::finished(ex); } -Ice::ObjectPrx +Ice::ObjectPrxPtr ProxyOutgoingAsyncBase::getProxy() const { return _proxy; @@ -101,7 +111,11 @@ ProxyOutgoingAsyncBase::completed(const Exception& exc) _cachedConnection = 0; if(_proxy->__reference()->getInvocationTimeout() == -2) { +#ifdef ICE_CPP11_MAPPING + _instance->timer()->cancel(dynamic_pointer_cast<TimerTask>(shared_from_this())); +#else _instance->timer()->cancel(this); +#endif } // @@ -115,7 +129,12 @@ ProxyOutgoingAsyncBase::completed(const Exception& exc) // the retry interval is 0. This method can be called with the // connection locked so we can't just retry here. // +#ifdef ICE_CPP11_MAPPING + _instance->retryQueue()->add(dynamic_pointer_cast<ProxyOutgoingAsyncBase>(shared_from_this()), + handleException(exc)); +#else _instance->retryQueue()->add(this, handleException(exc)); +#endif return false; } catch(const Exception& ex) @@ -136,7 +155,11 @@ ProxyOutgoingAsyncBase::retryException(const Exception& ex) // connection to be done. // _proxy->__updateRequestHandler(_handler, 0); // Clear request handler and always retry. +#ifdef ICE_CPP11_MAPPING + _instance->retryQueue()->add(dynamic_pointer_cast<ProxyOutgoingAsyncBase>(shared_from_this()), 0); +#else _instance->retryQueue()->add(this, 0); +#endif } catch(const Ice::Exception& exc) { @@ -155,7 +178,12 @@ ProxyOutgoingAsyncBase::cancelable(const CancellationHandlerPtr& handler) const int timeout = _cachedConnection->timeout(); if(timeout > 0) { +#ifdef ICE_CPP11_MAPPING + _instance->timer()->schedule(dynamic_pointer_cast<TimerTask>(shared_from_this()), + IceUtil::Time::milliSeconds(timeout)); +#else _instance->timer()->schedule(this, IceUtil::Time::milliSeconds(timeout)); +#endif } } AsyncResult::cancelable(handler); @@ -187,13 +215,20 @@ ProxyOutgoingAsyncBase::abort(const Ice::Exception& ex) } } -ProxyOutgoingAsyncBase::ProxyOutgoingAsyncBase(const ObjectPrx& prx, +#ifdef ICE_CPP11_MAPPING +ProxyOutgoingAsyncBase::ProxyOutgoingAsyncBase(const ObjectPrxPtr& prx, + const string& operation, + const CallbackBasePtr& delegate) : + OutgoingAsyncBase(prx->ice_getCommunicator(), prx->__reference()->getInstance(), operation, delegate), +#else +ProxyOutgoingAsyncBase::ProxyOutgoingAsyncBase(const ObjectPrxPtr& prx, const string& operation, const CallbackBasePtr& delegate, const LocalObjectPtr& cookie) : OutgoingAsyncBase(prx->ice_getCommunicator(), prx->__reference()->getInstance(), operation, delegate, cookie), +#endif _proxy(prx), - _mode(Normal), + _mode(ICE_ENUM(OperationMode, Normal)), _cnt(0), _sent(false) { @@ -209,7 +244,12 @@ ProxyOutgoingAsyncBase::invokeImpl(bool userThread) int invocationTimeout = _proxy->__reference()->getInvocationTimeout(); if(invocationTimeout > 0) { +#ifdef ICE_CPP11_MAPPING + _instance->timer()->schedule(dynamic_pointer_cast<TimerTask>(shared_from_this()), + IceUtil::Time::milliSeconds(invocationTimeout)); +#else _instance->timer()->schedule(this, IceUtil::Time::milliSeconds(invocationTimeout)); +#endif } } else @@ -223,7 +263,12 @@ ProxyOutgoingAsyncBase::invokeImpl(bool userThread) { _sent = false; _handler = _proxy->__getRequestHandler(); +#ifdef ICE_CPP11_MAPPING + AsyncStatus status = _handler->sendAsyncRequest( + dynamic_pointer_cast<ProxyOutgoingAsyncBase>(shared_from_this())); +#else AsyncStatus status = _handler->sendAsyncRequest(this); +#endif if(status & AsyncStatusSent) { if(userThread) @@ -258,7 +303,12 @@ ProxyOutgoingAsyncBase::invokeImpl(bool userThread) int interval = handleException(ex); if(interval > 0) { +#ifdef ICE_CPP11_MAPPING + _instance->retryQueue()->add(dynamic_pointer_cast<ProxyOutgoingAsyncBase>(shared_from_this()), + interval); +#else _instance->retryQueue()->add(this, interval); +#endif return; } else @@ -293,7 +343,11 @@ ProxyOutgoingAsyncBase::sent(bool done) { if(_proxy->__reference()->getInvocationTimeout() != -1) { - _instance->timer()->cancel(this); +#ifdef ICE_CPP11_MAPPING + _instance->timer()->cancel(dynamic_pointer_cast<TimerTask>(shared_from_this())); +#else + _instance->timer()->cancel(this); +#endif } } return OutgoingAsyncBase::sent(done); @@ -304,7 +358,11 @@ ProxyOutgoingAsyncBase::finished(const Exception& ex) { if(_proxy->__reference()->getInvocationTimeout() != -1) { - _instance->timer()->cancel(this); +#ifdef ICE_CPP11_MAPPING + _instance->timer()->cancel(dynamic_pointer_cast<TimerTask>(shared_from_this())); +#else + _instance->timer()->cancel(this); +#endif } return OutgoingAsyncBase::finished(ex); } @@ -314,7 +372,11 @@ ProxyOutgoingAsyncBase::finished(bool ok) { if(_proxy->__reference()->getInvocationTimeout() != -1) { - _instance->timer()->cancel(this); +#ifdef ICE_CPP11_MAPPING + _instance->timer()->cancel(dynamic_pointer_cast<TimerTask>(shared_from_this())); +#else + _instance->timer()->cancel(this); +#endif } return AsyncResult::finished(ok); } @@ -338,11 +400,18 @@ ProxyOutgoingAsyncBase::runTimerTask() } } -OutgoingAsync::OutgoingAsync(const ObjectPrx& prx, +#ifdef ICE_CPP11_MAPPING +OutgoingAsync::OutgoingAsync(const ObjectPrxPtr& prx, + const string& operation, + const CallbackBasePtr& delegate) : + ProxyOutgoingAsyncBase(prx, operation, delegate), +#else +OutgoingAsync::OutgoingAsync(const ObjectPrxPtr& prx, const string& operation, const CallbackBasePtr& delegate, const LocalObjectPtr& cookie) : ProxyOutgoingAsyncBase(prx, operation, delegate, cookie), +#endif _encoding(getCompatibleEncoding(prx->__reference()->getEncoding())) { } @@ -353,7 +422,7 @@ OutgoingAsync::prepare(const string& operation, OperationMode mode, const Contex checkSupportedProtocol(getCompatibleProtocol(_proxy->__reference()->getProtocol())); _mode = mode; - _observer.attach(_proxy.get(), operation, context); + _observer.attach(_proxy, operation, context); switch(_proxy->__reference()->getMode()) { @@ -394,7 +463,7 @@ OutgoingAsync::prepare(const string& operation, OperationMode mode, const Contex _os.write(static_cast<Byte>(_mode)); - if(context != 0) + if(context != &Ice::noExplicitContext) { // // Explicit context @@ -429,7 +498,11 @@ AsyncStatus OutgoingAsync::invokeRemote(const ConnectionIPtr& connection, bool compress, bool response) { _cachedConnection = connection; +#ifdef ICE_CPP11_MAPPING + return connection->sendAsyncRequest(dynamic_pointer_cast<OutgoingAsyncBase>(shared_from_this()), compress, response, 0); +#else return connection->sendAsyncRequest(this, compress, response, 0); +#endif } AsyncStatus @@ -620,13 +693,20 @@ OutgoingAsync::completed() } } -ProxyFlushBatchAsync::ProxyFlushBatchAsync(const ObjectPrx& proxy, +#ifdef ICE_CPP11_MAPPING +ProxyFlushBatchAsync::ProxyFlushBatchAsync(const ObjectPrxPtr& proxy, + const string& operation, + const CallbackBasePtr& delegate) : + ProxyOutgoingAsyncBase(proxy, operation, delegate) +#else +ProxyFlushBatchAsync::ProxyFlushBatchAsync(const ObjectPrxPtr& proxy, const string& operation, const CallbackBasePtr& delegate, const LocalObjectPtr& cookie) : ProxyOutgoingAsyncBase(proxy, operation, delegate, cookie) +#endif { - _observer.attach(proxy.get(), operation, 0); + _observer.attach(proxy, operation, 0); _batchRequestNum = proxy->__getBatchRequestQueue()->swap(&_os); } @@ -645,7 +725,12 @@ ProxyFlushBatchAsync::invokeRemote(const ConnectionIPtr& connection, bool compre } } _cachedConnection = connection; +#ifdef ICE_CPP11_MAPPING + return connection->sendAsyncRequest(dynamic_pointer_cast<OutgoingAsyncBase>(shared_from_this()), + compress, false, _batchRequestNum); +#else return connection->sendAsyncRequest(this, compress, false, _batchRequestNum); +#endif } AsyncStatus @@ -672,13 +757,20 @@ ProxyFlushBatchAsync::invoke() invokeImpl(true); // userThread = true } -ProxyGetConnection::ProxyGetConnection(const ObjectPrx& prx, +#ifdef ICE_CPP11_MAPPING +ProxyGetConnection::ProxyGetConnection(const ObjectPrxPtr& prx, + const string& operation, + const CallbackBasePtr& delegate) : + ProxyOutgoingAsyncBase(prx, operation, delegate) +#else +ProxyGetConnection::ProxyGetConnection(const ObjectPrxPtr& prx, const string& operation, const CallbackBasePtr& delegate, const LocalObjectPtr& cookie) : ProxyOutgoingAsyncBase(prx, operation, delegate, cookie) +#endif { - _observer.attach(prx.get(), operation, 0); + _observer.attach(prx, operation, 0); } AsyncStatus @@ -708,13 +800,23 @@ ProxyGetConnection::invoke() invokeImpl(true); // userThread = true } +#ifdef ICE_CPP11_MAPPING +ConnectionFlushBatchAsync::ConnectionFlushBatchAsync(const ConnectionIPtr& connection, + const CommunicatorPtr& communicator, + const InstancePtr& instance, + const string& operation, + const CallbackBasePtr& delegate) : + OutgoingAsyncBase(communicator, instance, operation, delegate), +#else ConnectionFlushBatchAsync::ConnectionFlushBatchAsync(const ConnectionIPtr& connection, const CommunicatorPtr& communicator, const InstancePtr& instance, const string& operation, const CallbackBasePtr& delegate, const LocalObjectPtr& cookie) : - OutgoingAsyncBase(communicator, instance, operation, delegate, cookie), _connection(connection) + OutgoingAsyncBase(communicator, instance, operation, delegate, cookie), +#endif + _connection(connection) { _observer.attach(instance.get(), operation); } @@ -742,7 +844,12 @@ ConnectionFlushBatchAsync::invoke() } else { +#ifdef ICE_CPP11_MAPPING + status = _connection->sendAsyncRequest( + dynamic_pointer_cast<OutgoingAsyncBase>(shared_from_this()), false, false, batchRequestNum); +#else status = _connection->sendAsyncRequest(this, false, false, batchRequestNum); +#endif } if(status & AsyncStatusSent) @@ -770,12 +877,20 @@ ConnectionFlushBatchAsync::invoke() } } +#ifdef ICE_CPP11_MAPPING +CommunicatorFlushBatchAsync::CommunicatorFlushBatchAsync(const CommunicatorPtr& communicator, + const InstancePtr& instance, + const string& operation, + const CallbackBasePtr& cb) : + AsyncResult(communicator, instance, operation, cb) +#else CommunicatorFlushBatchAsync::CommunicatorFlushBatchAsync(const CommunicatorPtr& communicator, const InstancePtr& instance, const string& operation, const CallbackBasePtr& cb, const LocalObjectPtr& cookie) : AsyncResult(communicator, instance, operation, cb, cookie) +#endif { _observer.attach(instance.get(), operation); @@ -797,7 +912,11 @@ CommunicatorFlushBatchAsync::flushConnection(const ConnectionIPtr& con) FlushBatch(const CommunicatorFlushBatchAsyncPtr& outAsync, const InstancePtr& instance, InvocationObserver& observer) : +#ifdef ICE_CPP11_MAPPING + OutgoingAsyncBase(outAsync->getCommunicator(), instance, outAsync->getOperation(), __dummyCallback), +#else OutgoingAsyncBase(outAsync->getCommunicator(), instance, outAsync->getOperation(), __dummyCallback, 0), +#endif _outAsync(outAsync), _observer(observer) { @@ -836,7 +955,12 @@ CommunicatorFlushBatchAsync::flushConnection(const ConnectionIPtr& con) try { +#ifdef ICE_CPP11_MAPPING + auto flushBatch = make_shared<FlushBatch>( + dynamic_pointer_cast<CommunicatorFlushBatchAsync>(shared_from_this()), _instance, _observer); +#else OutgoingAsyncBasePtr flushBatch = new FlushBatch(this, _instance, _observer); +#endif int batchRequestNum = con->getBatchRequestQueue()->swap(flushBatch->getOs()); if(batchRequestNum == 0) { @@ -885,3 +1009,248 @@ CommunicatorFlushBatchAsync::check(bool userThread) } } } + +#ifdef ICE_CPP11_MAPPING +OnewayClosureCallback::OnewayClosureCallback( + const string& name, + const shared_ptr<Ice::ObjectPrx>& proxy, + function<void ()> response, + function<void (exception_ptr)> exception, + function<void (bool)> sent) : + __name(name), + __proxy(proxy), + __response(response), + __exception(exception), + __sent(sent) +{ +} + +void +OnewayClosureCallback::sent(const AsyncResultPtr& __result) const +{ + if(__sent) + { + __sent(__result->sentSynchronously()); + } +} + +bool +OnewayClosureCallback::hasSentCallback() const +{ + return __sent != nullptr; +} + +void +OnewayClosureCallback::completed(const AsyncResultPtr& __result) const +{ + try + { + AsyncResult::__check(__result, __proxy.get(), __name); + bool ok = __result->__wait(); + if(__proxy->__reference()->getMode() == Reference::ModeTwoway) + { + if(!ok) + { + try + { + __result->__throwUserException(); + } + catch(const UserException& __ex) + { + throw UnknownUserException(__FILE__, __LINE__, __ex.ice_name()); + } + } + __result->__readEmptyParams(); + if(__response) + { + try + { + __response(); + } + catch(...) + { + throw current_exception(); + } + } + } + } + catch(const exception_ptr& ex) + { + rethrow_exception(ex); + } + catch(const Ice::Exception&) + { + if(__exception) + { + __exception(current_exception()); + } + } +} + +function<void ()> +OnewayClosureCallback::invoke( + const string& __name, + const shared_ptr<Ice::ObjectPrx>& __proxy, + Ice::OperationMode __mode, + Ice::FormatType __format, + function<void (BasicStream*)> __marshal, + function<void ()> __response, + function<void (exception_ptr)> __exception, + function<void (bool)> __sent, + const Ice::Context& __context) +{ + auto __result = make_shared<OutgoingAsync>(__proxy, __name, + make_shared<OnewayClosureCallback>(__name, __proxy, move(__response), move(__exception), move(__sent))); + try + { + __result->prepare(__name, __mode, &__context); + if(__marshal) + { + __marshal(__result->startWriteParams(__format)); + __result->endWriteParams(); + } + else + { + __result->writeEmptyParams(); + } + __result->invoke(); + } + catch(const exception_ptr& ex) + { + rethrow_exception(ex); + } + catch(const Exception& __ex) + { + __result->abort(__ex); + } + + return [__result]() + { + __result->cancel(); + }; +} + +TwowayClosureCallback::TwowayClosureCallback( + const string& name, + const shared_ptr<Ice::ObjectPrx>& proxy, + bool readEmptyParams, + function<void (BasicStream*)> read, + function<void (const UserException&)> userException, + function<void (exception_ptr)> exception, + function<void (bool)> sent) : + __name(name), + __proxy(proxy), + __readEmptyParams(readEmptyParams), + __read(move(read)), + __userException(move(userException)), + __exception(move(exception)), + __sent(move(sent)) +{ +} + +void +TwowayClosureCallback::sent(const AsyncResultPtr& result) const +{ + if(__sent != nullptr) + { + __sent(result->sentSynchronously()); + } +} + +bool +TwowayClosureCallback::hasSentCallback() const +{ + return __sent != nullptr; +} + +void +TwowayClosureCallback::completed(const AsyncResultPtr& __result) const +{ + try + { + AsyncResult::__check(__result, __proxy.get(), __name); + if(!__result->__wait()) + { + try + { + __result->__throwUserException(); + } + catch(const Ice::UserException& __ex) + { + if(__userException) + { + __userException(__ex); + } + throw ::Ice::UnknownUserException(__FILE__, __LINE__, __ex.ice_name()); + } + } + else + { + if(__readEmptyParams) + { + __result->__readEmptyParams(); + __read(0); + } + else + { + __read(__result->__startReadParams()); + } + } + } + catch(const exception_ptr& ex) + { + rethrow_exception(ex); + } + catch(const Ice::Exception&) + { + if(__exception) + { + __exception(current_exception()); + } + } +} + +function<void ()> +TwowayClosureCallback::invoke( + const string& __name, + const shared_ptr<Ice::ObjectPrx>& __proxy, + OperationMode __mode, + FormatType __format, + function<void (BasicStream*)> __write, + bool __readEmptyParams, + function<void (BasicStream*)> __read, + function<void (const UserException&)> __userException, + function<void (exception_ptr)> __exception, + function<void (bool)> __sent, + const Context& __context) +{ + assert(__proxy); + auto __result = make_shared<OutgoingAsync>(__proxy, __name, + make_shared<TwowayClosureCallback>(__name, __proxy, __readEmptyParams, move(__read), + move(__userException), move(__exception), move(__sent))); + __proxy->__checkAsyncTwowayOnly(__name); + try + { + __result->prepare(__name, __mode, &__context); + if(__write) + { + __write(__result->startWriteParams(__format)); + __result->endWriteParams(); + } + else + { + __result->writeEmptyParams(); + } + __result->invoke(); + } + catch(const Exception& __ex) + { + __result->abort(__ex); + } + + return [__result]() + { + __result->cancel(); + }; +} +#endif |