From d81701ca8182942b7936f9fd84a019b695e9c890 Mon Sep 17 00:00:00 2001 From: Benoit Foucher Date: Fri, 23 May 2014 11:59:44 +0200 Subject: Added support for invocation timeouts and ACM heartbeats --- cpp/src/Ice/Outgoing.cpp | 251 +++++++++++++++++++++++++---------------------- 1 file changed, 132 insertions(+), 119 deletions(-) (limited to 'cpp/src/Ice/Outgoing.cpp') diff --git a/cpp/src/Ice/Outgoing.cpp b/cpp/src/Ice/Outgoing.cpp index 5601a705641..cb0a9ff53fb 100644 --- a/cpp/src/Ice/Outgoing.cpp +++ b/cpp/src/Ice/Outgoing.cpp @@ -93,6 +93,8 @@ IceInternal::LocalExceptionWrapper::retry() const IceInternal::Outgoing::Outgoing(RequestHandler* handler, const string& operation, OperationMode mode, const Context* context, InvocationObserver& observer) : _handler(handler), + _exceptionWrapper(false), + _exceptionWrapperRetry(false), _observer(observer), _state(StateUnsent), _encoding(getCompatibleEncoding(handler->getReference()->getEncoding())), @@ -183,72 +185,61 @@ IceInternal::Outgoing::invoke() switch(_handler->getReference()->getMode()) { case Reference::ModeTwoway: + case Reference::ModeOneway: + case Reference::ModeDatagram: { _state = StateInProgress; - Ice::ConnectionI* connection = _handler->sendRequest(this); - assert(connection); + if(_handler->sendRequest(this)) // Request sent and no response expected, we're done. + { + return true; + } bool timedOut = false; - { IceUtil::Monitor::Lock sync(_monitor); // - // If the request is being sent in the background we first wait for the - // sent notification. + // If the handler says it's not finished, we wait until we're done. // - while(_state != StateFailed && !_sent) + int invocationTimeout = _handler->getReference()->getInvocationTimeout(); + if(invocationTimeout > 0) { - _monitor.wait(); - } + IceUtil::Time now = IceUtil::Time::now(); + IceUtil::Time deadline = now + IceUtil::Time::milliSeconds(invocationTimeout); + while((_state == StateInProgress || !_sent) && _state != StateFailed && !timedOut) + { + _monitor.timedWait(deadline - now); - // - // Wait until the request has completed, or until the request times out. - // - - Int timeout = connection->timeout(); - while(_state == StateInProgress && !timedOut) - { - if(timeout >= 0) - { - _monitor.timedWait(IceUtil::Time::milliSeconds(timeout)); - - if(_state == StateInProgress) + if((_state == StateInProgress || !_sent) && _state != StateFailed) { - timedOut = true; + now = IceUtil::Time::now(); + timedOut = now >= deadline; } } - else + } + else + { + while((_state == StateInProgress || !_sent) && _state != StateFailed) { _monitor.wait(); } } } - + if(timedOut) { - // - // Must be called outside the synchronization of this - // object. - // - connection->exception(TimeoutException(__FILE__, __LINE__)); - - // - // We must wait until the exception set above has - // propagated to this Outgoing object. - // - { - IceUtil::Monitor::Lock sync(_monitor); - while(_state == StateInProgress) - { - _monitor.wait(); - } - } + _handler->requestTimedOut(this); + assert(_exception.get()); } if(_exception.get()) { + if(_exceptionWrapper) + { + throw LocalExceptionWrapper(*_exception.get(), _exceptionWrapperRetry); + } + // // A CloseConnectionException indicates graceful // server shutdown, and is therefore always repeatable @@ -277,39 +268,9 @@ IceInternal::Outgoing::invoke() // throw LocalExceptionWrapper(*_exception.get(), false); } - - if(_state == StateUserException) - { - return false; - } - else - { - assert(_state == StateOK); - return true; - } - } - - case Reference::ModeOneway: - case Reference::ModeDatagram: - { - _state = StateInProgress; - if(_handler->sendRequest(this)) - { - // - // If the handler returns the connection, we must wait for the sent callback. - // - IceUtil::Monitor::Lock sync(_monitor); - while(_state != StateFailed && !_sent) - { - _monitor.wait(); - } - if(_exception.get()) - { - _exception->ice_throw(); - } - } - return true; + assert(_state != StateInProgress); + return _state == StateOK; } case Reference::ModeBatchOneway: @@ -349,28 +310,23 @@ IceInternal::Outgoing::abort(const LocalException& ex) ex.ice_throw(); } +bool +IceInternal::Outgoing::send(const Ice::ConnectionIPtr& connection, bool compress, bool response) +{ + return connection->sendRequest(this, compress, response); +} + void -IceInternal::Outgoing::sent(bool notify) +IceInternal::Outgoing::sent() { + IceUtil::Monitor::Lock sync(_monitor); if(_handler->getReference()->getMode() != Reference::ModeTwoway) { _remoteObserver.detach(); + _state = StateOK; } - - if(notify) - { - IceUtil::Monitor::Lock sync(_monitor); - _sent = true; - _monitor.notify(); - } - else - { - // - // No synchronization is necessary if called from sendRequest() because the connection - // send mutex is locked and no other threads can call on Outgoing until it's released. - // - _sent = true; - } + _sent = true; + _monitor.notify(); // // NOTE: At this point the stack allocated Outgoing object can be destroyed @@ -379,6 +335,35 @@ IceInternal::Outgoing::sent(bool notify) // } +void +IceInternal::Outgoing::finished(const LocalException& ex, bool sent) +{ + IceUtil::Monitor::Lock sync(_monitor); + assert(_state <= StateInProgress); + _remoteObserver.failed(ex.ice_name()); + _remoteObserver.detach(); + + _state = StateFailed; + _exception.reset(ex.ice_clone()); + _sent = sent; + _monitor.notify(); +} + +void +IceInternal::Outgoing::finished(const LocalExceptionWrapper& ex) +{ + IceUtil::Monitor::Lock sync(_monitor); + _remoteObserver.failed(ex.get()->ice_name()); + _remoteObserver.detach(); + + _state = StateFailed; + _exceptionWrapper = true; + _exceptionWrapperRetry = ex.retry(); + _exception.reset(ex.get()->ice_clone()); + _sent = false; + _monitor.notify(); +} + void IceInternal::Outgoing::finished(BasicStream& is) { @@ -540,20 +525,6 @@ IceInternal::Outgoing::finished(BasicStream& is) _monitor.notify(); } -void -IceInternal::Outgoing::finished(const LocalException& ex, bool sent) -{ - IceUtil::Monitor::Lock sync(_monitor); - assert(_state <= StateInProgress); - _remoteObserver.failed(ex.ice_name()); - _remoteObserver.detach(); - - _state = StateFailed; - _exception.reset(ex.ice_clone()); - _sent = sent; - _monitor.notify(); -} - void IceInternal::Outgoing::throwUserException() { @@ -592,35 +563,77 @@ void IceInternal::BatchOutgoing::invoke() { assert(_handler || _connection); - if((_handler && !_handler->flushBatchRequests(this)) || (_connection && !_connection->flushBatchRequests(this))) + + int timeout; + if(_connection) + { + if(_connection->flushBatchRequests(this)) + { + return; + } + timeout = -1; + } + else + { + if(_handler->sendRequest(this)) + { + return; + } + timeout = _handler->getReference()->getInvocationTimeout(); + } + + bool timedOut = false; { IceUtil::Monitor::Lock sync(_monitor); - while(!_exception.get() && !_sent) + if(timeout > 0) { - _monitor.wait(); + IceUtil::Time now = IceUtil::Time::now(); + IceUtil::Time deadline = now + IceUtil::Time::milliSeconds(timeout); + while(!_exception.get() && !_sent && !timedOut) + { + _monitor.timedWait(deadline - now); + if(!_exception.get() && !_sent) + { + now = IceUtil::Time::now(); + timedOut = now >= deadline; + } + } } - if(_exception.get()) + else { - _exception->ice_throw(); + while(!_exception.get() && !_sent) + { + _monitor.wait(); + } } } -} -void -IceInternal::BatchOutgoing::sent(bool notify) -{ - _remoteObserver.detach(); - - if(notify) + if(timedOut) { - IceUtil::Monitor::Lock sync(_monitor); - _sent = true; - _monitor.notify(); + _handler->requestTimedOut(this); + assert(_exception.get()); } - else + + if(_exception.get()) { - _sent = true; + _exception->ice_throw(); } +} + +bool +IceInternal::BatchOutgoing::send(const Ice::ConnectionIPtr& connection, bool, bool) +{ + return connection->flushBatchRequests(this); +} + +void +IceInternal::BatchOutgoing::sent() +{ + IceUtil::Monitor::Lock sync(_monitor); + _remoteObserver.detach(); + + _sent = true; + _monitor.notify(); // // NOTE: At this point the stack allocated BatchOutgoing object -- cgit v1.2.3