diff options
Diffstat (limited to 'cpp/src/Ice/OutgoingAsync.cpp')
-rw-r--r-- | cpp/src/Ice/OutgoingAsync.cpp | 1309 |
1 files changed, 806 insertions, 503 deletions
diff --git a/cpp/src/Ice/OutgoingAsync.cpp b/cpp/src/Ice/OutgoingAsync.cpp index 5c06c76a3fe..34828d5262d 100644 --- a/cpp/src/Ice/OutgoingAsync.cpp +++ b/cpp/src/Ice/OutgoingAsync.cpp @@ -17,6 +17,9 @@ #include <Ice/ImplicitContextI.h> #include <Ice/ThreadPool.h> #include <Ice/RetryQueue.h> +#include <Ice/ConnectionFactory.h> +#include <Ice/ObjectAdapterFactory.h> +#include <Ice/LoggerUtil.h> using namespace std; using namespace Ice; @@ -29,78 +32,542 @@ IceUtil::Shared* IceInternal::upCast(OutgoingAsync* p) { return p; } IceUtil::Shared* IceInternal::upCast(CommunicatorFlushBatchAsync* p) { return p; } #endif +const unsigned char OutgoingAsyncBase::OK = 0x1; +const unsigned char OutgoingAsyncBase::Sent = 0x2; +#ifndef ICE_CPP11_MAPPING +const unsigned char OutgoingAsyncBase::Done = 0x4; +const unsigned char OutgoingAsyncBase::EndCalled = 0x8; +#endif + bool OutgoingAsyncBase::sent() { - return sent(true); + return sentImpl(true); } bool -OutgoingAsyncBase::completed(const Exception& ex) +OutgoingAsyncBase::exception(const Exception& ex) { - return finished(ex); + return exceptionImpl(ex); } bool -OutgoingAsyncBase::completed() +OutgoingAsyncBase::response() { assert(false); // Must be overriden by request that can handle responses return false; } -InputStream* -OutgoingAsyncBase::getIs() +void +OutgoingAsyncBase::invokeSentAsync() +{ + class AsynchronousSent : public DispatchWorkItem + { + public: + + AsynchronousSent(const ConnectionPtr& connection, const OutgoingAsyncBasePtr& outAsync) : + DispatchWorkItem(connection), _outAsync(outAsync) + { + } + + virtual void + run() + { + _outAsync->invokeSent(); + } + + private: + + const OutgoingAsyncBasePtr _outAsync; + }; + + // + // This is called when it's not safe to call the sent callback + // synchronously from this thread. Instead the exception callback + // is called asynchronously from the client thread pool. + // + try + { + _instance->clientThreadPool()->dispatch(new AsynchronousSent(_cachedConnection, shared_from_this())); + } + catch(const Ice::CommunicatorDestroyedException&) + { + } +} + +void +OutgoingAsyncBase::invokeExceptionAsync() +{ + class AsynchronousException : public DispatchWorkItem + { + public: + + AsynchronousException(const ConnectionPtr& c, const OutgoingAsyncBasePtr& outAsync) : + DispatchWorkItem(c), _outAsync(outAsync) + { + } + + virtual void + run() + { + _outAsync->invokeException(); + } + + private: + + const OutgoingAsyncBasePtr _outAsync; + }; + + // + // CommunicatorDestroyedCompleted is the only exception that can propagate directly + // from this method. + // + _instance->clientThreadPool()->dispatch(new AsynchronousException(_cachedConnection, shared_from_this())); +} + +void +OutgoingAsyncBase::invokeResponseAsync() { - return 0; // Must be overriden by request that can handle responses + class AsynchronousResponse : public DispatchWorkItem + { + public: + + AsynchronousResponse(const ConnectionPtr& connection, const OutgoingAsyncBasePtr& outAsync) : + DispatchWorkItem(connection), _outAsync(outAsync) + { + } + + virtual void + run() + { + _outAsync->invokeResponse(); + } + + private: + + const OutgoingAsyncBasePtr _outAsync; + }; + + // + // CommunicatorDestroyedCompleted is the only exception that can propagate directly + // from this method. + // + _instance->clientThreadPool()->dispatch(new AsynchronousResponse(_cachedConnection, shared_from_this())); } +void +OutgoingAsyncBase::invokeSent() +{ + try + { + handleInvokeSent(_sentSynchronously, this); + } + catch(const std::exception& ex) + { + warning(ex); + } + catch(...) + { + warning(); + } + + if(_observer && _doneInSent) + { + _observer.detach(); + } +} + +void +OutgoingAsyncBase::invokeException() +{ + try + { + try + { + ICE_RETHROW_EXCEPTION(_ex); + } + catch(const Ice::Exception& ex) + { + handleInvokeException(ex, this); + } + } + catch(const std::exception& ex) + { + warning(ex); + } + catch(...) + { + warning(); + } + + _observer.detach(); +} + +void +OutgoingAsyncBase::invokeResponse() +{ + if(ICE_EXCEPTION_ISSET(_ex)) + { + invokeException(); + return; + } + + try + { #ifdef ICE_CPP11_MAPPING -OutgoingAsyncBase::OutgoingAsyncBase(const CommunicatorPtr& communicator, - const InstancePtr& instance, - const string& operation, - const CallbackBasePtr& delegate) : - AsyncResult(communicator, instance, operation, delegate), + try + { + handleInvokeResponse(_state & OK, this); + } + catch(const Ice::Exception& ex) + { + if(handleException(ex)) + { + handleInvokeException(ex, this); + } + } + catch(const exception_ptr& ex) + { + rethrow_exception(ex); + } #else -OutgoingAsyncBase::OutgoingAsyncBase(const CommunicatorPtr& communicator, - const InstancePtr& instance, - const string& operation, - const CallbackBasePtr& delegate, - const LocalObjectPtr& cookie) : - AsyncResult(communicator, instance, operation, delegate, cookie), + handleInvokeResponse(_state & OK, this); #endif - _os(instance.get(), Ice::currentProtocolEncoding) + } + catch(const std::exception& ex) + { + warning(ex); + } + catch(...) + { + warning(); + } + + _observer.detach(); +} + +void +OutgoingAsyncBase::cancelable(const CancellationHandlerPtr& handler) +{ + Lock sync(_m); + if(ICE_EXCEPTION_ISSET(_cancellationException)) + { + try + { + ICE_RETHROW_EXCEPTION(_cancellationException); + } + catch(const Ice::LocalException&) + { + ICE_RESET_EXCEPTION(_cancellationException, ICE_NULLPTR); + throw; + } + } + _cancellationHandler = handler; +} + +void +OutgoingAsyncBase::cancel() +{ + cancel(Ice::InvocationCanceledException(__FILE__, __LINE__)); +} + +OutgoingAsyncBase::OutgoingAsyncBase(const InstancePtr& instance) : + _instance(instance), + _sentSynchronously(false), + _doneInSent(false), + _state(0), + _os(instance.get(), Ice::currentProtocolEncoding), + _is(instance.get(), Ice::currentProtocolEncoding) { } bool -OutgoingAsyncBase::sent(bool done) +OutgoingAsyncBase::sentImpl(bool done) { + Lock sync(_m); + bool alreadySent = _state & Sent; + _state |= Sent; if(done) { + _doneInSent = true; _childObserver.detach(); + _cancellationHandler = 0; + } + +#ifndef ICE_CPP11_MAPPING + if(done) + { + _state |= Done | OK; + } + _m.notifyAll(); +#endif + + bool invoke = handleSent(done, alreadySent); + if(!invoke && _doneInSent) + { + _observer.detach(); } - return AsyncResult::sent(done); + return invoke; } bool -OutgoingAsyncBase::finished(const Exception& ex) +OutgoingAsyncBase::exceptionImpl(const Exception& ex) { + Lock sync(_m); + ICE_RESET_EXCEPTION(_ex, ex.ice_clone()); if(_childObserver) { _childObserver.failed(ex.ice_id()); _childObserver.detach(); } - return AsyncResult::finished(ex); + _cancellationHandler = 0; + _observer.failed(ex.ice_id()); + +#ifndef ICE_CPP11_MAPPING + _state |= Done; + _m.notifyAll(); +#endif + + bool invoke = handleException(ex); + if(!invoke) + { + _observer.detach(); + } + return invoke; } -Ice::ObjectPrxPtr -ProxyOutgoingAsyncBase::getProxy() const +bool +OutgoingAsyncBase::responseImpl(bool ok) { - return _proxy; + Lock sync(_m); + if(ok) + { + _state |= OK; + } + + _cancellationHandler = 0; + +#ifndef ICE_CPP11_MAPPING + _state |= Done; + _m.notifyAll(); +#endif + + bool invoke; + try + { + invoke = handleResponse(ok); + } + catch(const Ice::Exception& ex) + { + ICE_RESET_EXCEPTION(_ex, ex.ice_clone()); + invoke = handleException(ex); + } + if(!invoke) + { + _observer.detach(); + } + return invoke; +} + +void +OutgoingAsyncBase::cancel(const Ice::LocalException& ex) +{ + CancellationHandlerPtr handler; + { + Lock sync(_m); + ICE_RESET_EXCEPTION(_cancellationException, ex.ice_clone()); + if(!_cancellationHandler) + { + return; + } + handler = _cancellationHandler; + } + handler->asyncRequestCanceled(shared_from_this(), ex); +} + +#ifndef ICE_CPP11_MAPPING + +Int +OutgoingAsyncBase::getHash() const +{ + return static_cast<Int>(reinterpret_cast<Long>(this) >> 4); +} + +CommunicatorPtr +OutgoingAsyncBase::getCommunicator() const +{ + return 0; +} + +ConnectionPtr +OutgoingAsyncBase::getConnection() const +{ + return 0; +} + +ObjectPrxPtr +OutgoingAsyncBase::getProxy() const +{ + return 0; +} + +Ice::LocalObjectPtr +OutgoingAsyncBase::getCookie() const +{ + return _cookie; +} + +const std::string& +OutgoingAsyncBase::getOperation() const +{ + assert(false); // Must be overriden + static string empty; + return empty; +} + +bool +OutgoingAsyncBase::isCompleted() const +{ + Lock sync(_m); + return _state & Done; +} + +void +OutgoingAsyncBase::waitForCompleted() +{ + Lock sync(_m); + while(!(_state & Done)) + { + _m.wait(); + } } bool -ProxyOutgoingAsyncBase::completed(const Exception& exc) +OutgoingAsyncBase::isSent() const +{ + Lock sync(_m); + return _state & Sent; +} + +void +OutgoingAsyncBase::waitForSent() +{ + Lock sync(_m); + while(!(_state & Sent) && !_ex.get()) + { + _m.wait(); + } +} + +bool +OutgoingAsyncBase::sentSynchronously() const +{ + return _sentSynchronously; +} + +void +OutgoingAsyncBase::throwLocalException() const +{ + Lock sync(_m); + if(_ex.get()) + { + _ex->ice_throw(); + } +} + +bool +OutgoingAsyncBase::__wait() +{ + Lock sync(_m); + if(_state & EndCalled) + { + throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "end_ method called more than once"); + } + _state |= EndCalled; + while(!(_state & Done)) + { + _m.wait(); + } + + if(_ex.get()) + { + _ex->ice_throw(); + } + return _state & OK; +} + +Ice::InputStream* +OutgoingAsyncBase::__startReadParams() +{ + _is.startEncapsulation(); + return &_is; +} + +void +OutgoingAsyncBase::__endReadParams() +{ + _is.endEncapsulation(); +} + +void +OutgoingAsyncBase::__readEmptyParams() +{ + _is.skipEmptyEncapsulation(); +} + +void +OutgoingAsyncBase::__readParamEncaps(const ::Ice::Byte*& encaps, ::Ice::Int& sz) +{ + _is.readEncapsulation(encaps, sz); +} + +void +OutgoingAsyncBase::__throwUserException() +{ + try + { + _is.startEncapsulation(); + _is.throwException(); + } + catch(const Ice::UserException&) + { + _is.endEncapsulation(); + throw; + } +} + +#endif + +void +OutgoingAsyncBase::warning(const std::exception& exc) const +{ + if(_instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0) + { + Ice::Warning out(_instance->initializationData().logger); + const Ice::Exception* ex = dynamic_cast<const Ice::Exception*>(&exc); + if(ex) + { + out << "Ice::Exception raised by AMI callback:\n" << *ex; + } + else + { + out << "std::exception raised by AMI callback:\n" << exc.what(); + } + } +} + +void +OutgoingAsyncBase::warning() const +{ + if(_instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0) + { + Ice::Warning out(_instance->initializationData().logger); + out << "unknown exception raised by AMI callback"; + } +} + +bool +ProxyOutgoingAsyncBase::exception(const Exception& exc) { if(_childObserver) { @@ -111,11 +578,7 @@ ProxyOutgoingAsyncBase::completed(const Exception& exc) _cachedConnection = 0; if(_proxy->__reference()->getInvocationTimeout() == -2) { -#ifdef ICE_CPP11_MAPPING - _instance->timer()->cancel(dynamic_pointer_cast<TimerTask>(shared_from_this())); -#else - _instance->timer()->cancel(this); -#endif + _instance->timer()->cancel(shared_from_this()); } // @@ -129,18 +592,27 @@ ProxyOutgoingAsyncBase::completed(const Exception& exc) // the retry interval is 0. This method can be called with the // connection locked so we can't just retry here. // -#ifdef ICE_CPP11_MAPPING - _instance->retryQueue()->add(dynamic_pointer_cast<ProxyOutgoingAsyncBase>(shared_from_this()), - handleException(exc)); -#else - _instance->retryQueue()->add(this, handleException(exc)); -#endif + _instance->retryQueue()->add(shared_from_this(), _proxy->__handleException(exc, _handler, _mode, _sent, _cnt)); return false; } catch(const Exception& ex) { - return finished(ex); // No retries, we're done + return exceptionImpl(ex); // No retries, we're done + } +} + +void +ProxyOutgoingAsyncBase::cancelable(const CancellationHandlerPtr& handler) +{ + if(_proxy->__reference()->getInvocationTimeout() == -2 && _cachedConnection) + { + const int timeout = _cachedConnection->timeout(); + if(timeout > 0) + { + _instance->timer()->schedule(shared_from_this(), IceUtil::Time::milliSeconds(timeout)); + } } + OutgoingAsyncBase::cancelable(handler); } void @@ -155,41 +627,18 @@ ProxyOutgoingAsyncBase::retryException(const Exception& ex) // connection to be done. // _proxy->__updateRequestHandler(_handler, 0); // Clear request handler and always retry. -#ifdef ICE_CPP11_MAPPING - _instance->retryQueue()->add(dynamic_pointer_cast<ProxyOutgoingAsyncBase>(shared_from_this()), 0); -#else - _instance->retryQueue()->add(this, 0); -#endif + _instance->retryQueue()->add(shared_from_this(), 0); } catch(const Ice::Exception& exc) { - if(completed(exc)) + if(exception(exc)) { - invokeCompletedAsync(); + invokeExceptionAsync(); } } } void -ProxyOutgoingAsyncBase::cancelable(const CancellationHandlerPtr& handler) -{ - if(_proxy->__reference()->getInvocationTimeout() == -2 && _cachedConnection) - { - const int timeout = _cachedConnection->timeout(); - if(timeout > 0) - { -#ifdef ICE_CPP11_MAPPING - _instance->timer()->schedule(dynamic_pointer_cast<TimerTask>(shared_from_this()), - IceUtil::Time::milliSeconds(timeout)); -#else - _instance->timer()->schedule(this, IceUtil::Time::milliSeconds(timeout)); -#endif - } - } - AsyncResult::cancelable(handler); -} - -void ProxyOutgoingAsyncBase::retry() { invokeImpl(false); @@ -200,9 +649,9 @@ ProxyOutgoingAsyncBase::abort(const Ice::Exception& ex) { assert(!_childObserver); - if(finished(ex)) + if(exceptionImpl(ex)) { - invokeCompletedAsync(); + invokeExceptionAsync(); } else if(dynamic_cast<const Ice::CommunicatorDestroyedException*>(&ex)) { @@ -215,18 +664,22 @@ ProxyOutgoingAsyncBase::abort(const Ice::Exception& ex) } } -#ifdef ICE_CPP11_MAPPING -ProxyOutgoingAsyncBase::ProxyOutgoingAsyncBase(const ObjectPrxPtr& prx, - const string& operation, - const CallbackBasePtr& delegate) : - OutgoingAsyncBase(prx->ice_getCommunicator(), prx->__reference()->getInstance(), operation, delegate), -#else -ProxyOutgoingAsyncBase::ProxyOutgoingAsyncBase(const ObjectPrxPtr& prx, - const string& operation, - const CallbackBasePtr& delegate, - const LocalObjectPtr& cookie) : - OutgoingAsyncBase(prx->ice_getCommunicator(), prx->__reference()->getInstance(), operation, delegate, cookie), +#ifndef ICE_CPP11_MAPPING +Ice::ObjectPrx +ProxyOutgoingAsyncBase::getProxy() const +{ + return _proxy; +} + +Ice::CommunicatorPtr +ProxyOutgoingAsyncBase::getCommunicator() const +{ + return _proxy->ice_getCommunicator(); +} #endif + +ProxyOutgoingAsyncBase::ProxyOutgoingAsyncBase(const ObjectPrxPtr& prx) : + OutgoingAsyncBase(prx->__reference()->getInstance()), _proxy(prx), _mode(ICE_ENUM(OperationMode, Normal)), _cnt(0), @@ -234,6 +687,10 @@ ProxyOutgoingAsyncBase::ProxyOutgoingAsyncBase(const ObjectPrxPtr& prx, { } +ProxyOutgoingAsyncBase::~ProxyOutgoingAsyncBase() +{ +} + void ProxyOutgoingAsyncBase::invokeImpl(bool userThread) { @@ -244,12 +701,7 @@ ProxyOutgoingAsyncBase::invokeImpl(bool userThread) int invocationTimeout = _proxy->__reference()->getInvocationTimeout(); if(invocationTimeout > 0) { -#ifdef ICE_CPP11_MAPPING - _instance->timer()->schedule(dynamic_pointer_cast<TimerTask>(shared_from_this()), - IceUtil::Time::milliSeconds(invocationTimeout)); -#else - _instance->timer()->schedule(this, IceUtil::Time::milliSeconds(invocationTimeout)); -#endif + _instance->timer()->schedule(shared_from_this(), IceUtil::Time::milliSeconds(invocationTimeout)); } } else @@ -263,12 +715,7 @@ ProxyOutgoingAsyncBase::invokeImpl(bool userThread) { _sent = false; _handler = _proxy->__getRequestHandler(); -#ifdef ICE_CPP11_MAPPING - AsyncStatus status = _handler->sendAsyncRequest( - dynamic_pointer_cast<ProxyOutgoingAsyncBase>(shared_from_this())); -#else - AsyncStatus status = _handler->sendAsyncRequest(this); -#endif + AsyncStatus status = _handler->sendAsyncRequest(shared_from_this()); if(status & AsyncStatusSent) { if(userThread) @@ -300,15 +747,10 @@ ProxyOutgoingAsyncBase::invokeImpl(bool userThread) _childObserver.failed(ex.ice_id()); _childObserver.detach(); } - int interval = handleException(ex); + int interval = _proxy->__handleException(ex, _handler, _mode, _sent, _cnt); if(interval > 0) { -#ifdef ICE_CPP11_MAPPING - _instance->retryQueue()->add(dynamic_pointer_cast<ProxyOutgoingAsyncBase>(shared_from_this()), - interval); -#else - _instance->retryQueue()->add(this, interval); -#endif + _instance->retryQueue()->add(shared_from_this(), interval); return; } else @@ -328,63 +770,45 @@ ProxyOutgoingAsyncBase::invokeImpl(bool userThread) { throw; } - else if(finished(ex)) // No retries, we're done + else if(exceptionImpl(ex)) // No retries, we're done { - invokeCompletedAsync(); + invokeExceptionAsync(); } } } bool -ProxyOutgoingAsyncBase::sent(bool done) +ProxyOutgoingAsyncBase::sentImpl(bool done) { _sent = true; if(done) { if(_proxy->__reference()->getInvocationTimeout() != -1) { -#ifdef ICE_CPP11_MAPPING - _instance->timer()->cancel(dynamic_pointer_cast<TimerTask>(shared_from_this())); -#else - _instance->timer()->cancel(this); -#endif + _instance->timer()->cancel(shared_from_this()); } } - return OutgoingAsyncBase::sent(done); + return OutgoingAsyncBase::sentImpl(done); } bool -ProxyOutgoingAsyncBase::finished(const Exception& ex) +ProxyOutgoingAsyncBase::exceptionImpl(const Exception& ex) { if(_proxy->__reference()->getInvocationTimeout() != -1) { -#ifdef ICE_CPP11_MAPPING - _instance->timer()->cancel(dynamic_pointer_cast<TimerTask>(shared_from_this())); -#else - _instance->timer()->cancel(this); -#endif + _instance->timer()->cancel(shared_from_this()); } - return OutgoingAsyncBase::finished(ex); + return OutgoingAsyncBase::exceptionImpl(ex); } bool -ProxyOutgoingAsyncBase::finished(bool ok) +ProxyOutgoingAsyncBase::responseImpl(bool ok) { if(_proxy->__reference()->getInvocationTimeout() != -1) { -#ifdef ICE_CPP11_MAPPING - _instance->timer()->cancel(dynamic_pointer_cast<TimerTask>(shared_from_this())); -#else - _instance->timer()->cancel(this); -#endif + _instance->timer()->cancel(shared_from_this()); } - return AsyncResult::finished(ok); -} - -int -ProxyOutgoingAsyncBase::handleException(const Exception& exc) -{ - return _proxy->__handleException(exc, _handler, _mode, _sent, _cnt); + return OutgoingAsyncBase::responseImpl(ok); } void @@ -400,19 +824,10 @@ ProxyOutgoingAsyncBase::runTimerTask() } } -#ifdef ICE_CPP11_MAPPING -OutgoingAsync::OutgoingAsync(const ObjectPrxPtr& prx, - const string& operation, - const CallbackBasePtr& delegate) : - ProxyOutgoingAsyncBase(prx, operation, delegate), -#else -OutgoingAsync::OutgoingAsync(const ObjectPrxPtr& prx, - const string& operation, - const CallbackBasePtr& delegate, - const LocalObjectPtr& cookie) : - ProxyOutgoingAsyncBase(prx, operation, delegate, cookie), -#endif - _encoding(getCompatibleEncoding(prx->__reference()->getEncoding())) +OutgoingAsync::OutgoingAsync(const ObjectPrxPtr& prx) : + ProxyOutgoingAsyncBase(prx), + _encoding(getCompatibleEncoding(prx->__reference()->getEncoding())), + _synchronous(false) { } @@ -491,65 +906,11 @@ OutgoingAsync::prepare(const string& operation, OperationMode mode, const Contex bool OutgoingAsync::sent() { - return ProxyOutgoingAsyncBase::sent(!_proxy->ice_isTwoway()); // done = true if it's not a two-way proxy -} - -AsyncStatus -OutgoingAsync::invokeRemote(const ConnectionIPtr& connection, bool compress, bool response) -{ - _cachedConnection = connection; -#ifdef ICE_CPP11_MAPPING - return connection->sendAsyncRequest(dynamic_pointer_cast<OutgoingAsyncBase>(shared_from_this()), compress, response, 0); -#else - return connection->sendAsyncRequest(this, compress, response, 0); -#endif -} - -AsyncStatus -OutgoingAsync::invokeCollocated(CollocatedRequestHandler* handler) -{ - return handler->invokeAsyncRequest(this, 0); -} - -void -OutgoingAsync::abort(const Exception& ex) -{ - const Reference::Mode mode = _proxy->__reference()->getMode(); - if(mode == Reference::ModeBatchOneway || mode == Reference::ModeBatchDatagram) - { - // - // If we didn't finish a batch oneway or datagram request, we - // must notify the connection about that we give up ownership - // of the batch stream. - // - _proxy->__getBatchRequestQueue()->abortBatchRequest(&_os); - } - - ProxyOutgoingAsyncBase::abort(ex); -} - -void -OutgoingAsync::invoke() -{ - const Reference::Mode mode = _proxy->__reference()->getMode(); - if(mode == Reference::ModeBatchOneway || mode == Reference::ModeBatchDatagram) - { - _sentSynchronously = true; - _proxy->__getBatchRequestQueue()->finishBatchRequest(&_os, _proxy, getOperation()); - finished(true); - return; // Don't call sent/completed callback for batch AMI requests - } - - // - // NOTE: invokeImpl doesn't throw so this can be called from the - // try block with the catch block calling abort() in case of an - // exception. - // - invokeImpl(true); // userThread = true + return ProxyOutgoingAsyncBase::sentImpl(!_proxy->ice_isTwoway()); // done = true if it's not a two-way proxy } bool -OutgoingAsync::completed() +OutgoingAsync::response() { // // NOTE: this method is called from ConnectionI.parseMessage @@ -685,29 +1046,116 @@ OutgoingAsync::completed() } } - return finished(replyStatus == replyOK); + return responseImpl(replyStatus == replyOK); } catch(const Exception& ex) { - return completed(ex); + return exception(ex); } } +AsyncStatus +OutgoingAsync::invokeRemote(const ConnectionIPtr& connection, bool compress, bool response) +{ + _cachedConnection = connection; + return connection->sendAsyncRequest(shared_from_this(), compress, response, 0); +} + +AsyncStatus +OutgoingAsync::invokeCollocated(CollocatedRequestHandler* handler) +{ + return handler->invokeAsyncRequest(this, 0, _synchronous); +} + +void +OutgoingAsync::abort(const Exception& ex) +{ + const Reference::Mode mode = _proxy->__reference()->getMode(); + if(mode == Reference::ModeBatchOneway || mode == Reference::ModeBatchDatagram) + { + // + // If we didn't finish a batch oneway or datagram request, we + // must notify the connection about that we give up ownership + // of the batch stream. + // + _proxy->__getBatchRequestQueue()->abortBatchRequest(&_os); + } + + ProxyOutgoingAsyncBase::abort(ex); +} + +void +OutgoingAsync::invoke(const string& operation) +{ + const Reference::Mode mode = _proxy->__reference()->getMode(); + if(mode == Reference::ModeBatchOneway || mode == Reference::ModeBatchDatagram) + { + _sentSynchronously = true; + _proxy->__getBatchRequestQueue()->finishBatchRequest(&_os, _proxy, operation); + responseImpl(true); + return; // Don't call sent/completed callback for batch AMI requests + } + + // + // NOTE: invokeImpl doesn't throw so this can be called from the + // try block with the catch block calling abort() in case of an + // exception. + // + invokeImpl(true); // userThread = true +} + #ifdef ICE_CPP11_MAPPING -ProxyFlushBatchAsync::ProxyFlushBatchAsync(const ObjectPrxPtr& proxy, - const string& operation, - const CallbackBasePtr& delegate) : - ProxyOutgoingAsyncBase(proxy, operation, delegate) -#else -ProxyFlushBatchAsync::ProxyFlushBatchAsync(const ObjectPrxPtr& proxy, - const string& operation, - const CallbackBasePtr& delegate, - const LocalObjectPtr& cookie) : - ProxyOutgoingAsyncBase(proxy, operation, delegate, cookie) +void +OutgoingAsync::invoke(const string& operation, + Ice::OperationMode mode, + Ice::FormatType format, + const Ice::Context& context, + const function<void (Ice::OutputStream*)>& write) +{ + try + { + prepare(operation, mode, context); + if(write) + { + _os.startEncapsulation(_encoding, format); + write(&_os); + _os.endEncapsulation(); + } + else + { + _os.writeEmptyEncapsulation(_encoding); + } + invoke(operation); + } + catch(const Ice::Exception& ex) + { + abort(ex); + } +} + +void +OutgoingAsync::throwUserException() +{ + try + { + _is.startEncapsulation(); + _is.throwException(); + } + catch(const UserException& ex) + { + _is.endEncapsulation(); + if(_userException) + { + _userException(ex); + } + throw UnknownUserException(__FILE__, __LINE__, ex.ice_id()); + } +} + #endif + +ProxyFlushBatchAsync::ProxyFlushBatchAsync(const ObjectPrxPtr& proxy) : ProxyOutgoingAsyncBase(proxy) { - _observer.attach(proxy, operation, ::Ice::noExplicitContext); - _batchRequestNum = proxy->__getBatchRequestQueue()->swap(&_os); } AsyncStatus @@ -725,12 +1173,7 @@ ProxyFlushBatchAsync::invokeRemote(const ConnectionIPtr& connection, bool compre } } _cachedConnection = connection; -#ifdef ICE_CPP11_MAPPING - return connection->sendAsyncRequest(dynamic_pointer_cast<OutgoingAsyncBase>(shared_from_this()), - compress, false, _batchRequestNum); -#else - return connection->sendAsyncRequest(this, compress, false, _batchRequestNum); -#endif + return connection->sendAsyncRequest(shared_from_this(), compress, false, _batchRequestNum); } AsyncStatus @@ -747,39 +1190,29 @@ ProxyFlushBatchAsync::invokeCollocated(CollocatedRequestHandler* handler) return AsyncStatusSent; } } - return handler->invokeAsyncRequest(this, _batchRequestNum); + return handler->invokeAsyncRequest(this, _batchRequestNum, false); } void -ProxyFlushBatchAsync::invoke() +ProxyFlushBatchAsync::invoke(const string& operation) { checkSupportedProtocol(getCompatibleProtocol(_proxy->__reference()->getProtocol())); + _observer.attach(_proxy, operation, ::Ice::noExplicitContext); + _batchRequestNum = _proxy->__getBatchRequestQueue()->swap(&_os); invokeImpl(true); // userThread = true } -#ifdef ICE_CPP11_MAPPING -ProxyGetConnection::ProxyGetConnection(const ObjectPrxPtr& prx, - const string& operation, - const CallbackBasePtr& delegate) : - ProxyOutgoingAsyncBase(prx, operation, delegate) -#else -ProxyGetConnection::ProxyGetConnection(const ObjectPrxPtr& prx, - const string& operation, - const CallbackBasePtr& delegate, - const LocalObjectPtr& cookie) : - ProxyOutgoingAsyncBase(prx, operation, delegate, cookie) -#endif +ProxyGetConnection::ProxyGetConnection(const ObjectPrxPtr& prx) : ProxyOutgoingAsyncBase(prx) { - _observer.attach(prx, operation, ::Ice::noExplicitContext); } AsyncStatus ProxyGetConnection::invokeRemote(const ConnectionIPtr& connection, bool, bool) { _cachedConnection = connection; - if(finished(true)) + if(responseImpl(true)) { - invokeCompletedAsync(); + invokeResponseAsync(); } return AsyncStatusSent; } @@ -787,38 +1220,23 @@ ProxyGetConnection::invokeRemote(const ConnectionIPtr& connection, bool, bool) AsyncStatus ProxyGetConnection::invokeCollocated(CollocatedRequestHandler*) { - if(finished(true)) + if(responseImpl(true)) { - invokeCompletedAsync(); + invokeResponseAsync(); } return AsyncStatusSent; } void -ProxyGetConnection::invoke() +ProxyGetConnection::invoke(const string& operation) { + _observer.attach(_proxy, operation, ::Ice::noExplicitContext); invokeImpl(true); // userThread = true } -#ifdef ICE_CPP11_MAPPING -ConnectionFlushBatchAsync::ConnectionFlushBatchAsync(const ConnectionIPtr& connection, - const CommunicatorPtr& communicator, - const InstancePtr& instance, - const string& operation, - const CallbackBasePtr& delegate) : - OutgoingAsyncBase(communicator, instance, operation, delegate), -#else -ConnectionFlushBatchAsync::ConnectionFlushBatchAsync(const ConnectionIPtr& connection, - const CommunicatorPtr& communicator, - const InstancePtr& instance, - const string& operation, - const CallbackBasePtr& delegate, - const LocalObjectPtr& cookie) : - OutgoingAsyncBase(communicator, instance, operation, delegate, cookie), -#endif - _connection(connection) +ConnectionFlushBatchAsync::ConnectionFlushBatchAsync(const ConnectionIPtr& connection, const InstancePtr& instance) : + OutgoingAsyncBase(instance), _connection(connection) { - _observer.attach(instance.get(), operation); } ConnectionPtr @@ -828,8 +1246,9 @@ ConnectionFlushBatchAsync::getConnection() const } void -ConnectionFlushBatchAsync::invoke() +ConnectionFlushBatchAsync::invoke(const string& operation) { + _observer.attach(_instance.get(), operation); try { AsyncStatus status; @@ -844,12 +1263,7 @@ ConnectionFlushBatchAsync::invoke() } else { -#ifdef ICE_CPP11_MAPPING - status = _connection->sendAsyncRequest( - dynamic_pointer_cast<OutgoingAsyncBase>(shared_from_this()), false, false, batchRequestNum); -#else - status = _connection->sendAsyncRequest(this, false, false, batchRequestNum); -#endif + status = _connection->sendAsyncRequest(shared_from_this(), false, false, batchRequestNum); } if(status & AsyncStatusSent) @@ -870,44 +1284,30 @@ ConnectionFlushBatchAsync::invoke() } catch(const Ice::LocalException& ee) { - if(completed(ee)) + if(exception(ee)) { - invokeCompletedAsync(); + invokeExceptionAsync(); } } #else - if(completed(*ex.get())) + if(exception(*ex.get())) { - invokeCompletedAsync(); + invokeExceptionAsync(); } #endif } catch(const Exception& ex) { - if(completed(ex)) + if(exception(ex)) { - invokeCompletedAsync(); + invokeExceptionAsync(); } } } -#ifdef ICE_CPP11_MAPPING -CommunicatorFlushBatchAsync::CommunicatorFlushBatchAsync(const CommunicatorPtr& communicator, - const InstancePtr& instance, - const string& operation, - const CallbackBasePtr& cb) : - AsyncResult(communicator, instance, operation, cb) -#else -CommunicatorFlushBatchAsync::CommunicatorFlushBatchAsync(const CommunicatorPtr& communicator, - const InstancePtr& instance, - const string& operation, - const CallbackBasePtr& cb, - const LocalObjectPtr& cookie) : - AsyncResult(communicator, instance, operation, cb, cookie) -#endif +CommunicatorFlushBatchAsync::CommunicatorFlushBatchAsync(const InstancePtr& instance) : + OutgoingAsyncBase(instance) { - _observer.attach(instance.get(), operation); - // // _useCount is initialized to 1 to prevent premature callbacks. // The caller must invoke ready() after all flush requests have @@ -926,24 +1326,20 @@ CommunicatorFlushBatchAsync::flushConnection(const ConnectionIPtr& con) FlushBatch(const CommunicatorFlushBatchAsyncPtr& outAsync, const InstancePtr& instance, InvocationObserver& observer) : -#ifdef ICE_CPP11_MAPPING - OutgoingAsyncBase(outAsync->getCommunicator(), instance, outAsync->getOperation(), __dummyCallback), -#else - OutgoingAsyncBase(outAsync->getCommunicator(), instance, outAsync->getOperation(), __dummyCallback, 0), -#endif - _outAsync(outAsync), - _observer(observer) + OutgoingAsyncBase(instance), _outAsync(outAsync), _observer(observer) { } - virtual bool sent() + virtual bool + sent() { _childObserver.detach(); _outAsync->check(false); return false; } - virtual bool completed(const Exception& ex) + virtual bool + exception(const Exception& ex) { _childObserver.failed(ex.ice_id()); _childObserver.detach(); @@ -951,30 +1347,56 @@ CommunicatorFlushBatchAsync::flushConnection(const ConnectionIPtr& con) return false; } - private: - - virtual InvocationObserver& getObserver() + virtual InvocationObserver& + getObserver() { return _observer; } + virtual bool handleSent(bool, bool) + { + return false; + } + + virtual bool handleException(const Ice::Exception&) + { + return false; + } + + virtual bool handleResponse(bool) + { + return false; + } + + virtual void handleInvokeSent(bool, OutgoingAsyncBase*) const + { + assert(false); + } + + virtual void handleInvokeException(const Ice::Exception&, OutgoingAsyncBase*) const + { + assert(false); + } + + virtual void handleInvokeResponse(bool, OutgoingAsyncBase*) const + { + assert(false); + } + + private: + const CommunicatorFlushBatchAsyncPtr _outAsync; InvocationObserver& _observer; }; { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); + Lock sync(_m); ++_useCount; } try { -#ifdef ICE_CPP11_MAPPING - auto flushBatch = make_shared<FlushBatch>( - dynamic_pointer_cast<CommunicatorFlushBatchAsync>(shared_from_this()), _instance, _observer); -#else - OutgoingAsyncBasePtr flushBatch = new FlushBatch(this, _instance, _observer); -#endif + OutgoingAsyncBasePtr flushBatch = ICE_MAKE_SHARED(FlushBatch, shared_from_this(), _instance, _observer); int batchRequestNum = con->getBatchRequestQueue()->swap(flushBatch->getOs()); if(batchRequestNum == 0) { @@ -993,8 +1415,11 @@ CommunicatorFlushBatchAsync::flushConnection(const ConnectionIPtr& con) } void -CommunicatorFlushBatchAsync::ready() +CommunicatorFlushBatchAsync::invoke(const string& operation) { + _observer.attach(_instance.get(), operation); + _instance->outgoingConnectionFactory()->flushAsyncBatchRequests(shared_from_this()); + _instance->objectAdapterFactory()->flushAsyncBatchRequests(shared_from_this()); check(true); } @@ -1002,7 +1427,7 @@ void CommunicatorFlushBatchAsync::check(bool userThread) { { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); + Lock sync(_m); assert(_useCount > 0); if(--_useCount > 0) { @@ -1010,7 +1435,7 @@ CommunicatorFlushBatchAsync::check(bool userThread) } } - if(sent(true)) + if(sentImpl(true)) { if(userThread) { @@ -1025,246 +1450,124 @@ CommunicatorFlushBatchAsync::check(bool userThread) } #ifdef ICE_CPP11_MAPPING -OnewayClosureCallback::OnewayClosureCallback( - const string& name, - const shared_ptr<Ice::ObjectPrx>& proxy, - function<void ()> response, - function<void (exception_ptr)> exception, - function<void (bool)> sent) : - __name(name), - __proxy(proxy), - __response(response), - __exception(exception), - __sent(sent) + +bool +LambdaInvoke::handleSent(bool, bool alreadySent) { + return _sent != nullptr && !alreadySent; // Invoke the sent callback only if not already invoked. } -void -OnewayClosureCallback::sent(const AsyncResultPtr& __result) const +bool +LambdaInvoke::handleException(const Ice::Exception&) { - if(__sent) - { - __sent(__result->sentSynchronously()); - } + return _exception != nullptr; // Invoke the callback } bool -OnewayClosureCallback::hasSentCallback() const +LambdaInvoke::handleResponse(bool) { - return __sent != nullptr; + return _response != nullptr; } void -OnewayClosureCallback::completed(const AsyncResultPtr& __result) const +LambdaInvoke::handleInvokeSent(bool sentSynchronously, OutgoingAsyncBase*) const { - try - { - AsyncResult::__check(__result, __proxy.get(), __name); - bool ok = __result->__wait(); - if(__proxy->__reference()->getMode() == Reference::ModeTwoway) - { - if(!ok) - { - try - { - __result->__throwUserException(); - } - catch(const UserException& __ex) - { - throw UnknownUserException(__FILE__, __LINE__, __ex.ice_id()); - } - } - __result->__readEmptyParams(); - if(__response) - { - try - { - __response(); - } - catch(...) - { - throw current_exception(); - } - } - } - } - catch(const exception_ptr& ex) - { - rethrow_exception(ex); - } - catch(const Ice::Exception&) - { - if(__exception) - { - __exception(current_exception()); - } - } + _sent(sentSynchronously); } -function<void ()> -OnewayClosureCallback::invoke( - const string& __name, - const shared_ptr<Ice::ObjectPrx>& __proxy, - Ice::OperationMode __mode, - Ice::FormatType __format, - function<void (OutputStream*)> __marshal, - function<void ()> __response, - function<void (exception_ptr)> __exception, - function<void (bool)> __sent, - const Ice::Context& __context) +void +LambdaInvoke::handleInvokeException(const Ice::Exception& ex, OutgoingAsyncBase*) const { - auto __result = make_shared<OutgoingAsync>(__proxy, __name, - make_shared<OnewayClosureCallback>(__name, __proxy, move(__response), move(__exception), move(__sent))); try { - __result->prepare(__name, __mode, __context); - if(__marshal) - { - __marshal(__result->startWriteParams(__format)); - __result->endWriteParams(); - } - else - { - __result->writeEmptyParams(); - } - __result->invoke(); - } - catch(const exception_ptr& ex) - { - rethrow_exception(ex); + ex.ice_throw(); } - catch(const Exception& __ex) + catch(const Ice::Exception&) { - __result->abort(__ex); + _exception(current_exception()); } - - return [__result]() - { - __result->cancel(); - }; } -TwowayClosureCallback::TwowayClosureCallback( - const string& name, - const shared_ptr<Ice::ObjectPrx>& proxy, - bool readEmptyParams, - function<void (InputStream*)> read, - function<void (const UserException&)> userException, - function<void (exception_ptr)> exception, - function<void (bool)> sent) : - __name(name), - __proxy(proxy), - __readEmptyParams(readEmptyParams), - __read(move(read)), - __userException(move(userException)), - __exception(move(exception)), - __sent(move(sent)) +void +LambdaInvoke::handleInvokeResponse(bool ok, OutgoingAsyncBase*) const { + _response(ok); } -void -TwowayClosureCallback::sent(const AsyncResultPtr& result) const +#else // C++98 + +namespace +{ + +// +// Dummy class derived from CallbackBase +// We use this class for the __dummyCallback extern pointer in OutgoingAsync. In turn, +// this allows us to test whether the user supplied a null delegate instance to the +// generated begin_ method without having to generate a separate test to throw IllegalArgumentException +// in the inlined versions of the begin_ method. In other words, this reduces the amount of generated +// object code. +// +class DummyCallback : public CallbackBase { - if(__sent != nullptr) +public: + + DummyCallback() { - __sent(result->sentSynchronously()); } -} -bool -TwowayClosureCallback::hasSentCallback() const -{ - return __sent != nullptr; -} + virtual void + completed(const Ice::AsyncResultPtr&) const + { + assert(false); + } -void -TwowayClosureCallback::completed(const AsyncResultPtr& __result) const -{ - try + virtual CallbackBasePtr + verify(const Ice::LocalObjectPtr&) { - AsyncResult::__check(__result, __proxy.get(), __name); - if(!__result->__wait()) - { - try - { - __result->__throwUserException(); - } - catch(const Ice::UserException& __ex) - { - if(__userException) - { - __userException(__ex); - } - throw ::Ice::UnknownUserException(__FILE__, __LINE__, __ex.ice_id()); - } - } - else - { - if(__readEmptyParams) - { - __result->__readEmptyParams(); - __read(0); - } - else - { - __read(__result->__startReadParams()); - } - } + // + // Called by the AsyncResult constructor to verify the delegate. The dummy + // delegate is passed when the user used a begin_ method without delegate. + // By returning 0 here, we tell the AsyncResult that no delegates was + // provided. + // + return 0; } - catch(const exception_ptr& ex) + + virtual void + sent(const AsyncResultPtr&) const { - rethrow_exception(ex); + assert(false); } - catch(const Ice::Exception&) + + virtual bool + hasSentCallback() const { - if(__exception) - { - __exception(current_exception()); - } + assert(false); + return false; } +}; + } -function<void ()> -TwowayClosureCallback::invoke( - const string& __name, - const shared_ptr<Ice::ObjectPrx>& __proxy, - OperationMode __mode, - FormatType __format, - function<void (OutputStream*)> __write, - bool __readEmptyParams, - function<void (InputStream*)> __read, - function<void (const UserException&)> __userException, - function<void (exception_ptr)> __exception, - function<void (bool)> __sent, - const Context& __context) -{ - assert(__proxy); - auto __result = make_shared<OutgoingAsync>(__proxy, __name, - make_shared<TwowayClosureCallback>(__name, __proxy, __readEmptyParams, move(__read), - move(__userException), move(__exception), move(__sent))); - __proxy->__checkAsyncTwowayOnly(__name); - try +// +// This gives a pointer value to compare against in the generated +// begin_ method to decide whether the caller passed a null pointer +// versus the generated inline version of the begin_ method having +// passed a pointer to the dummy delegate. +// +CallbackBasePtr IceInternal::__dummyCallback = new DummyCallback; + +void +CallbackBase::checkCallback(bool obj, bool cb) +{ + if(!obj) { - __result->prepare(__name, __mode, __context); - if(__write) - { - __write(__result->startWriteParams(__format)); - __result->endWriteParams(); - } - else - { - __result->writeEmptyParams(); - } - __result->invoke(); + throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "callback object cannot be null"); } - catch(const Exception& __ex) + if(!cb) { - __result->abort(__ex); + throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "callback cannot be null"); } - - return [__result]() - { - __result->cancel(); - }; } + #endif |