diff options
Diffstat (limited to 'cpp/src/Ice/Outgoing.cpp')
-rw-r--r-- | cpp/src/Ice/Outgoing.cpp | 232 |
1 files changed, 126 insertions, 106 deletions
diff --git a/cpp/src/Ice/Outgoing.cpp b/cpp/src/Ice/Outgoing.cpp index 66509a2bedc..4815b9796fb 100644 --- a/cpp/src/Ice/Outgoing.cpp +++ b/cpp/src/Ice/Outgoing.cpp @@ -7,36 +7,44 @@ // // ********************************************************************** +#include <IceUtil/DisableWarnings.h> #include <Ice/Outgoing.h> -#include <Ice/Object.h> -#include <Ice/CollocatedRequestHandler.h> #include <Ice/ConnectionI.h> +#include <Ice/CollocatedRequestHandler.h> #include <Ice/Reference.h> -#include <Ice/Endpoint.h> -#include <Ice/LocalException.h> -#include <Ice/Protocol.h> #include <Ice/Instance.h> +#include <Ice/LocalException.h> #include <Ice/ReplyStatus.h> -#include <Ice/ProxyFactory.h> +#include <Ice/ImplicitContextI.h> using namespace std; using namespace Ice; using namespace Ice::Instrumentation; using namespace IceInternal; -IceInternal::Outgoing::Outgoing(IceProxy::Ice::Object* proxy, const string& operation, OperationMode mode, - const Context* context) : +OutgoingBase::OutgoingBase(Instance* instance, const string& operation) : + _os(instance, Ice::currentProtocolEncoding), _sent(false) +{ +} + +Outgoing::Outgoing(IceProxy::Ice::Object* proxy, const string& operation, OperationMode mode, const Context* context) : + OutgoingBase(proxy->__reference()->getInstance().get(), operation), _proxy(proxy), _mode(mode), - _observer(proxy, operation, context), _state(StateUnsent), _encoding(getCompatibleEncoding(proxy->__reference()->getEncoding())), - _is(proxy->__reference()->getInstance().get(), Ice::currentProtocolEncoding), - _os(proxy->__reference()->getInstance().get(), Ice::currentProtocolEncoding), - _sent(false) + _is(proxy->__reference()->getInstance().get(), Ice::currentProtocolEncoding) { checkSupportedProtocol(getCompatibleProtocol(proxy->__reference()->getProtocol())); + _observer.attach(proxy, operation, context); + + int invocationTimeout = _proxy->__reference()->getInvocationTimeout(); + if(invocationTimeout > 0) + { + _invocationTimeoutDeadline = IceUtil::Time::now() + IceUtil::Time::milliSeconds(invocationTimeout); + } + switch(_proxy->__reference()->getMode()) { case Reference::ModeTwoway: @@ -129,7 +137,66 @@ Outgoing::~Outgoing() } bool -IceInternal::Outgoing::invoke() +Outgoing::send(const Ice::ConnectionIPtr& connection, bool compress, bool response) +{ + return connection->sendRequest(this, compress, response); +} + +void +Outgoing::invokeCollocated(CollocatedRequestHandler* handler) +{ + handler->invokeRequest(this); +} + +void +Outgoing::sent() +{ + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); + if(_proxy->__reference()->getMode() != Reference::ModeTwoway) + { + _childObserver.detach(); + _state = StateOK; + } + _sent = true; + _monitor.notify(); + + // + // NOTE: At this point the stack allocated Outgoing object can be destroyed + // since the notify() on the monitor will release the thread waiting on the + // synchronous Ice call. + // +} + +void +Outgoing::completed(const Exception& ex) +{ + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); + //assert(_state <= StateInProgress); + if(_state > StateInProgress) + { + // + // Response was already received but message + // didn't get removed first from the connection + // send message queue so it's possible we can be + // notified of failures. In this case, ignore the + // failure and assume the outgoing has been sent. + // + assert(_state != StateFailed); + _sent = true; + _monitor.notify(); + return; + } + + _childObserver.failed(ex.ice_name()); + _childObserver.detach(); + + _state = StateFailed; + _exception.reset(ex.ice_clone()); + _monitor.notify(); +} + +bool +Outgoing::invoke() { assert(_state == StateUnsent); @@ -146,6 +213,11 @@ IceInternal::Outgoing::invoke() { try { + if(_invocationTimeoutDeadline != IceUtil::Time() && _invocationTimeoutDeadline <= IceUtil::Time::now()) + { + throw Ice::InvocationTimeoutException(__FILE__, __LINE__); + } + _state = StateInProgress; _exception.reset(0); _sent = false; @@ -164,19 +236,18 @@ IceInternal::Outgoing::invoke() // // If the handler says it's not finished, we wait until we're done. // - int invocationTimeout = _proxy->__reference()->getInvocationTimeout(); - if(invocationTimeout > 0) + if(_invocationTimeoutDeadline != IceUtil::Time()) { IceUtil::Time now = IceUtil::Time::now(); - IceUtil::Time deadline = now + IceUtil::Time::milliSeconds(invocationTimeout); + timedOut = now >= _invocationTimeoutDeadline; while((_state == StateInProgress || !_sent) && _state != StateFailed && !timedOut) { - _monitor.timedWait(deadline - now); + _monitor.timedWait(_invocationTimeoutDeadline - now); if((_state == StateInProgress || !_sent) && _state != StateFailed) { now = IceUtil::Time::now(); - timedOut = now >= deadline; + timedOut = now >= _invocationTimeoutDeadline; } } } @@ -191,15 +262,15 @@ IceInternal::Outgoing::invoke() if(timedOut) { - _handler->requestTimedOut(this); + _handler->requestCanceled(this, InvocationTimeoutException(__FILE__, __LINE__)); // // Wait for the exception to propagate. It's possible the request handler ignores - // the timeout if there was a failure shortly before requestTimedOut got called. + // the timeout if there was a failure shortly before requestCanceled got called. // In this case, the exception should be set on the Outgoing. // IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - while(!_exception.get()) + while(_state == StateInProgress) { _monitor.wait(); } @@ -223,11 +294,23 @@ IceInternal::Outgoing::invoke() { try { - int interval = _proxy->__handleException(ex, _handler, _mode, _sent, cnt); - _observer.retried(); // Invocation is being retried. - if(interval > 0) + IceUtil::Time interval; + interval = IceUtil::Time::milliSeconds(_proxy->__handleException(ex, _handler, _mode, _sent, cnt)); + if(interval > IceUtil::Time()) + { + if(_invocationTimeoutDeadline != IceUtil::Time()) + { + IceUtil::Time deadline = _invocationTimeoutDeadline - IceUtil::Time::now(); + if(deadline < interval) + { + interval = deadline; + } + } + IceUtil::ThreadControl::sleep(interval); + } + if(_invocationTimeoutDeadline == IceUtil::Time() || _invocationTimeoutDeadline > IceUtil::Time::now()) { - IceUtil::ThreadControl::sleep(IceUtil::Time::milliSeconds(interval)); + _observer.retried(); } } catch(const Ice::Exception& ex) @@ -243,7 +326,7 @@ IceInternal::Outgoing::invoke() } void -IceInternal::Outgoing::abort(const LocalException& ex) +Outgoing::abort(const LocalException& ex) { assert(_state == StateUnsent); @@ -261,67 +344,8 @@ IceInternal::Outgoing::abort(const LocalException& ex) ex.ice_throw(); } -bool -IceInternal::Outgoing::send(const Ice::ConnectionIPtr& connection, bool compress, bool response) -{ - return connection->sendRequest(this, compress, response); -} - -void -IceInternal::Outgoing::invokeCollocated(CollocatedRequestHandler* handler) -{ - handler->invokeRequest(this); -} - -void -IceInternal::Outgoing::sent() -{ - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - if(_proxy->__reference()->getMode() != Reference::ModeTwoway) - { - _childObserver.detach(); - _state = StateOK; - } - _sent = true; - _monitor.notify(); - - // - // NOTE: At this point the stack allocated Outgoing object can be destroyed - // since the notify() on the monitor will release the thread waiting on the - // synchronous Ice call. - // -} - void -IceInternal::Outgoing::finished(const Exception& ex) -{ - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - //assert(_state <= StateInProgress); - if(_state > StateInProgress) - { - // - // Response was already received but message - // didn't get removed first from the connection - // send message queue so it's possible we can be - // notified of failures. In this case, ignore the - // failure and assume the outgoing has been sent. - // - assert(_state != StateFailed); - _sent = true; - _monitor.notify(); - return; - } - - _childObserver.failed(ex.ice_name()); - _childObserver.detach(); - - _state = StateFailed; - _exception.reset(ex.ice_clone()); - _monitor.notify(); -} - -void -IceInternal::Outgoing::finished(BasicStream& is) +Outgoing::completed(BasicStream& is) { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); @@ -482,7 +506,7 @@ IceInternal::Outgoing::finished(BasicStream& is) } void -IceInternal::Outgoing::throwUserException() +Outgoing::throwUserException() { try { @@ -496,27 +520,22 @@ IceInternal::Outgoing::throwUserException() } } -IceInternal::BatchOutgoing::BatchOutgoing(IceProxy::Ice::Object* proxy, const string& name) : - _proxy(proxy), - _connection(0), - _sent(false), - _os(proxy->__reference()->getInstance().get(), Ice::currentProtocolEncoding), - _observer(proxy, name, 0) +FlushBatch::FlushBatch(IceProxy::Ice::Object* proxy, const string& operation) : + OutgoingBase(proxy->__reference()->getInstance().get(), operation), _proxy(proxy), _connection(0) { checkSupportedProtocol(proxy->__reference()->getProtocol()); + + _observer.attach(proxy->__reference()->getInstance().get(), operation); } -IceInternal::BatchOutgoing::BatchOutgoing(ConnectionI* connection, Instance* instance, const string& name) : - _proxy(0), - _connection(connection), - _sent(false), - _os(instance, Ice::currentProtocolEncoding), - _observer(instance, name) +FlushBatch::FlushBatch(ConnectionI* connection, Instance* instance, const string& operation) : + OutgoingBase(instance, operation), _proxy(0), _connection(connection) { + _observer.attach(instance, operation); } void -IceInternal::BatchOutgoing::invoke() +FlushBatch::invoke() { assert(_proxy || _connection); @@ -577,7 +596,8 @@ IceInternal::BatchOutgoing::invoke() if(timedOut) { - handler->requestTimedOut(this); + Ice::InvocationTimeoutException ex(__FILE__, __LINE__); + handler->requestCanceled(this, ex); // // Wait for the exception to propagate. It's possible the request handler ignores @@ -614,19 +634,19 @@ IceInternal::BatchOutgoing::invoke() } bool -IceInternal::BatchOutgoing::send(const Ice::ConnectionIPtr& connection, bool, bool) +FlushBatch::send(const Ice::ConnectionIPtr& connection, bool, bool) { return connection->flushBatchRequests(this); } void -IceInternal::BatchOutgoing::invokeCollocated(CollocatedRequestHandler* handler) +FlushBatch::invokeCollocated(CollocatedRequestHandler* handler) { handler->invokeBatchRequests(this); } void -IceInternal::BatchOutgoing::sent() +FlushBatch::sent() { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); _childObserver.detach(); @@ -635,14 +655,14 @@ IceInternal::BatchOutgoing::sent() _monitor.notify(); // - // NOTE: At this point the stack allocated BatchOutgoing object + // NOTE: At this point the stack allocated FlushBatch object // can be destroyed since the notify() on the monitor will release // the thread waiting on the synchronous Ice call. // } void -IceInternal::BatchOutgoing::finished(const Ice::Exception& ex) +FlushBatch::completed(const Ice::Exception& ex) { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); _childObserver.failed(ex.ice_name()); |