diff options
Diffstat (limited to 'cpp/src/Ice/OutgoingAsync.cpp')
-rw-r--r-- | cpp/src/Ice/OutgoingAsync.cpp | 1080 |
1 files changed, 886 insertions, 194 deletions
diff --git a/cpp/src/Ice/OutgoingAsync.cpp b/cpp/src/Ice/OutgoingAsync.cpp index e7cc8c65540..d7ffc2f4d6d 100644 --- a/cpp/src/Ice/OutgoingAsync.cpp +++ b/cpp/src/Ice/OutgoingAsync.cpp @@ -17,91 +17,566 @@ #include <Ice/ImplicitContextI.h> #include <Ice/ThreadPool.h> #include <Ice/RetryQueue.h> +#include <Ice/ConnectionFactory.h> +#include <Ice/ObjectAdapterFactory.h> +#include <Ice/LoggerUtil.h> using namespace std; using namespace Ice; using namespace IceInternal; +#ifndef ICE_CPP11_MAPPING IceUtil::Shared* IceInternal::upCast(OutgoingAsyncBase* p) { return p; } IceUtil::Shared* IceInternal::upCast(ProxyOutgoingAsyncBase* p) { return p; } IceUtil::Shared* IceInternal::upCast(OutgoingAsync* p) { return p; } IceUtil::Shared* IceInternal::upCast(CommunicatorFlushBatchAsync* p) { return p; } +#endif + +const unsigned char OutgoingAsyncBase::OK = 0x1; +const unsigned char OutgoingAsyncBase::Sent = 0x2; +#ifndef ICE_CPP11_MAPPING +const unsigned char OutgoingAsyncBase::Done = 0x4; +const unsigned char OutgoingAsyncBase::EndCalled = 0x8; +#endif + +OutgoingAsyncCompletionCallback::~OutgoingAsyncCompletionCallback() +{ + // Out of line to avoid weak vtable +} bool OutgoingAsyncBase::sent() { - return sent(true); + return sentImpl(true); } bool -OutgoingAsyncBase::completed(const Exception& ex) +OutgoingAsyncBase::exception(const Exception& ex) { - return finished(ex); + return exceptionImpl(ex); } bool -OutgoingAsyncBase::completed() +OutgoingAsyncBase::response() { assert(false); // Must be overriden by request that can handle responses return false; } -BasicStream* -OutgoingAsyncBase::getIs() +void +OutgoingAsyncBase::invokeSentAsync() { - return 0; // Must be overriden by request that can handle responses + class AsynchronousSent : public DispatchWorkItem + { + public: + + AsynchronousSent(const ConnectionPtr& connection, const OutgoingAsyncBasePtr& outAsync) : + DispatchWorkItem(connection), _outAsync(outAsync) + { + } + + virtual void + run() + { + _outAsync->invokeSent(); + } + + private: + + const OutgoingAsyncBasePtr _outAsync; + }; + + // + // This is called when it's not safe to call the sent callback + // synchronously from this thread. Instead the exception callback + // is called asynchronously from the client thread pool. + // + try + { + _instance->clientThreadPool()->dispatch(new AsynchronousSent(_cachedConnection, ICE_SHARED_FROM_THIS)); + } + catch(const Ice::CommunicatorDestroyedException&) + { + } } -OutgoingAsyncBase::OutgoingAsyncBase(const CommunicatorPtr& communicator, - const InstancePtr& instance, - const string& operation, - const CallbackBasePtr& delegate, - const LocalObjectPtr& cookie) : - AsyncResult(communicator, instance, operation, delegate, cookie), - _os(instance.get(), Ice::currentProtocolEncoding) +void +OutgoingAsyncBase::invokeExceptionAsync() +{ + class AsynchronousException : public DispatchWorkItem + { + public: + + AsynchronousException(const ConnectionPtr& c, const OutgoingAsyncBasePtr& outAsync) : + DispatchWorkItem(c), _outAsync(outAsync) + { + } + + virtual void + run() + { + _outAsync->invokeException(); + } + + private: + + const OutgoingAsyncBasePtr _outAsync; + }; + + // + // CommunicatorDestroyedCompleted is the only exception that can propagate directly + // from this method. + // + _instance->clientThreadPool()->dispatch(new AsynchronousException(_cachedConnection, ICE_SHARED_FROM_THIS)); +} + +void +OutgoingAsyncBase::invokeResponseAsync() +{ + class AsynchronousResponse : public DispatchWorkItem + { + public: + + AsynchronousResponse(const ConnectionPtr& connection, const OutgoingAsyncBasePtr& outAsync) : + DispatchWorkItem(connection), _outAsync(outAsync) + { + } + + virtual void + run() + { + _outAsync->invokeResponse(); + } + + private: + + const OutgoingAsyncBasePtr _outAsync; + }; + + // + // CommunicatorDestroyedCompleted is the only exception that can propagate directly + // from this method. + // + _instance->clientThreadPool()->dispatch(new AsynchronousResponse(_cachedConnection, ICE_SHARED_FROM_THIS)); +} + +void +OutgoingAsyncBase::invokeSent() +{ + try + { + handleInvokeSent(_sentSynchronously, this); + } + catch(const std::exception& ex) + { + warning(ex); + } + catch(...) + { + warning(); + } + + if(_observer && _doneInSent) + { + _observer.detach(); + } +} + +void +OutgoingAsyncBase::invokeException() +{ + try + { + handleInvokeException(*_ex, this); + } + catch(const std::exception& ex) + { + warning(ex); + } + catch(...) + { + warning(); + } + + _observer.detach(); +} + +void +OutgoingAsyncBase::invokeResponse() +{ + if(_ex) + { + invokeException(); + return; + } + + try + { +#ifdef ICE_CPP11_MAPPING + try + { + handleInvokeResponse(_state & OK, this); + } + catch(const Ice::Exception& ex) + { + if(handleException(ex)) + { + handleInvokeException(ex, this); + } + } + catch(const exception_ptr& ex) + { + rethrow_exception(ex); + } +#else + handleInvokeResponse(_state & OK, this); +#endif + } + catch(const std::exception& ex) + { + warning(ex); + } + catch(...) + { + warning(); + } + + _observer.detach(); +} + +void +OutgoingAsyncBase::cancelable(const CancellationHandlerPtr& handler) +{ + Lock sync(_m); + if(_cancellationException) + { + try + { + _cancellationException->ice_throw(); + } + catch(const Ice::LocalException&) + { + _cancellationException.reset(); + throw; + } + } + _cancellationHandler = handler; +} + +void +OutgoingAsyncBase::cancel() +{ + cancel(Ice::InvocationCanceledException(__FILE__, __LINE__)); +} + +OutgoingAsyncBase::OutgoingAsyncBase(const InstancePtr& instance) : + _instance(instance), + _sentSynchronously(false), + _doneInSent(false), + _state(0), + _os(instance.get(), Ice::currentProtocolEncoding), + _is(instance.get(), Ice::currentProtocolEncoding) { } bool -OutgoingAsyncBase::sent(bool done) +OutgoingAsyncBase::sentImpl(bool done) { + Lock sync(_m); + bool alreadySent = (_state & Sent) > 0; + _state |= Sent; if(done) { + _doneInSent = true; _childObserver.detach(); + _cancellationHandler = 0; + } + +#ifndef ICE_CPP11_MAPPING + if(done) + { + _state |= Done | OK; } - return AsyncResult::sent(done); + _m.notifyAll(); +#endif + + bool invoke = handleSent(done, alreadySent); + if(!invoke && _doneInSent) + { + _observer.detach(); + } + return invoke; } bool -OutgoingAsyncBase::finished(const Exception& ex) +OutgoingAsyncBase::exceptionImpl(const Exception& ex) { + Lock sync(_m); + ICE_SET_EXCEPTION_FROM_CLONE(_ex, ex.ice_clone()); if(_childObserver) { - _childObserver.failed(ex.ice_name()); + _childObserver.failed(ex.ice_id()); _childObserver.detach(); } - return AsyncResult::finished(ex); + _cancellationHandler = 0; + _observer.failed(ex.ice_id()); + +#ifndef ICE_CPP11_MAPPING + _state |= Done; + _m.notifyAll(); +#endif + + bool invoke = handleException(ex); + if(!invoke) + { + _observer.detach(); + } + return invoke; } -Ice::ObjectPrx -ProxyOutgoingAsyncBase::getProxy() const +bool +OutgoingAsyncBase::responseImpl(bool ok) { - return _proxy; + Lock sync(_m); + if(ok) + { + _state |= OK; + } + + _cancellationHandler = 0; + +#ifndef ICE_CPP11_MAPPING + _state |= Done; + _m.notifyAll(); +#endif + + bool invoke; + try + { + invoke = handleResponse(ok); + } + catch(const Ice::Exception& ex) + { + ICE_SET_EXCEPTION_FROM_CLONE(_ex, ex.ice_clone()); + invoke = handleException(ex); + } + if(!invoke) + { + _observer.detach(); + } + return invoke; +} + +void +OutgoingAsyncBase::cancel(const Ice::LocalException& ex) +{ + CancellationHandlerPtr handler; + { + Lock sync(_m); + ICE_SET_EXCEPTION_FROM_CLONE(_cancellationException, ex.ice_clone()); + if(!_cancellationHandler) + { + return; + } + handler = _cancellationHandler; + } + handler->asyncRequestCanceled(ICE_SHARED_FROM_THIS, ex); +} + +#ifndef ICE_CPP11_MAPPING + +Int +OutgoingAsyncBase::getHash() const +{ + return static_cast<Int>(reinterpret_cast<Long>(this) >> 4); +} + +CommunicatorPtr +OutgoingAsyncBase::getCommunicator() const +{ + return 0; +} + +ConnectionPtr +OutgoingAsyncBase::getConnection() const +{ + return 0; +} + +ObjectPrxPtr +OutgoingAsyncBase::getProxy() const +{ + return 0; +} + +Ice::LocalObjectPtr +OutgoingAsyncBase::getCookie() const +{ + return _cookie; +} + +const std::string& +OutgoingAsyncBase::getOperation() const +{ + assert(false); // Must be overriden + static string empty; + return empty; +} + +bool +OutgoingAsyncBase::isCompleted() const +{ + Lock sync(_m); + return (_state & Done) > 0; +} + +void +OutgoingAsyncBase::waitForCompleted() +{ + Lock sync(_m); + while(!(_state & Done)) + { + _m.wait(); + } +} + +bool +OutgoingAsyncBase::isSent() const +{ + Lock sync(_m); + return (_state & Sent) > 0; +} + +void +OutgoingAsyncBase::waitForSent() +{ + Lock sync(_m); + while(!(_state & Sent) && !_ex.get()) + { + _m.wait(); + } } bool -ProxyOutgoingAsyncBase::completed(const Exception& exc) +OutgoingAsyncBase::sentSynchronously() const +{ + return _sentSynchronously; +} + +void +OutgoingAsyncBase::throwLocalException() const +{ + Lock sync(_m); + if(_ex.get()) + { + _ex->ice_throw(); + } +} + +bool +OutgoingAsyncBase::__wait() +{ + Lock sync(_m); + if(_state & EndCalled) + { + throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "end_ method called more than once"); + } + _state |= EndCalled; + while(!(_state & Done)) + { + _m.wait(); + } + + if(_ex.get()) + { + _ex->ice_throw(); + } + return _state & OK; +} + +Ice::InputStream* +OutgoingAsyncBase::__startReadParams() +{ + _is.startEncapsulation(); + return &_is; +} + +void +OutgoingAsyncBase::__endReadParams() +{ + _is.endEncapsulation(); +} + +void +OutgoingAsyncBase::__readEmptyParams() +{ + _is.skipEmptyEncapsulation(); +} + +void +OutgoingAsyncBase::__readParamEncaps(const ::Ice::Byte*& encaps, ::Ice::Int& sz) +{ + _is.readEncapsulation(encaps, sz); +} + +void +OutgoingAsyncBase::__throwUserException() +{ + try + { + _is.startEncapsulation(); + _is.throwException(); + } + catch(const Ice::UserException&) + { + _is.endEncapsulation(); + throw; + } +} + +#endif + +void +OutgoingAsyncBase::warning(const std::exception& exc) const +{ + if(_instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0) + { + Ice::Warning out(_instance->initializationData().logger); + const Ice::Exception* ex = dynamic_cast<const Ice::Exception*>(&exc); + if(ex) + { + out << "Ice::Exception raised by AMI callback:\n" << *ex; + } + else + { + out << "std::exception raised by AMI callback:\n" << exc.what(); + } + } +} + +void +OutgoingAsyncBase::warning() const +{ + if(_instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0) + { + Ice::Warning out(_instance->initializationData().logger); + out << "unknown exception raised by AMI callback"; + } +} + +bool +ProxyOutgoingAsyncBase::exception(const Exception& exc) { if(_childObserver) { - _childObserver.failed(exc.ice_name()); + _childObserver.failed(exc.ice_id()); _childObserver.detach(); } _cachedConnection = 0; if(_proxy->__reference()->getInvocationTimeout() == -2) { - _instance->timer()->cancel(this); + _instance->timer()->cancel(ICE_SHARED_FROM_THIS); } // @@ -115,16 +590,30 @@ ProxyOutgoingAsyncBase::completed(const Exception& exc) // the retry interval is 0. This method can be called with the // connection locked so we can't just retry here. // - _instance->retryQueue()->add(this, handleException(exc)); + _instance->retryQueue()->add(ICE_SHARED_FROM_THIS, _proxy->__handleException(exc, _handler, _mode, _sent, _cnt)); return false; } catch(const Exception& ex) { - return finished(ex); // No retries, we're done + return exceptionImpl(ex); // No retries, we're done } } void +ProxyOutgoingAsyncBase::cancelable(const CancellationHandlerPtr& handler) +{ + if(_proxy->__reference()->getInvocationTimeout() == -2 && _cachedConnection) + { + const int timeout = _cachedConnection->timeout(); + if(timeout > 0) + { + _instance->timer()->schedule(ICE_SHARED_FROM_THIS, IceUtil::Time::milliSeconds(timeout)); + } + } + OutgoingAsyncBase::cancelable(handler); +} + +void ProxyOutgoingAsyncBase::retryException(const Exception& ex) { try @@ -136,32 +625,18 @@ ProxyOutgoingAsyncBase::retryException(const Exception& ex) // connection to be done. // _proxy->__updateRequestHandler(_handler, 0); // Clear request handler and always retry. - _instance->retryQueue()->add(this, 0); + _instance->retryQueue()->add(ICE_SHARED_FROM_THIS, 0); } catch(const Ice::Exception& exc) { - if(completed(exc)) + if(exception(exc)) { - invokeCompletedAsync(); + invokeExceptionAsync(); } } } void -ProxyOutgoingAsyncBase::cancelable(const CancellationHandlerPtr& handler) -{ - if(_proxy->__reference()->getInvocationTimeout() == -2 && _cachedConnection) - { - const int timeout = _cachedConnection->timeout(); - if(timeout > 0) - { - _instance->timer()->schedule(this, IceUtil::Time::milliSeconds(timeout)); - } - } - AsyncResult::cancelable(handler); -} - -void ProxyOutgoingAsyncBase::retry() { invokeImpl(false); @@ -172,9 +647,9 @@ ProxyOutgoingAsyncBase::abort(const Ice::Exception& ex) { assert(!_childObserver); - if(finished(ex)) + if(exceptionImpl(ex)) { - invokeCompletedAsync(); + invokeExceptionAsync(); } else if(dynamic_cast<const Ice::CommunicatorDestroyedException*>(&ex)) { @@ -187,18 +662,33 @@ ProxyOutgoingAsyncBase::abort(const Ice::Exception& ex) } } -ProxyOutgoingAsyncBase::ProxyOutgoingAsyncBase(const ObjectPrx& prx, - const string& operation, - const CallbackBasePtr& delegate, - const LocalObjectPtr& cookie) : - OutgoingAsyncBase(prx->ice_getCommunicator(), prx->__reference()->getInstance(), operation, delegate, cookie), +#ifndef ICE_CPP11_MAPPING +Ice::ObjectPrx +ProxyOutgoingAsyncBase::getProxy() const +{ + return _proxy; +} + +Ice::CommunicatorPtr +ProxyOutgoingAsyncBase::getCommunicator() const +{ + return _proxy->ice_getCommunicator(); +} +#endif + +ProxyOutgoingAsyncBase::ProxyOutgoingAsyncBase(const ObjectPrxPtr& prx) : + OutgoingAsyncBase(prx->__reference()->getInstance()), _proxy(prx), - _mode(Normal), + _mode(ICE_ENUM(OperationMode, Normal)), _cnt(0), _sent(false) { } +ProxyOutgoingAsyncBase::~ProxyOutgoingAsyncBase() +{ +} + void ProxyOutgoingAsyncBase::invokeImpl(bool userThread) { @@ -209,7 +699,7 @@ ProxyOutgoingAsyncBase::invokeImpl(bool userThread) int invocationTimeout = _proxy->__reference()->getInvocationTimeout(); if(invocationTimeout > 0) { - _instance->timer()->schedule(this, IceUtil::Time::milliSeconds(invocationTimeout)); + _instance->timer()->schedule(ICE_SHARED_FROM_THIS, IceUtil::Time::milliSeconds(invocationTimeout)); } } else @@ -223,7 +713,7 @@ ProxyOutgoingAsyncBase::invokeImpl(bool userThread) { _sent = false; _handler = _proxy->__getRequestHandler(); - AsyncStatus status = _handler->sendAsyncRequest(this); + AsyncStatus status = _handler->sendAsyncRequest(ICE_SHARED_FROM_THIS); if(status & AsyncStatusSent) { if(userThread) @@ -252,13 +742,13 @@ ProxyOutgoingAsyncBase::invokeImpl(bool userThread) { if(_childObserver) { - _childObserver.failed(ex.ice_name()); + _childObserver.failed(ex.ice_id()); _childObserver.detach(); } - int interval = handleException(ex); + int interval = _proxy->__handleException(ex, _handler, _mode, _sent, _cnt); if(interval > 0) { - _instance->retryQueue()->add(this, interval); + _instance->retryQueue()->add(ICE_SHARED_FROM_THIS, interval); return; } else @@ -278,51 +768,45 @@ ProxyOutgoingAsyncBase::invokeImpl(bool userThread) { throw; } - else if(finished(ex)) // No retries, we're done + else if(exceptionImpl(ex)) // No retries, we're done { - invokeCompletedAsync(); + invokeExceptionAsync(); } } } bool -ProxyOutgoingAsyncBase::sent(bool done) +ProxyOutgoingAsyncBase::sentImpl(bool done) { _sent = true; if(done) { if(_proxy->__reference()->getInvocationTimeout() != -1) { - _instance->timer()->cancel(this); + _instance->timer()->cancel(ICE_SHARED_FROM_THIS); } } - return OutgoingAsyncBase::sent(done); + return OutgoingAsyncBase::sentImpl(done); } bool -ProxyOutgoingAsyncBase::finished(const Exception& ex) +ProxyOutgoingAsyncBase::exceptionImpl(const Exception& ex) { if(_proxy->__reference()->getInvocationTimeout() != -1) { - _instance->timer()->cancel(this); + _instance->timer()->cancel(ICE_SHARED_FROM_THIS); } - return OutgoingAsyncBase::finished(ex); + return OutgoingAsyncBase::exceptionImpl(ex); } bool -ProxyOutgoingAsyncBase::finished(bool ok) +ProxyOutgoingAsyncBase::responseImpl(bool ok) { if(_proxy->__reference()->getInvocationTimeout() != -1) { - _instance->timer()->cancel(this); + _instance->timer()->cancel(ICE_SHARED_FROM_THIS); } - return AsyncResult::finished(ok); -} - -int -ProxyOutgoingAsyncBase::handleException(const Exception& exc) -{ - return _proxy->__handleException(exc, _handler, _mode, _sent, _cnt); + return OutgoingAsyncBase::responseImpl(ok); } void @@ -338,22 +822,20 @@ ProxyOutgoingAsyncBase::runTimerTask() } } -OutgoingAsync::OutgoingAsync(const ObjectPrx& prx, - const string& operation, - const CallbackBasePtr& delegate, - const LocalObjectPtr& cookie) : - ProxyOutgoingAsyncBase(prx, operation, delegate, cookie), - _encoding(getCompatibleEncoding(prx->__reference()->getEncoding())) +OutgoingAsync::OutgoingAsync(const ObjectPrxPtr& prx, bool synchronous) : + ProxyOutgoingAsyncBase(prx), + _encoding(getCompatibleEncoding(prx->__reference()->getEncoding())), + _synchronous(synchronous) { } void -OutgoingAsync::prepare(const string& operation, OperationMode mode, const Context* context) +OutgoingAsync::prepare(const string& operation, OperationMode mode, const Context& context) { checkSupportedProtocol(getCompatibleProtocol(_proxy->__reference()->getProtocol())); _mode = mode; - _observer.attach(_proxy.get(), operation, context); + _observer.attach(_proxy, operation, context); switch(_proxy->__reference()->getMode()) { @@ -394,12 +876,12 @@ OutgoingAsync::prepare(const string& operation, OperationMode mode, const Contex _os.write(static_cast<Byte>(_mode)); - if(context != 0) + if(&context != &Ice::noExplicitContext) { // // Explicit context // - _os.write(*context); + _os.write(context); } else { @@ -422,61 +904,11 @@ OutgoingAsync::prepare(const string& operation, OperationMode mode, const Contex bool OutgoingAsync::sent() { - return ProxyOutgoingAsyncBase::sent(!_proxy->ice_isTwoway()); // done = true if it's not a two-way proxy -} - -AsyncStatus -OutgoingAsync::invokeRemote(const ConnectionIPtr& connection, bool compress, bool response) -{ - _cachedConnection = connection; - return connection->sendAsyncRequest(this, compress, response, 0); -} - -AsyncStatus -OutgoingAsync::invokeCollocated(CollocatedRequestHandler* handler) -{ - return handler->invokeAsyncRequest(this, 0); -} - -void -OutgoingAsync::abort(const Exception& ex) -{ - const Reference::Mode mode = _proxy->__reference()->getMode(); - if(mode == Reference::ModeBatchOneway || mode == Reference::ModeBatchDatagram) - { - // - // If we didn't finish a batch oneway or datagram request, we - // must notify the connection about that we give up ownership - // of the batch stream. - // - _proxy->__getBatchRequestQueue()->abortBatchRequest(&_os); - } - - ProxyOutgoingAsyncBase::abort(ex); -} - -void -OutgoingAsync::invoke() -{ - const Reference::Mode mode = _proxy->__reference()->getMode(); - if(mode == Reference::ModeBatchOneway || mode == Reference::ModeBatchDatagram) - { - _sentSynchronously = true; - _proxy->__getBatchRequestQueue()->finishBatchRequest(&_os, _proxy, getOperation()); - finished(true); - return; // Don't call sent/completed callback for batch AMI requests - } - - // - // NOTE: invokeImpl doesn't throw so this can be called from the - // try block with the catch block calling abort() in case of an - // exception. - // - invokeImpl(true); // userThread = true + return ProxyOutgoingAsyncBase::sentImpl(!_proxy->ice_isTwoway()); // done = true if it's not a two-way proxy } bool -OutgoingAsync::completed() +OutgoingAsync::response() { // // NOTE: this method is called from ConnectionI.parseMessage @@ -612,22 +1044,116 @@ OutgoingAsync::completed() } } - return finished(replyStatus == replyOK); + return responseImpl(replyStatus == replyOK); } catch(const Exception& ex) { - return completed(ex); + return exception(ex); + } +} + +AsyncStatus +OutgoingAsync::invokeRemote(const ConnectionIPtr& connection, bool compress, bool response) +{ + _cachedConnection = connection; + return connection->sendAsyncRequest(ICE_SHARED_FROM_THIS, compress, response, 0); +} + +AsyncStatus +OutgoingAsync::invokeCollocated(CollocatedRequestHandler* handler) +{ + return handler->invokeAsyncRequest(this, 0, _synchronous); +} + +void +OutgoingAsync::abort(const Exception& ex) +{ + const Reference::Mode mode = _proxy->__reference()->getMode(); + if(mode == Reference::ModeBatchOneway || mode == Reference::ModeBatchDatagram) + { + // + // If we didn't finish a batch oneway or datagram request, we + // must notify the connection about that we give up ownership + // of the batch stream. + // + _proxy->__getBatchRequestQueue()->abortBatchRequest(&_os); + } + + ProxyOutgoingAsyncBase::abort(ex); +} + +void +OutgoingAsync::invoke(const string& operation) +{ + const Reference::Mode mode = _proxy->__reference()->getMode(); + if(mode == Reference::ModeBatchOneway || mode == Reference::ModeBatchDatagram) + { + _sentSynchronously = true; + _proxy->__getBatchRequestQueue()->finishBatchRequest(&_os, _proxy, operation); + responseImpl(true); + return; // Don't call sent/completed callback for batch AMI requests + } + + // + // NOTE: invokeImpl doesn't throw so this can be called from the + // try block with the catch block calling abort() in case of an + // exception. + // + invokeImpl(true); // userThread = true +} + +#ifdef ICE_CPP11_MAPPING +void +OutgoingAsync::invoke(const string& operation, + Ice::OperationMode mode, + Ice::FormatType format, + const Ice::Context& context, + function<void(Ice::OutputStream*)> write) +{ + try + { + prepare(operation, mode, context); + if(write) + { + _os.startEncapsulation(_encoding, format); + write(&_os); + _os.endEncapsulation(); + } + else + { + _os.writeEmptyEncapsulation(_encoding); + } + invoke(operation); + } + catch(const Ice::Exception& ex) + { + abort(ex); } } -ProxyFlushBatchAsync::ProxyFlushBatchAsync(const ObjectPrx& proxy, - const string& operation, - const CallbackBasePtr& delegate, - const LocalObjectPtr& cookie) : - ProxyOutgoingAsyncBase(proxy, operation, delegate, cookie) +void +OutgoingAsync::throwUserException() +{ + try + { + _is.startEncapsulation(); + _is.throwException(); + } + catch(const UserException& ex) + { + _is.endEncapsulation(); + if(_userException) + { + _userException(ex); + } + throw UnknownUserException(__FILE__, __LINE__, ex.ice_id()); + } +} + +#endif + +ProxyFlushBatchAsync::ProxyFlushBatchAsync(const ObjectPrxPtr& proxy) : ProxyOutgoingAsyncBase(proxy) { - _observer.attach(proxy.get(), operation, 0); - _batchRequestNum = proxy->__getBatchRequestQueue()->swap(&_os); } AsyncStatus @@ -645,7 +1171,7 @@ ProxyFlushBatchAsync::invokeRemote(const ConnectionIPtr& connection, bool compre } } _cachedConnection = connection; - return connection->sendAsyncRequest(this, compress, false, _batchRequestNum); + return connection->sendAsyncRequest(ICE_SHARED_FROM_THIS, compress, false, _batchRequestNum); } AsyncStatus @@ -662,32 +1188,29 @@ ProxyFlushBatchAsync::invokeCollocated(CollocatedRequestHandler* handler) return AsyncStatusSent; } } - return handler->invokeAsyncRequest(this, _batchRequestNum); + return handler->invokeAsyncRequest(this, _batchRequestNum, false); } void -ProxyFlushBatchAsync::invoke() +ProxyFlushBatchAsync::invoke(const string& operation) { checkSupportedProtocol(getCompatibleProtocol(_proxy->__reference()->getProtocol())); + _observer.attach(_proxy, operation, ::Ice::noExplicitContext); + _batchRequestNum = _proxy->__getBatchRequestQueue()->swap(&_os); invokeImpl(true); // userThread = true } -ProxyGetConnection::ProxyGetConnection(const ObjectPrx& prx, - const string& operation, - const CallbackBasePtr& delegate, - const LocalObjectPtr& cookie) : - ProxyOutgoingAsyncBase(prx, operation, delegate, cookie) +ProxyGetConnection::ProxyGetConnection(const ObjectPrxPtr& prx) : ProxyOutgoingAsyncBase(prx) { - _observer.attach(prx.get(), operation, 0); } AsyncStatus ProxyGetConnection::invokeRemote(const ConnectionIPtr& connection, bool, bool) { _cachedConnection = connection; - if(finished(true)) + if(responseImpl(true)) { - invokeCompletedAsync(); + invokeResponseAsync(); } return AsyncStatusSent; } @@ -695,28 +1218,29 @@ ProxyGetConnection::invokeRemote(const ConnectionIPtr& connection, bool, bool) AsyncStatus ProxyGetConnection::invokeCollocated(CollocatedRequestHandler*) { - if(finished(true)) + if(responseImpl(true)) { - invokeCompletedAsync(); + invokeResponseAsync(); } return AsyncStatusSent; } +Ice::ConnectionPtr +ProxyGetConnection::getConnection() const +{ + return _cachedConnection; +} + void -ProxyGetConnection::invoke() +ProxyGetConnection::invoke(const string& operation) { + _observer.attach(_proxy, operation, ::Ice::noExplicitContext); invokeImpl(true); // userThread = true } -ConnectionFlushBatchAsync::ConnectionFlushBatchAsync(const ConnectionIPtr& connection, - const CommunicatorPtr& communicator, - const InstancePtr& instance, - const string& operation, - const CallbackBasePtr& delegate, - const LocalObjectPtr& cookie) : - OutgoingAsyncBase(communicator, instance, operation, delegate, cookie), _connection(connection) +ConnectionFlushBatchAsync::ConnectionFlushBatchAsync(const ConnectionIPtr& connection, const InstancePtr& instance) : + OutgoingAsyncBase(instance), _connection(connection) { - _observer.attach(instance.get(), operation); } ConnectionPtr @@ -726,8 +1250,9 @@ ConnectionFlushBatchAsync::getConnection() const } void -ConnectionFlushBatchAsync::invoke() +ConnectionFlushBatchAsync::invoke(const string& operation) { + _observer.attach(_instance.get(), operation); try { AsyncStatus status; @@ -742,7 +1267,7 @@ ConnectionFlushBatchAsync::invoke() } else { - status = _connection->sendAsyncRequest(this, false, false, batchRequestNum); + status = _connection->sendAsyncRequest(ICE_SHARED_FROM_THIS, false, false, batchRequestNum); } if(status & AsyncStatusSent) @@ -756,29 +1281,28 @@ ConnectionFlushBatchAsync::invoke() } catch(const RetryException& ex) { - if(completed(*ex.get())) + if(exception(*ex.get())) { - invokeCompletedAsync(); + invokeExceptionAsync(); } } catch(const Exception& ex) { - if(completed(ex)) + if(exception(ex)) { - invokeCompletedAsync(); + invokeExceptionAsync(); } } } -CommunicatorFlushBatchAsync::CommunicatorFlushBatchAsync(const CommunicatorPtr& communicator, - const InstancePtr& instance, - const string& operation, - const CallbackBasePtr& cb, - const LocalObjectPtr& cookie) : - AsyncResult(communicator, instance, operation, cb, cookie) +CommunicatorFlushBatchAsync::~CommunicatorFlushBatchAsync() { - _observer.attach(instance.get(), operation); + // Out of line to avoid weak vtable +} +CommunicatorFlushBatchAsync::CommunicatorFlushBatchAsync(const InstancePtr& instance) : + OutgoingAsyncBase(instance) +{ // // _useCount is initialized to 1 to prevent premature callbacks. // The caller must invoke ready() after all flush requests have @@ -797,46 +1321,77 @@ CommunicatorFlushBatchAsync::flushConnection(const ConnectionIPtr& con) FlushBatch(const CommunicatorFlushBatchAsyncPtr& outAsync, const InstancePtr& instance, InvocationObserver& observer) : - OutgoingAsyncBase(outAsync->getCommunicator(), instance, outAsync->getOperation(), __dummyCallback, 0), - _outAsync(outAsync), - _observer(observer) + OutgoingAsyncBase(instance), _outAsync(outAsync), _observer(observer) { } - virtual bool sent() + virtual bool + sent() { _childObserver.detach(); _outAsync->check(false); return false; } - virtual bool completed(const Exception& ex) + virtual bool + exception(const Exception& ex) { - _childObserver.failed(ex.ice_name()); + _childObserver.failed(ex.ice_id()); _childObserver.detach(); _outAsync->check(false); return false; } - private: - - virtual InvocationObserver& getObserver() + virtual InvocationObserver& + getObserver() { return _observer; } + virtual bool handleSent(bool, bool) + { + return false; + } + + virtual bool handleException(const Ice::Exception&) + { + return false; + } + + virtual bool handleResponse(bool) + { + return false; + } + + virtual void handleInvokeSent(bool, OutgoingAsyncBase*) const + { + assert(false); + } + + virtual void handleInvokeException(const Ice::Exception&, OutgoingAsyncBase*) const + { + assert(false); + } + + virtual void handleInvokeResponse(bool, OutgoingAsyncBase*) const + { + assert(false); + } + + private: + const CommunicatorFlushBatchAsyncPtr _outAsync; InvocationObserver& _observer; }; { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); + Lock sync(_m); ++_useCount; } try { - OutgoingAsyncBasePtr flushBatch = new FlushBatch(this, _instance, _observer); + OutgoingAsyncBasePtr flushBatch = ICE_MAKE_SHARED(FlushBatch, ICE_SHARED_FROM_THIS, _instance, _observer); int batchRequestNum = con->getBatchRequestQueue()->swap(flushBatch->getOs()); if(batchRequestNum == 0) { @@ -855,8 +1410,11 @@ CommunicatorFlushBatchAsync::flushConnection(const ConnectionIPtr& con) } void -CommunicatorFlushBatchAsync::ready() +CommunicatorFlushBatchAsync::invoke(const string& operation) { + _observer.attach(_instance.get(), operation); + _instance->outgoingConnectionFactory()->flushAsyncBatchRequests(ICE_SHARED_FROM_THIS); + _instance->objectAdapterFactory()->flushAsyncBatchRequests(ICE_SHARED_FROM_THIS); check(true); } @@ -864,7 +1422,7 @@ void CommunicatorFlushBatchAsync::check(bool userThread) { { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); + Lock sync(_m); assert(_useCount > 0); if(--_useCount > 0) { @@ -872,7 +1430,7 @@ CommunicatorFlushBatchAsync::check(bool userThread) } } - if(sent(true)) + if(sentImpl(true)) { if(userThread) { @@ -885,3 +1443,137 @@ CommunicatorFlushBatchAsync::check(bool userThread) } } } + +#ifdef ICE_CPP11_MAPPING + +bool +LambdaInvoke::handleSent(bool, bool alreadySent) +{ + return _sent != nullptr && !alreadySent; // Invoke the sent callback only if not already invoked. +} + +bool +LambdaInvoke::handleException(const Ice::Exception&) +{ + return _exception != nullptr; // Invoke the callback +} + +bool +LambdaInvoke::handleResponse(bool) +{ + return _response != nullptr; +} + +void +LambdaInvoke::handleInvokeSent(bool sentSynchronously, OutgoingAsyncBase*) const +{ + _sent(sentSynchronously); +} + +void +LambdaInvoke::handleInvokeException(const Ice::Exception& ex, OutgoingAsyncBase*) const +{ + try + { + ex.ice_throw(); + } + catch(const Ice::Exception&) + { + _exception(current_exception()); + } +} + +void +LambdaInvoke::handleInvokeResponse(bool ok, OutgoingAsyncBase*) const +{ + _response(ok); +} + +#else // C++98 + +namespace +{ + +// +// Dummy class derived from CallbackBase +// We use this class for the __dummyCallback extern pointer in OutgoingAsync. In turn, +// this allows us to test whether the user supplied a null delegate instance to the +// generated begin_ method without having to generate a separate test to throw IllegalArgumentException +// in the inlined versions of the begin_ method. In other words, this reduces the amount of generated +// object code. +// +class DummyCallback : public CallbackBase +{ +public: + + DummyCallback() + { + } + + virtual void + completed(const Ice::AsyncResultPtr&) const + { + assert(false); + } + + virtual CallbackBasePtr + verify(const Ice::LocalObjectPtr&) + { + // + // Called by the AsyncResult constructor to verify the delegate. The dummy + // delegate is passed when the user used a begin_ method without delegate. + // By returning 0 here, we tell the AsyncResult that no delegates was + // provided. + // + return 0; + } + + virtual void + sent(const AsyncResultPtr&) const + { + assert(false); + } + + virtual bool + hasSentCallback() const + { + assert(false); + return false; + } +}; + +} + +// +// This gives a pointer value to compare against in the generated +// begin_ method to decide whether the caller passed a null pointer +// versus the generated inline version of the begin_ method having +// passed a pointer to the dummy delegate. +// +CallbackBasePtr IceInternal::__dummyCallback = new DummyCallback; + +CallbackBase::~CallbackBase() +{ + // Out of line to avoid weak vtable +} + +void +CallbackBase::checkCallback(bool obj, bool cb) +{ + if(!obj) + { + throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "callback object cannot be null"); + } + if(!cb) + { + throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "callback cannot be null"); + } +} + +GenericCallbackBase::~GenericCallbackBase() +{ + // Out of line to avoid weak vtable +} + + +#endif |