diff options
Diffstat (limited to 'cpp/src/Ice/OutgoingAsync.cpp')
-rw-r--r-- | cpp/src/Ice/OutgoingAsync.cpp | 1268 |
1 files changed, 320 insertions, 948 deletions
diff --git a/cpp/src/Ice/OutgoingAsync.cpp b/cpp/src/Ice/OutgoingAsync.cpp index fb57965b60f..b348b1c8cc8 100644 --- a/cpp/src/Ice/OutgoingAsync.cpp +++ b/cpp/src/Ice/OutgoingAsync.cpp @@ -9,18 +9,11 @@ #include <IceUtil/DisableWarnings.h> #include <Ice/OutgoingAsync.h> -#include <Ice/Object.h> #include <Ice/ConnectionI.h> #include <Ice/CollocatedRequestHandler.h> #include <Ice/Reference.h> #include <Ice/Instance.h> #include <Ice/LocalException.h> -#include <Ice/Properties.h> -#include <Ice/LoggerUtil.h> -#include <Ice/LocatorInfo.h> -#include <Ice/ProxyFactory.h> -#include <Ice/RouterInfo.h> -#include <Ice/Protocol.h> #include <Ice/ReplyStatus.h> #include <Ice/ImplicitContextI.h> #include <Ice/ThreadPool.h> @@ -30,431 +23,289 @@ using namespace std; using namespace Ice; using namespace IceInternal; -IceUtil::Shared* Ice::upCast(AsyncResult* p) { return p; } - -IceUtil::Shared* IceInternal::upCast(OutgoingAsyncMessageCallback* p) { return p; } +IceUtil::Shared* IceInternal::upCast(OutgoingAsyncBase* p) { return p; } +IceUtil::Shared* IceInternal::upCast(ProxyOutgoingAsyncBase* p) { return p; } IceUtil::Shared* IceInternal::upCast(OutgoingAsync* p) { return p; } -IceUtil::Shared* IceInternal::upCast(BatchOutgoingAsync* p) { return p; } -IceUtil::Shared* IceInternal::upCast(ProxyBatchOutgoingAsync* p) { return p; } -IceUtil::Shared* IceInternal::upCast(ConnectionBatchOutgoingAsync* p) { return p; } -IceUtil::Shared* IceInternal::upCast(CommunicatorBatchOutgoingAsync* p) { return p; } -IceUtil::Shared* IceInternal::upCast(GetConnectionOutgoingAsync* p) { return p; } - -const unsigned char Ice::AsyncResult::OK = 0x1; -const unsigned char Ice::AsyncResult::Done = 0x2; -const unsigned char Ice::AsyncResult::Sent = 0x4; -const unsigned char Ice::AsyncResult::EndCalled = 0x8; - -namespace -{ - -class AsynchronousException : public DispatchWorkItem -{ -public: - - AsynchronousException(const Ice::ConnectionPtr& connection, const Ice::AsyncResultPtr& result, - const Ice::Exception& ex) : - DispatchWorkItem(connection), _result(result), _exception(ex.ice_clone()) - { - } - - virtual void - run() - { - _result->__invokeException(*_exception.get()); - } - -private: - - const Ice::AsyncResultPtr _result; - const IceUtil::UniquePtr<Ice::Exception> _exception; -}; - -class AsynchronousSent : public DispatchWorkItem -{ -public: - - AsynchronousSent(const Ice::ConnectionPtr& connection, const Ice::AsyncResultPtr& result) : - DispatchWorkItem(connection), _result(result) - { - } - - virtual void - run() - { - _result->__invokeSent(); - } +IceUtil::Shared* IceInternal::upCast(CommunicatorFlushBatch* p) { return p; } -private: - - const Ice::AsyncResultPtr _result; -}; - -}; - -Ice::AsyncResult::AsyncResult(const CommunicatorPtr& communicator, - const IceInternal::InstancePtr& instance, - const string& op, - const CallbackBasePtr& del, - const LocalObjectPtr& cookie) : - _communicator(communicator), - _instance(instance), - _operation(op), - _callback(del), - _cookie(cookie), - _is(instance.get(), Ice::currentProtocolEncoding), - _os(instance.get(), Ice::currentProtocolEncoding), - _state(0), - _sentSynchronously(false), - _exception(0) +bool +OutgoingAsyncBase::sent() { - if(!_callback) - { - throw IceUtil::IllegalArgumentException(__FILE__, __LINE__); - } - const_cast<CallbackBasePtr&>(_callback) = _callback->verify(_cookie); + return sent(true); } -Ice::AsyncResult::~AsyncResult() +bool +OutgoingAsyncBase::completed(const Exception& ex) { + return finished(ex); } -Int -Ice::AsyncResult::getHash() const +OutgoingAsyncBase::OutgoingAsyncBase(const CommunicatorPtr& communicator, + const InstancePtr& instance, + const string& operation, + const CallbackBasePtr& delegate, + const LocalObjectPtr& cookie) : + AsyncResult(communicator, instance, operation, delegate, cookie), + _os(instance.get(), Ice::currentProtocolEncoding) { - return static_cast<Int>(reinterpret_cast<Long>(this) >> 4); } bool -Ice::AsyncResult::isCompleted() const +OutgoingAsyncBase::sent(bool done) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - return _state & Done; -} - -void -Ice::AsyncResult::waitForCompleted() -{ - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - while(!(_state & Done)) + if(done) { - _monitor.wait(); + _childObserver.detach(); } + return AsyncResult::sent(done); } bool -Ice::AsyncResult::isSent() const -{ - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - return _state & Sent; -} - -void -Ice::AsyncResult::waitForSent() +OutgoingAsyncBase::finished(const Exception& ex) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - while(!(_state & Sent) && !_exception.get()) + if(_childObserver) { - _monitor.wait(); + _childObserver.failed(ex.ice_name()); + _childObserver.detach(); } + return AsyncResult::finished(ex); } -void -Ice::AsyncResult::throwLocalException() const +Ice::ObjectPrx +ProxyOutgoingAsyncBase::getProxy() const { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - if(_exception.get()) - { - _exception.get()->ice_throw(); - } + return _proxy; } bool -Ice::AsyncResult::__wait() +ProxyOutgoingAsyncBase::sent() { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - if(_state & EndCalled) - { - throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "end_ method called more than once"); - } - _state |= EndCalled; - while(!(_state & Done)) - { - _monitor.wait(); - } - if(_exception.get()) - { - _exception.get()->ice_throw(); - } - return _state & OK; + return sent(!_proxy->ice_isTwoway()); // Done if it's not a two-way proxy (no response expected). } -void -Ice::AsyncResult::__throwUserException() +bool +ProxyOutgoingAsyncBase::completed(const Exception& exc) { - try - { - _is.startReadEncaps(); - _is.throwException(); - } - catch(const Ice::UserException&) + if(_childObserver) { - _is.endReadEncaps(); - throw; + _childObserver.failed(exc.ice_name()); + _childObserver.detach(); } -} -void -Ice::AsyncResult::__invokeSent() -{ // - // Note: no need to change the _state here, specializations are responsible for - // changing the state. + // NOTE: at this point, synchronization isn't needed, no other threads should be + // calling on the callback. // - - if(_callback) + try { - try - { - AsyncResultPtr self(this); - _callback->sent(self); - } - catch(const std::exception& ex) - { - __warning(ex); - } - catch(...) - { - __warning(); - } + _instance->retryQueue()->add(this, handleException(exc)); + return false; } - - if(_observer) + catch(const Exception& ex) { - Ice::ObjectPrx proxy = getProxy(); - if(!proxy || !proxy->ice_isTwoway()) - { - _observer.detach(); - } + return finished(ex); // No retries, we're done } } void -Ice::AsyncResult::__invokeSentAsync() +ProxyOutgoingAsyncBase::retry() { - // - // This is called when it's not safe to call the sent callback synchronously - // from this thread. Instead the exception callback is called asynchronously from - // the client thread pool. - // - try - { - _instance->clientThreadPool()->dispatch(new AsynchronousSent(_cachedConnection, this)); - } - catch(const Ice::CommunicatorDestroyedException&) - { - } + invokeImpl(false); } void -Ice::AsyncResult::__invokeException(const Ice::Exception& ex) +ProxyOutgoingAsyncBase::abort(const Ice::Exception& ex) { + assert(!_childObserver); + + if(finished(ex)) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - _state |= Done; - _os.resize(0); // Clear buffer now, instead of waiting for AsyncResult deallocation - _exception.reset(ex.ice_clone()); - _monitor.notifyAll(); + invokeCompletedAsync(); + } + else if(dynamic_cast<const Ice::CommunicatorDestroyedException*>(&ex)) + { + // + // If it's a communicator destroyed exception, don't swallow + // it but instead notify the user thread. Even if no callback + // was provided. + // + ex.ice_throw(); } - - __invokeCompleted(); } -void -Ice::AsyncResult::__invokeExceptionAsync(const Ice::Exception& ex) +ProxyOutgoingAsyncBase::ProxyOutgoingAsyncBase(const ObjectPrx& prx, + const string& operation, + const CallbackBasePtr& delegate, + const LocalObjectPtr& cookie) : + OutgoingAsyncBase(prx->ice_getCommunicator(), prx->__reference()->getInstance(), operation, delegate, cookie), + _proxy(prx), + _mode(Normal), + _cnt(0), + _sent(false) { - // - // This is called when it's not safe to call the exception callback synchronously - // from this thread. Instead the exception callback is called asynchronously from - // the client thread pool. - // - // CommunicatorDestroyedException is the only exception that can propagate directly - // from this method. - // - _instance->clientThreadPool()->dispatch(new AsynchronousException(_cachedConnection, this, ex)); } void -Ice::AsyncResult::__invokeCompleted() +ProxyOutgoingAsyncBase::invokeImpl(bool userThread) { - // - // Note: no need to change the _state here, specializations are responsible for - // changing the state. - // - - if(_callback) + try { - try + if(userThread) { - AsyncResultPtr self(this); - _callback->completed(self); + int invocationTimeout = _proxy->__reference()->getInvocationTimeout(); + if(invocationTimeout > 0) + { + _instance->timer()->schedule(this, IceUtil::Time::milliSeconds(invocationTimeout)); + } } - catch(const std::exception& ex) + else { - __warning(ex); + checkCanceled(); // Cancellation exception aren't retriable + _observer.retried(); } - catch(...) + + while(true) { - __warning(); + try + { + _sent = false; + _handler = _proxy->__getRequestHandler(); + AsyncStatus status = _handler->sendAsyncRequest(this); + if(status & AsyncStatusSent) + { + if(userThread) + { + _sentSynchronously = true; + if(status & AsyncStatusInvokeSentCallback) + { + invokeSent(); // Call the sent callback from the user thread. + } + } + else + { + if(status & AsyncStatusInvokeSentCallback) + { + invokeSentAsync(); // Call the sent callback from a client thread pool thread. + } + } + } + return; // We're done! + } + catch(const RetryException& ex) + { + handleRetryException(ex); + } + catch(const Exception& ex) + { + if(_childObserver) + { + _childObserver.failed(ex.ice_name()); + _childObserver.detach(); + } + int interval = handleException(ex); + if(interval > 0) + { + _instance->retryQueue()->add(this, interval); + return; + } + else + { + checkCanceled(); // Cancellation exception aren't retriable + _observer.retried(); + } + } } } - - _observer.detach(); -} - -void -Ice::AsyncResult::runTimerTask() // Implementation of TimerTask::runTimerTask() -{ - RequestHandlerPtr handler; + catch(const Exception& ex) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - swap(handler, _timeoutRequestHandler); - } - - if(handler) - { - handler->asyncRequestTimedOut(OutgoingAsyncMessageCallbackPtr::dynamicCast(this)); + // + // If called from the user thread we re-throw, the exception + // will be catch by the caller and abort() will be called. + // + if(userThread) + { + throw; + } + else if(finished(ex)) // No retries, we're done + { + invokeCompletedAsync(); + } } } - -void -Ice::AsyncResult::__check(const AsyncResultPtr& r, const IceProxy::Ice::Object* prx, const string& operation) +bool +ProxyOutgoingAsyncBase::sent(bool done) { - __check(r, operation); - if(r->getProxy().get() != prx) + _sent = true; + if(done) { - throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "Proxy for call to end_" + operation + - " does not match proxy that was used to call corresponding begin_" + - operation + " method"); + if(_proxy->__reference()->getInvocationTimeout() > 0) + { + _instance->timer()->cancel(this); + } } + return OutgoingAsyncBase::sent(done); } -void -Ice::AsyncResult::__check(const AsyncResultPtr& r, const Ice::Communicator* com, const string& operation) +bool +ProxyOutgoingAsyncBase::finished(const Exception& ex) { - __check(r, operation); - if(r->getCommunicator().get() != com) + if(_proxy->__reference()->getInvocationTimeout() > 0) { - throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "Communicator for call to end_" + operation + - " does not match communicator that was used to call corresponding " + - "begin_" + operation + " method"); + _instance->timer()->cancel(this); } + return OutgoingAsyncBase::finished(ex); } -void -Ice::AsyncResult::__check(const AsyncResultPtr& r, const Ice::Connection* con, const string& operation) +bool +ProxyOutgoingAsyncBase::finished(bool ok) { - __check(r, operation); - if(r->getConnection().get() != con) + if(_proxy->__reference()->getInvocationTimeout() > 0) { - throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "Connection for call to end_" + operation + - " does not match connection that was used to call corresponding " + - "begin_" + operation + " method"); + _instance->timer()->cancel(this); } + return AsyncResult::finished(ok); } void -Ice::AsyncResult::__check(const AsyncResultPtr& r, const string& operation) +ProxyOutgoingAsyncBase::handleRetryException(const RetryException& exc) { - if(!r) - { - throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "AsyncResult == null"); - } - else if(&r->_operation != &operation) - { - throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "Incorrect operation for end_" + operation + - " method: " + r->_operation); - } + _proxy->__setRequestHandler(_handler, 0); // Clear request handler and always retry. } - -void -Ice::AsyncResult::__warning(const std::exception& exc) const +int +ProxyOutgoingAsyncBase::handleException(const Exception& exc) { - if(_instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0) - { - Warning out(_instance->initializationData().logger); - const Exception* ex = dynamic_cast<const Exception*>(&exc); - if(ex) - { - out << "Ice::Exception raised by AMI callback:\n" << *ex; - } - else - { - out << "std::exception raised by AMI callback:\n" << exc.what(); - } - } + return _proxy->__handleException(exc, _handler, _mode, _sent, _cnt); } void -Ice::AsyncResult::__warning() const +ProxyOutgoingAsyncBase::runTimerTask() { - if(_instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0) + try { - Warning out(_instance->initializationData().logger); - out << "unknown exception raised by AMI callback"; + cancel(InvocationTimeoutException(__FILE__, __LINE__)); } -} - -void -IceInternal::OutgoingAsyncMessageCallback::__dispatchInvocationTimeout(const ThreadPoolPtr& threadPool, - const Ice::ConnectionPtr& connection) -{ - class InvocationTimeoutCall : public DispatchWorkItem + catch(const CommunicatorDestroyedException&) { - public: - - InvocationTimeoutCall(const OutgoingAsyncMessageCallbackPtr& outAsync, const Ice::ConnectionPtr& connection) : - DispatchWorkItem(connection), _outAsync(outAsync) - { - } - - virtual void - run() - { - InvocationTimeoutException ex(__FILE__, __LINE__); - _outAsync->__finished(ex); - } - - private: - - const OutgoingAsyncMessageCallbackPtr _outAsync; - }; - threadPool->dispatch(new InvocationTimeoutCall(this, connection)); + } } -IceInternal::OutgoingAsync::OutgoingAsync(const ObjectPrx& prx, - const std::string& operation, - const CallbackBasePtr& delegate, - const Ice::LocalObjectPtr& cookie) : - AsyncResult(prx->ice_getCommunicator(), prx->__reference()->getInstance(), operation, delegate, cookie), - _proxy(prx), +OutgoingAsync::OutgoingAsync(const ObjectPrx& prx, + const string& operation, + const CallbackBasePtr& delegate, + const LocalObjectPtr& cookie) : + ProxyOutgoingAsyncBase(prx, operation, delegate, cookie), _encoding(getCompatibleEncoding(prx->__reference()->getEncoding())) { } void -IceInternal::OutgoingAsync::__prepare(const std::string& operation, OperationMode mode, const Context* context) +OutgoingAsync::prepare(const string& operation, OperationMode mode, const Context* context) { - _handler = 0; - _cnt = 0; - _sent = false; - _mode = mode; - _sentSynchronously = false; - checkSupportedProtocol(getCompatibleProtocol(_proxy->__reference()->getProtocol())); + _mode = mode; _observer.attach(_proxy.get(), operation, context); switch(_proxy->__reference()->getMode()) @@ -482,7 +333,7 @@ IceInternal::OutgoingAsync::__prepare(const std::string& operation, OperationMod { _proxy->__setRequestHandler(_handler, 0); // Clear request handler and retry. } - catch(const Ice::LocalException& ex) + catch(const LocalException& ex) { _observer.failed(ex.ice_name()); _proxy->__setRequestHandler(_handler, 0); // Clear request handler @@ -541,109 +392,63 @@ IceInternal::OutgoingAsync::__prepare(const std::string& operation, OperationMod } AsyncStatus -IceInternal::OutgoingAsync::__send(const Ice::ConnectionIPtr& connection, bool compress, bool response) +OutgoingAsync::send(const ConnectionIPtr& connection, bool compress, bool response) { _cachedConnection = connection; return connection->sendAsyncRequest(this, compress, response); } AsyncStatus -IceInternal::OutgoingAsync::__invokeCollocated(CollocatedRequestHandler* handler) +OutgoingAsync::invokeCollocated(CollocatedRequestHandler* handler) { return handler->invokeAsyncRequest(this); } -bool -IceInternal::OutgoingAsync::__sent() +void +OutgoingAsync::abort(const Exception& ex) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - - bool alreadySent = _state & Sent; // Expected in case of a retry. - _state |= Sent; - _sent = true; - - assert(!(_state & Done)); - if(!_proxy->ice_isTwoway()) + const Reference::Mode mode = _proxy->__reference()->getMode(); + if(mode == Reference::ModeBatchOneway || mode == Reference::ModeBatchDatagram) { - _childObserver.detach(); - if(!_callback || !_callback->hasSentCallback()) + if(_handler) { - _observer.detach(); - } - if(_timeoutRequestHandler) - { - _instance->timer()->cancel(this); - _timeoutRequestHandler = 0; + // + // If we didn't finish a batch oneway or datagram request, we + // must notify the connection about that we give up ownership + // of the batch stream. + // + _handler->abortBatchRequest(); } - _state |= Done | OK; - //_os.resize(0); // Don't clear the buffer now, it's needed for collocation optimization. } - _monitor.notifyAll(); - return !alreadySent && _callback && _callback->hasSentCallback(); -} - -void -IceInternal::OutgoingAsync::__invokeSent() -{ - ::Ice::AsyncResult::__invokeSent(); + + ProxyOutgoingAsyncBase::abort(ex); } void -IceInternal::OutgoingAsync::__finished(const Ice::Exception& exc) +OutgoingAsync::invoke() { + const Reference::Mode mode = _proxy->__reference()->getMode(); + if(mode == Reference::ModeBatchOneway || mode == Reference::ModeBatchDatagram) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - assert(!(_state & Done)); - _childObserver.failed(exc.ice_name()); - _childObserver.detach(); - if(_timeoutRequestHandler) + if(_handler) { - _instance->timer()->cancel(this); - _timeoutRequestHandler = 0; + _sentSynchronously = true; + _handler->finishBatchRequest(&_os); + finished(true); } + return; // Don't call sent/completed callback for batch AMI requests } // - // NOTE: at this point, synchronization isn't needed, no other threads should be - // calling on the callback. + // NOTE: invokeImpl doesn't throw so this can be called from the + // try block with the catch block calling abort() in case of an + // exception. // - try - { - handleException(exc); - } - catch(const Ice::Exception& ex) - { - __invokeException(ex); - } -} - -void -IceInternal::OutgoingAsync::__invokeExceptionAsync(const Ice::Exception& ex) -{ - if((_state & Done) == 0 && _handler) - { - // - // If we didn't finish a batch oneway or datagram request, we - // must notify the connection about that we give up ownership - // of the batch stream. - // - int mode = _proxy->__reference()->getMode(); - if(mode == Reference::ModeBatchOneway || mode == Reference::ModeBatchDatagram) - { - _handler->abortBatchRequest(); - } - } - AsyncResult::__invokeExceptionAsync(ex); -} - -void -IceInternal::OutgoingAsync::__processRetry() -{ - __invoke(false); + invokeImpl(true); // userThread = true } bool -IceInternal::OutgoingAsync::__finished() +OutgoingAsync::completed() { // // NOTE: this method is called from ConnectionI.parseMessage @@ -652,25 +457,15 @@ IceInternal::OutgoingAsync::__finished() // assert(_proxy->ice_isTwoway()); // Can only be called for twoways. - Ice::Byte replyStatus; - try + if(_childObserver) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - assert(!_exception.get() && !(_state & Done)); - assert(!_is.b.empty()); - - if(_childObserver) - { - _childObserver->reply(static_cast<Int>(_is.b.size() - headerSize - 4)); - } + _childObserver->reply(static_cast<Int>(_is.b.size() - headerSize - 4)); _childObserver.detach(); + } - if(_timeoutRequestHandler) - { - _instance->timer()->cancel(this); - _timeoutRequestHandler = 0; - } - + Byte replyStatus; + try + { _is.read(replyStatus); switch(replyStatus) @@ -789,373 +584,197 @@ IceInternal::OutgoingAsync::__finished() } } - _state |= Done; - _os.resize(0); // Clear buffer now, instead of waiting for AsyncResult deallocation - if(replyStatus == replyOK) - { - _state |= OK; - } - _monitor.notifyAll(); - - if(!_callback) - { - _observer.detach(); - return false; - } - return true; + return finished(replyStatus == replyOK); } - catch(const LocalException& exc) + catch(const Exception& ex) { - // - // We don't call finished(exc) here because we don't want - // to invoke the completion callback. The completion - // callback is invoked by the connection is this method - // returns true. - // - try - { - handleException(exc); - return false; // Invocation will be retried. - } - catch(const Ice::Exception& ex) - { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - _state |= Done; - _os.resize(0); // Clear buffer now, instead of waiting for AsyncResult deallocation - _exception.reset(ex.ice_clone()); - _monitor.notifyAll(); - - if(!_callback) - { - _observer.detach(); - return false; - } - return true; - } + return completed(ex); } } +ProxyFlushBatch::ProxyFlushBatch(const ObjectPrx& proxy, + const string& operation, + const CallbackBasePtr& delegate, + const LocalObjectPtr& cookie) : + ProxyOutgoingAsyncBase(proxy, operation, delegate, cookie) +{ + _observer.attach(proxy.get(), operation, 0); +} + bool -IceInternal::OutgoingAsync::__invoke(bool userThread) +ProxyFlushBatch::sent() { - const Reference::Mode mode = _proxy->__reference()->getMode(); - if(mode == Reference::ModeBatchOneway || mode == Reference::ModeBatchDatagram) - { - _state |= Done | OK; - _handler->finishBatchRequest(&_os); - _observer.detach(); - return true; - } + return ProxyOutgoingAsyncBase::sent(true); // Overriden because the flush is done even if using a two-way proxy. +} - while(true) - { - try - { - _sent = false; - _handler = _proxy->__getRequestHandler(); - AsyncStatus status = _handler->sendAsyncRequest(this); - if(status & AsyncStatusSent) - { - if(userThread) - { - _sentSynchronously = true; - if(status & AsyncStatusInvokeSentCallback) - { - __invokeSent(); // Call the sent callback from the user thread. - } - } - else - { - if(status & AsyncStatusInvokeSentCallback) - { - __invokeSentAsync(); // Call the sent callback from a client thread pool thread. - } - } - } +AsyncStatus +ProxyFlushBatch::send(const ConnectionIPtr& connection, bool, bool) +{ + _cachedConnection = connection; + return connection->flushAsyncBatchRequests(this); +} - if(mode == Reference::ModeTwoway || !(status & AsyncStatusSent)) - { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - if(!(_state & Done)) - { - int invocationTimeout = _handler->getReference()->getInvocationTimeout(); - if(invocationTimeout > 0) - { - _instance->timer()->schedule(this, IceUtil::Time::milliSeconds(invocationTimeout)); - _timeoutRequestHandler = _handler; - } - } - } - } - catch(const RetryException&) - { - _proxy->__setRequestHandler(_handler, 0); // Clear request handler and retry. - continue; - } - catch(const Ice::Exception& ex) - { - handleException(ex); - } - break; - } - return _sentSynchronously; +AsyncStatus +ProxyFlushBatch::invokeCollocated(CollocatedRequestHandler* handler) +{ + return handler->invokeAsyncBatchRequests(this); } void -IceInternal::OutgoingAsync::handleException(const Ice::Exception& exc) +ProxyFlushBatch::invoke() { - try - { - int interval = _proxy->__handleException(exc, _handler, _mode, _sent, _cnt); - _observer.retried(); // Invocation is being retried. - - // - // Schedule the retry. Note that we always schedule the retry - // on the retry queue even if the invocation can be retried - // immediately. This is required because it might not be safe - // to retry from this thread (this is for instance called by - // finished(BasicStream) which is called with the connection - // locked. - // - _instance->retryQueue()->add(this, interval); - } - catch(const Ice::Exception& ex) - { - _observer.failed(ex.ice_name()); - throw; - } + checkSupportedProtocol(getCompatibleProtocol(_proxy->__reference()->getProtocol())); + invokeImpl(true); // userThread = true } -IceInternal::BatchOutgoingAsync::BatchOutgoingAsync(const CommunicatorPtr& communicator, - const InstancePtr& instance, - const std::string& operation, - const CallbackBasePtr& delegate, - const Ice::LocalObjectPtr& cookie) : - AsyncResult(communicator, instance, operation, delegate, cookie) +void +ProxyFlushBatch::handleRetryException(const RetryException& ex) { + _proxy->__setRequestHandler(_handler, 0); // Clear request handler + ex.get()->ice_throw(); // No retries, we want to notify the user of potentially lost batch requests } -AsyncStatus -IceInternal::BatchOutgoingAsync::__send(const Ice::ConnectionIPtr& connection, bool, bool) +int +ProxyFlushBatch::handleException(const Exception& ex) { - _cachedConnection = connection; - return connection->flushAsyncBatchRequests(this); + _proxy->__setRequestHandler(_handler, 0); // Clear request handler + ex.ice_throw(); // No retries, we want to notify the user of potentially lost batch requests + return 0; } -AsyncStatus -IceInternal::BatchOutgoingAsync::__invokeCollocated(CollocatedRequestHandler* handler) +ProxyGetConnection::ProxyGetConnection(const ObjectPrx& prx, + const string& operation, + const CallbackBasePtr& delegate, + const LocalObjectPtr& cookie) : + ProxyOutgoingAsyncBase(prx, operation, delegate, cookie) { - return handler->invokeAsyncBatchRequests(this); + _observer.attach(prx.get(), operation, 0); } -bool -IceInternal::BatchOutgoingAsync::__sent() +AsyncStatus +ProxyGetConnection::send(const ConnectionIPtr& connection, bool, bool) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - assert(!_exception.get()); - _state |= Done | OK | Sent; - //_os.resize(0); // Don't clear the buffer now, it's needed for collocation optimization. - _childObserver.detach(); - if(_timeoutRequestHandler) - { - _instance->timer()->cancel(this); - _timeoutRequestHandler = 0; - } - _monitor.notifyAll(); - if(!_callback || !_callback->hasSentCallback()) + _cachedConnection = connection; + if(finished(true)) { - _observer.detach(); - return false; + invokeCompletedAsync(); } - return true; + return AsyncStatusSent; } -void -IceInternal::BatchOutgoingAsync::__invokeSent() +AsyncStatus +ProxyGetConnection::invokeCollocated(CollocatedRequestHandler*) { - ::Ice::AsyncResult::__invokeSent(); + if(finished(true)) + { + invokeCompletedAsync(); + } + return AsyncStatusSent; } void -IceInternal::BatchOutgoingAsync::__finished(const Ice::Exception& exc) +ProxyGetConnection::invoke() { - { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - _childObserver.failed(exc.ice_name()); - _childObserver.detach(); - if(_timeoutRequestHandler) - { - _instance->timer()->cancel(this); - _timeoutRequestHandler = 0; - } - } - __invokeException(exc); + invokeImpl(true); // userThread = true } -void -IceInternal::BatchOutgoingAsync::__processRetry() +ConnectionFlushBatch::ConnectionFlushBatch(const ConnectionIPtr& connection, + const CommunicatorPtr& communicator, + const InstancePtr& instance, + const string& operation, + const CallbackBasePtr& delegate, + const LocalObjectPtr& cookie) : + OutgoingAsyncBase(communicator, instance, operation, delegate, cookie), _connection(connection) { - assert(false); // Retries are never scheduled + _observer.attach(instance.get(), operation); } -IceInternal::ProxyBatchOutgoingAsync::ProxyBatchOutgoingAsync(const Ice::ObjectPrx& proxy, - const std::string& operation, - const CallbackBasePtr& delegate, - const Ice::LocalObjectPtr& cookie) : - BatchOutgoingAsync(proxy->ice_getCommunicator(), proxy->__reference()->getInstance(), operation, delegate, cookie), - _proxy(proxy) +ConnectionPtr +ConnectionFlushBatch::getConnection() const { - _observer.attach(proxy.get(), operation, 0); + return _connection; } void -IceInternal::ProxyBatchOutgoingAsync::__invoke() +ConnectionFlushBatch::invoke() { - checkSupportedProtocol(_proxy->__reference()->getProtocol()); - - RequestHandlerPtr handler; try { - handler = _proxy->__getRequestHandler(); - AsyncStatus status = handler->sendAsyncRequest(this); + AsyncStatus status = _connection->flushAsyncBatchRequests(this); if(status & AsyncStatusSent) { _sentSynchronously = true; if(status & AsyncStatusInvokeSentCallback) { - __invokeSent(); - } - } - else - { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - if(!(_state & Done)) - { - int invocationTimeout = handler->getReference()->getInvocationTimeout(); - if(invocationTimeout > 0) - { - _instance->timer()->schedule(this, IceUtil::Time::milliSeconds(invocationTimeout)); - _timeoutRequestHandler = handler; - } + invokeSent(); } } } - catch(const RetryException&) - { - // - // Clear request handler but don't retry or throw. Retrying - // isn't useful, there were no batch requests associated with - // the proxy's request handler. - // - _proxy->__setRequestHandler(handler, 0); - } - catch(const Ice::Exception& ex) + catch(const Exception& ex) { - _observer.failed(ex.ice_name()); - _proxy->__setRequestHandler(handler, 0); // Clear request handler - throw; // Throw to notify the user that batch requests were potentially lost. - } -} - -IceInternal::ConnectionBatchOutgoingAsync::ConnectionBatchOutgoingAsync(const ConnectionIPtr& con, - const CommunicatorPtr& communicator, - const InstancePtr& instance, - const string& operation, - const CallbackBasePtr& delegate, - const Ice::LocalObjectPtr& cookie) : - BatchOutgoingAsync(communicator, instance, operation, delegate, cookie), - _connection(con) -{ - _observer.attach(instance.get(), operation); -} - -void -IceInternal::ConnectionBatchOutgoingAsync::__invoke() -{ - AsyncStatus status = _connection->flushAsyncBatchRequests(this); - if(status & AsyncStatusSent) - { - _sentSynchronously = true; - if(status & AsyncStatusInvokeSentCallback) + if(completed(ex)) { - __invokeSent(); + invokeCompletedAsync(); } } } -Ice::ConnectionPtr -IceInternal::ConnectionBatchOutgoingAsync::getConnection() const +CommunicatorFlushBatch::CommunicatorFlushBatch(const CommunicatorPtr& communicator, + const InstancePtr& instance, + const string& operation, + const CallbackBasePtr& cb, + const LocalObjectPtr& cookie) : + AsyncResult(communicator, instance, operation, cb, cookie) { - return _connection; -} + _observer.attach(instance.get(), operation); -IceInternal::CommunicatorBatchOutgoingAsync::CommunicatorBatchOutgoingAsync(const CommunicatorPtr& communicator, - const InstancePtr& instance, - const string& operation, - const CallbackBasePtr& delegate, - const Ice::LocalObjectPtr& cookie) : - AsyncResult(communicator, instance, operation, delegate, cookie) -{ // // _useCount is initialized to 1 to prevent premature callbacks. // The caller must invoke ready() after all flush requests have // been initiated. // _useCount = 1; - - // - // Assume all connections are flushed synchronously. - // - _sentSynchronously = true; - - // - // Attach observer - // - _observer.attach(instance.get(), operation); } void -IceInternal::CommunicatorBatchOutgoingAsync::flushConnection(const ConnectionIPtr& con) +CommunicatorFlushBatch::flushConnection(const ConnectionIPtr& con) { - class BatchOutgoingAsyncI : public BatchOutgoingAsync + class FlushBatch : public OutgoingAsyncBase { public: - - BatchOutgoingAsyncI(const CommunicatorBatchOutgoingAsyncPtr& outAsync, - const InstancePtr& instance, - InvocationObserver& observer) : - BatchOutgoingAsync(outAsync->getCommunicator(), instance, outAsync->getOperation(), __dummyCallback, 0), - _outAsync(outAsync), _observer(observer) + + FlushBatch(const CommunicatorFlushBatchPtr& outAsync, + const InstancePtr& instance, + InvocationObserver& observer) : + OutgoingAsyncBase(outAsync->getCommunicator(), instance, outAsync->getOperation(), __dummyCallback, 0), + _outAsync(outAsync), + _observer(observer) { } - virtual bool __sent() + virtual bool sent() { _childObserver.detach(); _outAsync->check(false); return false; } -#ifdef __SUNPRO_CC - using BatchOutgoingAsync::__sent; -#endif - - virtual void __finished(const Ice::Exception& ex) + virtual bool completed(const Exception& ex) { _childObserver.failed(ex.ice_name()); _childObserver.detach(); _outAsync->check(false); + return false; } - virtual void __attachRemoteObserver(const Ice::ConnectionInfoPtr& connection, const Ice::EndpointPtr& endpt, - Ice::Int requestId, Ice::Int sz) + private: + + virtual InvocationObserver& getObserver() { - _childObserver.attach(_observer.getRemoteObserver(connection, endpt, requestId, sz)); + return _observer; } - private: - - const CommunicatorBatchOutgoingAsyncPtr _outAsync; + const CommunicatorFlushBatchPtr _outAsync; InvocationObserver& _observer; }; @@ -1166,13 +785,9 @@ IceInternal::CommunicatorBatchOutgoingAsync::flushConnection(const ConnectionIPt try { - AsyncStatus status = con->flushAsyncBatchRequests(new BatchOutgoingAsyncI(this, _instance, _observer)); - if(!(status & AsyncStatusSent)) - { - _sentSynchronously = false; - } + con->flushAsyncBatchRequests(new FlushBatch(this, _instance, _observer)); } - catch(const Ice::LocalException&) + catch(const LocalException&) { check(false); throw; @@ -1180,19 +795,13 @@ IceInternal::CommunicatorBatchOutgoingAsync::flushConnection(const ConnectionIPt } void -IceInternal::CommunicatorBatchOutgoingAsync::ready() +CommunicatorFlushBatch::ready() { check(true); } void -IceInternal::CommunicatorBatchOutgoingAsync::__processRetry() -{ - assert(false); // Retries are never scheduled -} - -void -IceInternal::CommunicatorBatchOutgoingAsync::check(bool userThread) +CommunicatorFlushBatch::check(bool userThread) { { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); @@ -1201,255 +810,18 @@ IceInternal::CommunicatorBatchOutgoingAsync::check(bool userThread) { return; } - _state |= Done | OK | Sent; - _os.resize(0); // Clear buffer now, instead of waiting for AsyncResult deallocation - _monitor.notifyAll(); } - if(!_callback || !_callback->hasSentCallback()) + if(sent(true)) { - _observer.detach(); - } - else - { - // - // _sentSynchronously is immutable here. - // - if(!_sentSynchronously || !userThread) + if(userThread) { - __invokeSentAsync(); + _sentSynchronously = true; + invokeSent(); } else { - AsyncResult::__invokeSent(); - } - } -} - -IceInternal::GetConnectionOutgoingAsync::GetConnectionOutgoingAsync(const Ice::ObjectPrx& prx, - const std::string& operation, - const CallbackBasePtr& delegate, - const Ice::LocalObjectPtr& cookie) : - AsyncResult(prx->ice_getCommunicator(), prx->__reference()->getInstance(), operation, delegate, cookie), - _proxy(prx), - _cnt(0) -{ - _observer.attach(prx.get(), operation, 0); -} - -void -IceInternal::GetConnectionOutgoingAsync::__invoke() -{ - while(true) - { - try - { - _handler = _proxy->__getRequestHandler(); - _handler->sendAsyncRequest(this); - } - catch(const RetryException&) - { - _proxy->__setRequestHandler(_handler, 0); - } - catch(const Ice::Exception& ex) - { - handleException(ex); + invokeSentAsync(); } - break; - } -} - -AsyncStatus -IceInternal::GetConnectionOutgoingAsync::__send(const Ice::ConnectionIPtr&, bool, bool) -{ - __sent(); - return AsyncStatusSent; -} - -AsyncStatus -IceInternal::GetConnectionOutgoingAsync::__invokeCollocated(CollocatedRequestHandler*) -{ - __sent(); - return AsyncStatusSent; -} - -bool -IceInternal::GetConnectionOutgoingAsync::__sent() -{ - { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - _state |= Done; - _monitor.notifyAll(); - } - __invokeCompleted(); - return false; -} - -void -IceInternal::GetConnectionOutgoingAsync::__invokeSent() -{ - // No sent callback -} - -void -IceInternal::GetConnectionOutgoingAsync::__finished(const Ice::Exception& exc) -{ - try - { - handleException(exc); - } - catch(const Ice::Exception& ex) - { - __invokeException(ex); - } -} - -void -IceInternal::GetConnectionOutgoingAsync::__processRetry() -{ - __invoke(); -} - -void -IceInternal::GetConnectionOutgoingAsync::handleException(const Ice::Exception& exc) -{ - try - { - _instance->retryQueue()->add(this, _proxy->__handleException(exc, _handler, Ice::Idempotent, false, _cnt)); - _observer.retried(); // Invocation is being retried. - } - catch(const Ice::Exception& ex) - { - _observer.failed(ex.ice_name()); - throw; - } -} - -namespace -{ - -// -// Dummy class derived from CallbackBase -// We use this class for the __dummyCallback extern pointer in OutgoingAsync. In turn, -// this allows us to test whether the user supplied a null delegate instance to the -// generated begin_ method without having to generate a separate test to throw IllegalArgumentException -// in the inlined versions of the begin_ method. In other words, this reduces the amount of generated -// object code. -// -class DummyCallback : public CallbackBase -{ -public: - - DummyCallback() - { - } - - virtual void - completed(const Ice::AsyncResultPtr&) const - { - assert(false); - } - - virtual CallbackBasePtr - verify(const Ice::LocalObjectPtr&) - { - // - // Called by the AsyncResult constructor to verify the delegate. The dummy - // delegate is passed when the user used a begin_ method without delegate. - // By returning 0 here, we tell the AsyncResult that no delegates was - // provided. - // - return 0; - } - - virtual void - sent(const AsyncResultPtr&) const - { - assert(false); - } - - virtual bool - hasSentCallback() const - { - assert(false); - return false; - } -}; - -} - -// -// This gives a pointer value to compare against in the generated -// begin_ method to decide whether the caller passed a null pointer -// versus the generated inline version of the begin_ method having -// passed a pointer to the dummy delegate. -// -CallbackBasePtr IceInternal::__dummyCallback = new DummyCallback; - -#ifdef ICE_CPP11 - -Ice::CallbackPtr -Ice::newCallback(const ::IceInternal::Function<void (const AsyncResultPtr&)>& completed, - const ::IceInternal::Function<void (const AsyncResultPtr&)>& sent) -{ - class Cpp11CB : public GenericCallbackBase - { - public: - - Cpp11CB(const ::std::function<void (const AsyncResultPtr&)>& completed, - const ::std::function<void (const AsyncResultPtr&)>& sent) : - _completed(completed), - _sent(sent) - { - checkCallback(true, completed != nullptr); - } - - virtual void - completed(const AsyncResultPtr& result) const - { - _completed(result); - } - - virtual CallbackBasePtr - verify(const LocalObjectPtr&) - { - return this; // Nothing to do, the cookie is not type-safe. - } - - virtual void - sent(const AsyncResultPtr& result) const - { - if(_sent != nullptr) - { - _sent(result); - } - } - - virtual bool - hasSentCallback() const - { - return _sent != nullptr; - } - - private: - - ::std::function< void (const AsyncResultPtr&)> _completed; - ::std::function< void (const AsyncResultPtr&)> _sent; - }; - - return new Cpp11CB(completed, sent); -} -#endif - -void -IceInternal::CallbackBase::checkCallback(bool obj, bool cb) -{ - if(!obj) - { - throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "callback object cannot be null"); - } - if(!cb) - { - throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "callback cannot be null"); } } |