diff options
author | Benoit Foucher <benoit@zeroc.com> | 2014-06-27 10:31:41 +0200 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2014-06-27 10:31:41 +0200 |
commit | a4f93259dc3494d98addf38e69b87eb557d432b3 (patch) | |
tree | d2b78bb5cea24e33dc1b46be22dba6167e96c9ed /cpp/src/Ice/OutgoingAsync.cpp | |
parent | Fix for ICE-5515 (ice_staticId on proxies) in Java, C#, Python, Ruby and PHP ... (diff) | |
download | ice-a4f93259dc3494d98addf38e69b87eb557d432b3.tar.bz2 ice-a4f93259dc3494d98addf38e69b87eb557d432b3.tar.xz ice-a4f93259dc3494d98addf38e69b87eb557d432b3.zip |
Better collocation optimization, fix for ICE-5489, ICE-5484
Diffstat (limited to 'cpp/src/Ice/OutgoingAsync.cpp')
-rw-r--r-- | cpp/src/Ice/OutgoingAsync.cpp | 215 |
1 files changed, 69 insertions, 146 deletions
diff --git a/cpp/src/Ice/OutgoingAsync.cpp b/cpp/src/Ice/OutgoingAsync.cpp index fb470bceaf2..5a600e8fce2 100644 --- a/cpp/src/Ice/OutgoingAsync.cpp +++ b/cpp/src/Ice/OutgoingAsync.cpp @@ -11,7 +11,7 @@ #include <Ice/OutgoingAsync.h> #include <Ice/Object.h> #include <Ice/ConnectionI.h> -#include <Ice/RequestHandler.h> +#include <Ice/CollocatedRequestHandler.h> #include <Ice/Reference.h> #include <Ice/Instance.h> #include <Ice/LocalException.h> @@ -20,7 +20,6 @@ #include <Ice/LocatorInfo.h> #include <Ice/ProxyFactory.h> #include <Ice/RouterInfo.h> -#include <Ice/Outgoing.h> // For LocalExceptionWrapper. #include <Ice/Protocol.h> #include <Ice/ReplyStatus.h> #include <Ice/ImplicitContextI.h> @@ -52,10 +51,8 @@ class AsynchronousException : public DispatchWorkItem { public: - AsynchronousException(const IceInternal::InstancePtr& instance, - const Ice::AsyncResultPtr& result, - const Ice::Exception& ex) : - DispatchWorkItem(instance), _result(result), _exception(ex.ice_clone()) + AsynchronousException(const Ice::AsyncResultPtr& result, const Ice::Exception& ex) : + _result(result), _exception(ex.ice_clone()) { } @@ -75,8 +72,7 @@ class AsynchronousSent : public DispatchWorkItem { public: - AsynchronousSent(const IceInternal::InstancePtr& instance, const Ice::AsyncResultPtr& result) : - DispatchWorkItem(instance), _result(result) + AsynchronousSent(const Ice::AsyncResultPtr& result) : _result(result) { } @@ -95,10 +91,8 @@ class AsynchronousTimeout : public DispatchWorkItem { public: - AsynchronousTimeout(const IceInternal::InstancePtr& instance, - const IceInternal::RequestHandlerPtr& handler, - const Ice::AsyncResultPtr& result) : - DispatchWorkItem(instance), _handler(handler), _outAsync(OutgoingAsyncMessageCallbackPtr::dynamicCast(result)) + AsynchronousTimeout(const IceInternal::RequestHandlerPtr& handler, const Ice::AsyncResultPtr& result) : + _handler(handler), _outAsync(OutgoingAsyncMessageCallbackPtr::dynamicCast(result)) { assert(_outAsync); } @@ -144,18 +138,6 @@ Ice::AsyncResult::~AsyncResult() { } -bool -Ice::AsyncResult::operator==(const AsyncResult& r) const -{ - return this == &r; -} - -bool -Ice::AsyncResult::operator<(const AsyncResult& r) const -{ - return this < &r; -} - Int Ice::AsyncResult::getHash() const { @@ -286,7 +268,7 @@ Ice::AsyncResult::__invokeSentAsync() // try { - _instance->clientThreadPool()->execute(new AsynchronousSent(_instance, this)); + _instance->clientThreadPool()->execute(new AsynchronousSent(this)); } catch(const Ice::CommunicatorDestroyedException&) { @@ -318,7 +300,7 @@ Ice::AsyncResult::__invokeExceptionAsync(const Ice::Exception& ex) // CommunicatorDestroyedException is the only exception that can propagate directly // from this method. // - _instance->clientThreadPool()->execute(new AsynchronousException(_instance, this, ex)); + _instance->clientThreadPool()->execute(new AsynchronousException(this, ex)); } void @@ -360,7 +342,7 @@ Ice::AsyncResult::runTimerTask() // Implementation of TimerTask::runTimerTask() if(handler) { - _instance->clientThreadPool()->execute(new AsynchronousTimeout(_instance, handler, this)); + _instance->clientThreadPool()->execute(new AsynchronousTimeout(handler, this)); } } @@ -457,7 +439,7 @@ IceInternal::OutgoingAsync::OutgoingAsync(const ObjectPrx& prx, void IceInternal::OutgoingAsync::__prepare(const std::string& operation, OperationMode mode, const Context* context) { - _delegate = 0; + _handler = 0; _cnt = 0; _mode = mode; _sentSynchronously = false; @@ -528,6 +510,12 @@ IceInternal::OutgoingAsync::__send(const Ice::ConnectionIPtr& connection, bool c return connection->sendAsyncRequest(this, compress, response); } +AsyncStatus +IceInternal::OutgoingAsync::__invokeCollocated(CollocatedRequestHandler* handler) +{ + return handler->invokeAsyncRequest(this); +} + bool IceInternal::OutgoingAsync::__sent() { @@ -537,7 +525,7 @@ IceInternal::OutgoingAsync::__sent() _state |= Sent; assert(!(_state & Done)); - if(!_proxy->ice_isTwoway()) + if(_proxy->__reference()->getMode() != Reference::ModeTwoway) { _remoteObserver.detach(); if(!_callback || !_callback->__hasSentCallback()) @@ -550,7 +538,7 @@ IceInternal::OutgoingAsync::__sent() _timeoutRequestHandler = 0; } _state |= Done | OK; - _os.resize(0); // Clear buffer now, instead of waiting for AsyncResult deallocation + //_os.resize(0); // Don't clear the buffer now, it's needed for collocation optimization. } _monitor.notifyAll(); return !alreadySent && _callback && _callback->__hasSentCallback(); @@ -563,7 +551,7 @@ IceInternal::OutgoingAsync::__invokeSent() } void -IceInternal::OutgoingAsync::__finished(const Ice::LocalException& exc, bool sent) +IceInternal::OutgoingAsync::__finished(const Ice::Exception& exc, bool sent) { { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); @@ -581,51 +569,16 @@ IceInternal::OutgoingAsync::__finished(const Ice::LocalException& exc, bool sent // NOTE: at this point, synchronization isn't needed, no other threads should be // calling on the callback. // - try { - int interval = handleException(exc, sent); // This will throw if the invocation can't be retried. - if(interval > 0) - { - _instance->retryQueue()->add(this, interval); - } - else + if(!handleException(exc, sent)) // This will throw if the invocation can't be retried. { - __invoke(false); + return; // Can't be retried immediately. } - } - catch(const Ice::LocalException& ex) - { - __invokeException(ex); - } -} - -void -IceInternal::OutgoingAsync::__finished(const LocalExceptionWrapper& exc) -{ - // - // NOTE: at this point, synchronization isn't needed, no other threads should be - // calling on the callback. The LocalExceptionWrapper exception is only called - // before the invocation is sent. - // - _remoteObserver.failed(exc.get()->ice_name()); - _remoteObserver.detach(); - assert(!_timeoutRequestHandler); - - try - { - int interval = handleException(exc); // This will throw if the invocation can't be retried. - if(interval > 0) - { - _instance->retryQueue()->add(this, interval); - } - else - { - __invoke(false); - } + __invoke(false); // Retry the invocation } - catch(const Ice::LocalException& ex) + catch(const Ice::Exception& ex) { __invokeException(ex); } @@ -796,12 +749,10 @@ IceInternal::OutgoingAsync::__invoke(bool synchronous) { while(true) { - int interval = 0; try { - _delegate = _proxy->__getDelegate(true); - RequestHandlerPtr handler = _delegate->__getRequestHandler(); - AsyncStatus status = handler->sendAsyncRequest(this); + _handler = _proxy->__getRequestHandler(true); + AsyncStatus status = _handler->sendAsyncRequest(this); if(status & AsyncStatusSent) { if(synchronous) @@ -826,92 +777,53 @@ IceInternal::OutgoingAsync::__invoke(bool synchronous) IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); if(!(_state & Done)) { - int invocationTimeout = handler->getReference()->getInvocationTimeout(); + int invocationTimeout = _handler->getReference()->getInvocationTimeout(); if(invocationTimeout > 0) { _instance->timer()->schedule(this, IceUtil::Time::milliSeconds(invocationTimeout)); - _timeoutRequestHandler = handler; + _timeoutRequestHandler = _handler; } } } break; } - catch(const LocalExceptionWrapper& ex) + catch(const RetryException&) { - interval = handleException(ex); + _proxy->__setRequestHandler(_handler, 0); // Clear request handler and retry. } - catch(const Ice::LocalException& ex) + catch(const Ice::Exception& ex) { - interval = handleException(ex, false); - } - - if(interval > 0) - { - _instance->retryQueue()->add(this, interval); - return false; + if(!handleException(ex, false)) // This will throw if the invocation can't be retried. + { + break; // Can't be retried immediately. + } } } return _sentSynchronously; } -int -IceInternal::OutgoingAsync::handleException(const LocalExceptionWrapper& ex) -{ - if(_mode == Nonmutating || _mode == Idempotent) - { - return _proxy->__handleExceptionWrapperRelaxed(_delegate, ex, false, _cnt, _observer); - } - else - { - return _proxy->__handleExceptionWrapper(_delegate, ex, _observer); - } -} - -int -IceInternal::OutgoingAsync::handleException(const Ice::LocalException& exc, bool sent) +bool +IceInternal::OutgoingAsync::handleException(const Ice::Exception& exc, bool sent) { try { - // - // A CloseConnectionException indicates graceful server shutdown, and is therefore - // always repeatable without violating "at-most-once". That's because by sending a - // close connection message, the server guarantees that all outstanding requests - // can safely be repeated. - // - // An ObjectNotExistException can always be retried as well without violating - // "at-most-once" (see the implementation of the checkRetryAfterException method of - // the ProxyFactory class for the reasons why it can be useful). - // - if(!sent || - dynamic_cast<const CloseConnectionException*>(&exc) || - dynamic_cast<const ObjectNotExistException*>(&exc)) - { - exc.ice_throw(); - } - - // - // Throw the exception wrapped in a LocalExceptionWrapper, to indicate that the - // request cannot be resent without potentially violating the "at-most-once" - // principle. - // - throw LocalExceptionWrapper(exc, false); - } - catch(const LocalExceptionWrapper& ex) - { - if(_mode == Nonmutating || _mode == Idempotent) + int interval = _proxy->__handleException(exc, _handler, _mode, sent, _cnt); + _observer.retried(); // Invocation is being retried. + if(interval > 0) { - return _proxy->__handleExceptionWrapperRelaxed(_delegate, ex, false, _cnt, _observer); + _instance->retryQueue()->add(this, interval); + return false; // Don't retry immediately, the retry queue will take care of the retry. } else { - return _proxy->__handleExceptionWrapper(_delegate, ex, _observer); + return true; // Retry immediately. } } - catch(const Ice::LocalException& ex) + catch(const Ice::Exception& ex) { - return _proxy->__handleException(_delegate, ex, false, _cnt, _observer); + _observer.failed(ex.ice_name()); + throw; } - return 0; // Keep the compiler happy. } IceInternal::BatchOutgoingAsync::BatchOutgoingAsync(const CommunicatorPtr& communicator, @@ -929,13 +841,19 @@ IceInternal::BatchOutgoingAsync::__send(const Ice::ConnectionIPtr& connection, b return connection->flushAsyncBatchRequests(this); } +AsyncStatus +IceInternal::BatchOutgoingAsync::__invokeCollocated(CollocatedRequestHandler* handler) +{ + return handler->invokeAsyncBatchRequests(this); +} + bool IceInternal::BatchOutgoingAsync::__sent() { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); assert(!_exception.get()); _state |= Done | OK | Sent; - _os.resize(0); // Clear buffer now, instead of waiting for AsyncResult deallocation + //_os.resize(0); // Don't clear the buffer now, it's needed for collocation optimization. _remoteObserver.detach(); if(_timeoutRequestHandler) { @@ -958,7 +876,7 @@ IceInternal::BatchOutgoingAsync::__invokeSent() } void -IceInternal::BatchOutgoingAsync::__finished(const Ice::LocalException& exc, bool) +IceInternal::BatchOutgoingAsync::__finished(const Ice::Exception& exc, bool) { _remoteObserver.failed(exc.ice_name()); _remoteObserver.detach(); @@ -985,16 +903,10 @@ IceInternal::ProxyBatchOutgoingAsync::__invoke() { checkSupportedProtocol(_proxy->__reference()->getProtocol()); - // - // We don't automatically retry if ice_flushBatchRequests fails. Otherwise, if some batch - // requests were queued with the connection, they would be lost without being noticed. - // - Handle<IceDelegate::Ice::Object> delegate; - int cnt = -1; // Don't retry. + RequestHandlerPtr handler; try { - delegate = _proxy->__getDelegate(true); - RequestHandlerPtr handler = delegate->__getRequestHandler(); + handler = _proxy->__getRequestHandler(true); AsyncStatus status = handler->sendAsyncRequest(this); if(status & AsyncStatusSent) { @@ -1018,9 +930,20 @@ IceInternal::ProxyBatchOutgoingAsync::__invoke() } } } - catch(const ::Ice::LocalException& ex) + catch(const RetryException&) + { + // + // Clear request handler but don't retry or throw. Retrying + // isn't useful, there were no batch requests associated with + // the proxy's request handler. + // + _proxy->__setRequestHandler(handler, 0); + } + catch(const Ice::Exception& ex) { - _proxy->__handleException(delegate, ex, 0, cnt, _observer); + _observer.failed(ex.ice_name()); + _proxy->__setRequestHandler(handler, 0); // Clear request handler + throw; // Throw to notify the user that batch requests were potentially lost. } } @@ -1107,7 +1030,7 @@ IceInternal::CommunicatorBatchOutgoingAsync::flushConnection(const ConnectionIPt using BatchOutgoingAsync::__sent; #endif - virtual void __finished(const Ice::LocalException& ex, bool) + virtual void __finished(const Ice::Exception& ex, bool) { _remoteObserver.failed(ex.ice_name()); _remoteObserver.detach(); |