diff options
author | Benoit Foucher <benoit@zeroc.com> | 2014-10-15 17:26:45 +0200 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2014-10-15 17:26:45 +0200 |
commit | f94eb5f938d33dc2ce9b09b03b5dc6ccf7bd46c2 (patch) | |
tree | 6b12ef2c59421702743048393f4757c0d1e0c504 /cpp/src | |
parent | ICE-5732 missing tracing in throughput demo (diff) | |
download | ice-f94eb5f938d33dc2ce9b09b03b5dc6ccf7bd46c2.tar.bz2 ice-f94eb5f938d33dc2ce9b09b03b5dc6ccf7bd46c2.tar.xz ice-f94eb5f938d33dc2ce9b09b03b5dc6ccf7bd46c2.zip |
Fixed ICE-5666: setting the invocation timeout to -2 provides the previous connection timeouts
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/Ice/AsyncResult.cpp | 25 | ||||
-rw-r--r-- | cpp/src/Ice/CollocatedRequestHandler.cpp | 8 | ||||
-rw-r--r-- | cpp/src/Ice/ConnectRequestHandler.cpp | 6 | ||||
-rw-r--r-- | cpp/src/Ice/ConnectionI.cpp | 137 | ||||
-rw-r--r-- | cpp/src/Ice/DefaultsAndOverrides.cpp | 2 | ||||
-rw-r--r-- | cpp/src/Ice/Outgoing.cpp | 32 | ||||
-rw-r--r-- | cpp/src/Ice/OutgoingAsync.cpp | 36 | ||||
-rw-r--r-- | cpp/src/Ice/Proxy.cpp | 2 | ||||
-rw-r--r-- | cpp/src/Ice/RetryQueue.cpp | 14 |
9 files changed, 174 insertions, 88 deletions
diff --git a/cpp/src/Ice/AsyncResult.cpp b/cpp/src/Ice/AsyncResult.cpp index 03abc55e020..dc1b5689339 100644 --- a/cpp/src/Ice/AsyncResult.cpp +++ b/cpp/src/Ice/AsyncResult.cpp @@ -420,25 +420,20 @@ AsyncResult::cancel(const Ice::LocalException& ex) void AsyncResult::cancelable(const CancellationHandlerPtr& handler) { - { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - if(!_cancellationException.get()) - { - _cancellationHandler = handler; - return; - } - } - handler->asyncRequestCanceled(OutgoingAsyncBasePtr::dynamicCast(this), *_cancellationException.get()); -} - -void -AsyncResult::checkCanceled() -{ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); if(_cancellationException.get()) { - _cancellationException->ice_throw(); + try + { + _cancellationException->ice_throw(); + } + catch(const Ice::Exception&) + { + _cancellationException.reset(0); + throw; + } } + _cancellationHandler = handler; } void diff --git a/cpp/src/Ice/CollocatedRequestHandler.cpp b/cpp/src/Ice/CollocatedRequestHandler.cpp index 3fc321735cb..f97fadf6c02 100644 --- a/cpp/src/Ice/CollocatedRequestHandler.cpp +++ b/cpp/src/Ice/CollocatedRequestHandler.cpp @@ -390,6 +390,9 @@ CollocatedRequestHandler::invokeAsyncRequest(OutgoingAsync* outAsync) if(_reference->getInvocationTimeout() > 0 || _response) { Lock sync(*this); + + outAsync->cancelable(this); // This will throw if the request is canceled + if(_response) { requestId = ++_requestId; @@ -399,7 +402,6 @@ CollocatedRequestHandler::invokeAsyncRequest(OutgoingAsync* outAsync) { _sendAsyncRequests.insert(make_pair(outAsync, requestId)); } - outAsync->cancelable(this); } outAsync->attachCollocatedObserver(_adapter, requestId); @@ -479,11 +481,11 @@ CollocatedRequestHandler::invokeAsyncBatchRequests(OutgoingAsyncBase* outAsync) invokeNum = _batchRequestNum; if(_batchRequestNum > 0) { + outAsync->cancelable(this); // This will throw if the request is canceled + if(_reference->getInvocationTimeout() > 0) { _sendAsyncRequests.insert(make_pair(outAsync, 0)); - - outAsync->cancelable(this); } assert(!_batchStream.b.empty()); diff --git a/cpp/src/Ice/ConnectRequestHandler.cpp b/cpp/src/Ice/ConnectRequestHandler.cpp index 7d7f4a9291c..4922d010a4e 100644 --- a/cpp/src/Ice/ConnectRequestHandler.cpp +++ b/cpp/src/Ice/ConnectRequestHandler.cpp @@ -229,6 +229,11 @@ ConnectRequestHandler::sendAsyncRequest(const OutgoingAsyncBasePtr& out) { { Lock sync(*this); + if(!_initialized) + { + out->cancelable(this); // This will throw if the request is canceled + } + try { if(!initialized()) @@ -236,7 +241,6 @@ ConnectRequestHandler::sendAsyncRequest(const OutgoingAsyncBasePtr& out) Request req; req.outAsync = out; _requests.push_back(req); - out->cancelable(this); return AsyncStatusQueued; } } diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp index 01df99084be..eb2a03cfc34 100644 --- a/cpp/src/Ice/ConnectionI.cpp +++ b/cpp/src/Ice/ConnectionI.cpp @@ -711,6 +711,12 @@ Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncPtr& out, bool compress, b // _transceiver->checkSendSize(*os, _instance->messageSizeMax()); + // + // Notify the request that it's cancelable with this connection. + // This will throw if the request is canceled. + // + out->cancelable(this); + Int requestId = 0; if(response) { @@ -750,11 +756,6 @@ Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncPtr& out, bool compress, b _exception->ice_throw(); } - if(response || status & AsyncStatusQueued) - { - out->cancelable(this); // Notify the request that it's cancelable - } - if(response) { // @@ -1135,6 +1136,12 @@ Ice::ConnectionI::flushAsyncBatchRequests(const OutgoingAsyncBasePtr& outAsync) } // + // Notify the request that it's cancelable with this connection. + // This will throw if the request is canceled. + // + outAsync->cancelable(this); + + // // Fill in the number of requests in the batch. // const Byte* p = reinterpret_cast<const Byte*>(&_batchRequestNum); @@ -1163,11 +1170,6 @@ Ice::ConnectionI::flushAsyncBatchRequests(const OutgoingAsyncBasePtr& outAsync) _exception->ice_throw(); } - if(status & AsyncStatusQueued) - { - outAsync->cancelable(this); // Notify the request that it's cancelable. - } - // // Reset the batch stream. // @@ -1306,21 +1308,27 @@ Ice::ConnectionI::requestCanceled(OutgoingBase* out, const Ice::LocalException& } } - // - // If the request is being sent, don't remove it from the send streams, - // it will be removed once the sending is finished. - // - if(o == _sendStreams.begin()) + if(dynamic_cast<const Ice::ConnectionTimeoutException*>(&ex)) { - o->canceled(true); // true = adopt the stream. + setState(StateClosed, ex); } else { - o->canceled(false); - _sendStreams.erase(o); + // + // If the request is being sent, don't remove it from the send streams, + // it will be removed once the sending is finished. + // + if(o == _sendStreams.begin()) + { + o->canceled(true); // true = adopt the stream. + } + else + { + o->canceled(false); + _sendStreams.erase(o); + } + out->completed(ex); } - - out->completed(ex); return; } } @@ -1330,9 +1338,17 @@ Ice::ConnectionI::requestCanceled(OutgoingBase* out, const Ice::LocalException& { if(_requestsHint != _requests.end() && _requestsHint->second == o) { - o->completed(ex); - _requests.erase(_requestsHint); - _requestsHint = _requests.end(); + if(dynamic_cast<const Ice::ConnectionTimeoutException*>(&ex)) + { + setState(StateClosed, ex); + } + else + { + o->completed(ex); + _requests.erase(_requestsHint); + _requestsHint = _requests.end(); + } + return; } else { @@ -1340,9 +1356,16 @@ Ice::ConnectionI::requestCanceled(OutgoingBase* out, const Ice::LocalException& { if(p->second == o) { - o->completed(ex); - assert(p != _requestsHint); - _requests.erase(p); + if(dynamic_cast<const Ice::ConnectionTimeoutException*>(&ex)) + { + setState(StateClosed, ex); + } + else + { + o->completed(ex); + assert(p != _requestsHint); + _requests.erase(p); + } return; // We're done. } } @@ -1381,23 +1404,29 @@ Ice::ConnectionI::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAsync, con } } - // - // If the request is being sent, don't remove it from the send streams, - // it will be removed once the sending is finished. - // - if(o == _sendStreams.begin()) + if(dynamic_cast<const Ice::ConnectionTimeoutException*>(&ex)) { - o->canceled(true); // true = adopt the stream + setState(StateClosed, ex); } else { - o->canceled(false); - _sendStreams.erase(o); - } - if(outAsync->completed(ex)) - { - sync.release(); - outAsync->invokeCompleted(); + // + // If the request is being sent, don't remove it from the send streams, + // it will be removed once the sending is finished. + // + if(o == _sendStreams.begin()) + { + o->canceled(true); // true = adopt the stream + } + else + { + o->canceled(false); + _sendStreams.erase(o); + } + if(outAsync->completed(ex)) + { + outAsync->invokeCompletedAsync(); + } } return; } @@ -1410,11 +1439,18 @@ Ice::ConnectionI::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAsync, con { if(_asyncRequestsHint->second == o) { - _asyncRequests.erase(_asyncRequestsHint); - _asyncRequestsHint = _asyncRequests.end(); - if(outAsync->completed(ex)) + if(dynamic_cast<const Ice::ConnectionTimeoutException*>(&ex)) { - outAsync->invokeCompletedAsync(); + setState(StateClosed, ex); + } + else + { + _asyncRequests.erase(_asyncRequestsHint); + _asyncRequestsHint = _asyncRequests.end(); + if(outAsync->completed(ex)) + { + outAsync->invokeCompletedAsync(); + } } return; } @@ -1424,11 +1460,18 @@ Ice::ConnectionI::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAsync, con { if(p->second.get() == o.get()) { - assert(p != _asyncRequestsHint); - _asyncRequests.erase(p); - if(outAsync->completed(ex)) + if(dynamic_cast<const Ice::ConnectionTimeoutException*>(&ex)) { - outAsync->invokeCompletedAsync(); + setState(StateClosed, ex); + } + else + { + assert(p != _asyncRequestsHint); + _asyncRequests.erase(p); + if(outAsync->completed(ex)) + { + outAsync->invokeCompletedAsync(); + } } return; } diff --git a/cpp/src/Ice/DefaultsAndOverrides.cpp b/cpp/src/Ice/DefaultsAndOverrides.cpp index 03bed195abd..6ce5a0e9e35 100644 --- a/cpp/src/Ice/DefaultsAndOverrides.cpp +++ b/cpp/src/Ice/DefaultsAndOverrides.cpp @@ -138,7 +138,7 @@ IceInternal::DefaultsAndOverrides::DefaultsAndOverrides(const PropertiesPtr& pro const_cast<int&>(defaultInvocationTimeout) = properties->getPropertyAsIntWithDefault("Ice.Default.InvocationTimeout", -1); - if(defaultInvocationTimeout < 1 && defaultInvocationTimeout != -1) + if(defaultInvocationTimeout < 1 && defaultInvocationTimeout != -1 && defaultInvocationTimeout != -2) { const_cast<Int&>(defaultInvocationTimeout) = -1; Warning out(logger); diff --git a/cpp/src/Ice/Outgoing.cpp b/cpp/src/Ice/Outgoing.cpp index 4815b9796fb..8e13a64dc45 100644 --- a/cpp/src/Ice/Outgoing.cpp +++ b/cpp/src/Ice/Outgoing.cpp @@ -208,12 +208,13 @@ Outgoing::invoke() return true; } + const int invocationTimeout = _proxy->__reference()->getInvocationTimeout(); int cnt = 0; while(true) { try { - if(_invocationTimeoutDeadline != IceUtil::Time() && _invocationTimeoutDeadline <= IceUtil::Time::now()) + if(invocationTimeout > 0 && _invocationTimeoutDeadline <= IceUtil::Time::now()) { throw Ice::InvocationTimeoutException(__FILE__, __LINE__); } @@ -228,11 +229,27 @@ Outgoing::invoke() { return true; } + + if(invocationTimeout == -2) // Use the connection timeout + { + try + { + _invocationTimeoutDeadline = IceUtil::Time(); // Reset any previously set value + + int timeout = _handler->waitForConnection()->timeout(); + if(timeout > 0) + { + _invocationTimeoutDeadline = IceUtil::Time::now() + IceUtil::Time::milliSeconds(timeout); + } + } + catch(const Ice::LocalException&) + { + } + } bool timedOut = false; { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - // // If the handler says it's not finished, we wait until we're done. // @@ -262,7 +279,14 @@ Outgoing::invoke() if(timedOut) { - _handler->requestCanceled(this, InvocationTimeoutException(__FILE__, __LINE__)); + if(invocationTimeout == -2) + { + _handler->requestCanceled(this, ConnectionTimeoutException(__FILE__, __LINE__)); + } + else + { + _handler->requestCanceled(this, InvocationTimeoutException(__FILE__, __LINE__)); + } // // Wait for the exception to propagate. It's possible the request handler ignores @@ -298,7 +322,7 @@ Outgoing::invoke() interval = IceUtil::Time::milliSeconds(_proxy->__handleException(ex, _handler, _mode, _sent, cnt)); if(interval > IceUtil::Time()) { - if(_invocationTimeoutDeadline != IceUtil::Time()) + if(invocationTimeout > 0) { IceUtil::Time deadline = _invocationTimeoutDeadline - IceUtil::Time::now(); if(deadline < interval) diff --git a/cpp/src/Ice/OutgoingAsync.cpp b/cpp/src/Ice/OutgoingAsync.cpp index c2c443f2184..fca1013252e 100644 --- a/cpp/src/Ice/OutgoingAsync.cpp +++ b/cpp/src/Ice/OutgoingAsync.cpp @@ -86,6 +86,12 @@ ProxyOutgoingAsyncBase::completed(const Exception& exc) _childObserver.detach(); } + _cachedConnection = 0; + if(_proxy->__reference()->getInvocationTimeout() == -2) + { + _instance->timer()->cancel(this); + } + // // NOTE: at this point, synchronization isn't needed, no other threads should be // calling on the callback. @@ -101,6 +107,21 @@ ProxyOutgoingAsyncBase::completed(const Exception& exc) } } + +void +ProxyOutgoingAsyncBase::cancelable(const CancellationHandlerPtr& handler) +{ + if(_proxy->__reference()->getInvocationTimeout() == -2 && _cachedConnection) + { + const int timeout = _cachedConnection->timeout(); + if(timeout > 0) + { + _instance->timer()->schedule(this, IceUtil::Time::milliSeconds(timeout)); + } + } + AsyncResult::cancelable(handler); +} + void ProxyOutgoingAsyncBase::retry() { @@ -154,7 +175,6 @@ ProxyOutgoingAsyncBase::invokeImpl(bool userThread) } else { - checkCanceled(); // Cancellation exception aren't retriable _observer.retried(); } @@ -204,7 +224,6 @@ ProxyOutgoingAsyncBase::invokeImpl(bool userThread) } else { - checkCanceled(); // Cancellation exception aren't retriable _observer.retried(); } } @@ -233,7 +252,7 @@ ProxyOutgoingAsyncBase::sent(bool done) _sent = true; if(done) { - if(_proxy->__reference()->getInvocationTimeout() > 0) + if(_proxy->__reference()->getInvocationTimeout() != -1) { _instance->timer()->cancel(this); } @@ -244,7 +263,7 @@ ProxyOutgoingAsyncBase::sent(bool done) bool ProxyOutgoingAsyncBase::finished(const Exception& ex) { - if(_proxy->__reference()->getInvocationTimeout() > 0) + if(_proxy->__reference()->getInvocationTimeout() != -1) { _instance->timer()->cancel(this); } @@ -254,7 +273,7 @@ ProxyOutgoingAsyncBase::finished(const Exception& ex) bool ProxyOutgoingAsyncBase::finished(bool ok) { - if(_proxy->__reference()->getInvocationTimeout() > 0) + if(_proxy->__reference()->getInvocationTimeout() != -1) { _instance->timer()->cancel(this); } @@ -276,12 +295,13 @@ ProxyOutgoingAsyncBase::handleException(const Exception& exc) void ProxyOutgoingAsyncBase::runTimerTask() { - try + if(_proxy->__reference()->getInvocationTimeout() == -2) { - cancel(InvocationTimeoutException(__FILE__, __LINE__)); + cancel(ConnectionTimeoutException(__FILE__, __LINE__)); } - catch(const CommunicatorDestroyedException&) + else { + cancel(InvocationTimeoutException(__FILE__, __LINE__)); } } diff --git a/cpp/src/Ice/Proxy.cpp b/cpp/src/Ice/Proxy.cpp index 13bd0120a8f..d18a38a96e5 100644 --- a/cpp/src/Ice/Proxy.cpp +++ b/cpp/src/Ice/Proxy.cpp @@ -1163,7 +1163,7 @@ IceProxy::Ice::Object::ice_getInvocationTimeout() const ObjectPrx IceProxy::Ice::Object::ice_invocationTimeout(Int newTimeout) const { - if(newTimeout < 1 && newTimeout != -1) + if(newTimeout < 1 && newTimeout != -1 && newTimeout != -2) { ostringstream s; s << "invalid value passed to ice_invocationTimeout: " << newTimeout; diff --git a/cpp/src/Ice/RetryQueue.cpp b/cpp/src/Ice/RetryQueue.cpp index fe0e03d980f..063f413c80a 100644 --- a/cpp/src/Ice/RetryQueue.cpp +++ b/cpp/src/Ice/RetryQueue.cpp @@ -44,16 +44,14 @@ IceInternal::RetryTask::requestCanceled(OutgoingBase*, const Ice::LocalException } void -IceInternal::RetryTask::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAsync, const Ice::LocalException&) +IceInternal::RetryTask::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAsync, const Ice::LocalException& ex) { if(_queue->cancel(this)) { - // - // We just retry the outgoing async now rather than marking it - // as finished. The retry will check for the cancellation - // exception and terminate appropriately the request. - // - _outAsync->retry(); + if(_outAsync->completed(ex)) + { + _outAsync->invokeCompletedAsync(); + } } } @@ -89,6 +87,7 @@ IceInternal::RetryQueue::add(const ProxyOutgoingAsyncBasePtr& out, int interval) throw CommunicatorDestroyedException(__FILE__, __LINE__); } RetryTaskPtr task = new RetryTask(this, out); + out->cancelable(task); // This will throw if the request is canceled. try { _instance->timer()->schedule(task, IceUtil::Time::milliSeconds(interval)); @@ -98,7 +97,6 @@ IceInternal::RetryQueue::add(const ProxyOutgoingAsyncBasePtr& out, int interval) throw CommunicatorDestroyedException(__FILE__, __LINE__); } _requests.insert(task); - out->cancelable(task); } void |