diff options
41 files changed, 554 insertions, 262 deletions
diff --git a/cpp/include/Ice/AsyncResult.h b/cpp/include/Ice/AsyncResult.h index 5452c5de970..c94d4f01a19 100644 --- a/cpp/include/Ice/AsyncResult.h +++ b/cpp/include/Ice/AsyncResult.h @@ -106,7 +106,7 @@ protected: void invokeCompleted(); void cancel(const LocalException&); - void cancelable(const IceInternal::CancellationHandlerPtr&); + virtual void cancelable(const IceInternal::CancellationHandlerPtr&); void checkCanceled(); void warning(const std::exception&) const; diff --git a/cpp/include/Ice/OutgoingAsync.h b/cpp/include/Ice/OutgoingAsync.h index b00c97d68f9..8ea4b54e34c 100644 --- a/cpp/include/Ice/OutgoingAsync.h +++ b/cpp/include/Ice/OutgoingAsync.h @@ -92,6 +92,7 @@ public: using OutgoingAsyncBase::sent; virtual bool completed(const Ice::Exception&); + virtual void cancelable(const CancellationHandlerPtr&); void retry(); void abort(const Ice::Exception&); diff --git a/cpp/src/Ice/AsyncResult.cpp b/cpp/src/Ice/AsyncResult.cpp index 03abc55e020..dc1b5689339 100644 --- a/cpp/src/Ice/AsyncResult.cpp +++ b/cpp/src/Ice/AsyncResult.cpp @@ -420,25 +420,20 @@ AsyncResult::cancel(const Ice::LocalException& ex) void AsyncResult::cancelable(const CancellationHandlerPtr& handler) { - { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - if(!_cancellationException.get()) - { - _cancellationHandler = handler; - return; - } - } - handler->asyncRequestCanceled(OutgoingAsyncBasePtr::dynamicCast(this), *_cancellationException.get()); -} - -void -AsyncResult::checkCanceled() -{ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); if(_cancellationException.get()) { - _cancellationException->ice_throw(); + try + { + _cancellationException->ice_throw(); + } + catch(const Ice::Exception&) + { + _cancellationException.reset(0); + throw; + } } + _cancellationHandler = handler; } void diff --git a/cpp/src/Ice/CollocatedRequestHandler.cpp b/cpp/src/Ice/CollocatedRequestHandler.cpp index 3fc321735cb..f97fadf6c02 100644 --- a/cpp/src/Ice/CollocatedRequestHandler.cpp +++ b/cpp/src/Ice/CollocatedRequestHandler.cpp @@ -390,6 +390,9 @@ CollocatedRequestHandler::invokeAsyncRequest(OutgoingAsync* outAsync) if(_reference->getInvocationTimeout() > 0 || _response) { Lock sync(*this); + + outAsync->cancelable(this); // This will throw if the request is canceled + if(_response) { requestId = ++_requestId; @@ -399,7 +402,6 @@ CollocatedRequestHandler::invokeAsyncRequest(OutgoingAsync* outAsync) { _sendAsyncRequests.insert(make_pair(outAsync, requestId)); } - outAsync->cancelable(this); } outAsync->attachCollocatedObserver(_adapter, requestId); @@ -479,11 +481,11 @@ CollocatedRequestHandler::invokeAsyncBatchRequests(OutgoingAsyncBase* outAsync) invokeNum = _batchRequestNum; if(_batchRequestNum > 0) { + outAsync->cancelable(this); // This will throw if the request is canceled + if(_reference->getInvocationTimeout() > 0) { _sendAsyncRequests.insert(make_pair(outAsync, 0)); - - outAsync->cancelable(this); } assert(!_batchStream.b.empty()); diff --git a/cpp/src/Ice/ConnectRequestHandler.cpp b/cpp/src/Ice/ConnectRequestHandler.cpp index 7d7f4a9291c..4922d010a4e 100644 --- a/cpp/src/Ice/ConnectRequestHandler.cpp +++ b/cpp/src/Ice/ConnectRequestHandler.cpp @@ -229,6 +229,11 @@ ConnectRequestHandler::sendAsyncRequest(const OutgoingAsyncBasePtr& out) { { Lock sync(*this); + if(!_initialized) + { + out->cancelable(this); // This will throw if the request is canceled + } + try { if(!initialized()) @@ -236,7 +241,6 @@ ConnectRequestHandler::sendAsyncRequest(const OutgoingAsyncBasePtr& out) Request req; req.outAsync = out; _requests.push_back(req); - out->cancelable(this); return AsyncStatusQueued; } } diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp index 01df99084be..eb2a03cfc34 100644 --- a/cpp/src/Ice/ConnectionI.cpp +++ b/cpp/src/Ice/ConnectionI.cpp @@ -711,6 +711,12 @@ Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncPtr& out, bool compress, b // _transceiver->checkSendSize(*os, _instance->messageSizeMax()); + // + // Notify the request that it's cancelable with this connection. + // This will throw if the request is canceled. + // + out->cancelable(this); + Int requestId = 0; if(response) { @@ -750,11 +756,6 @@ Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncPtr& out, bool compress, b _exception->ice_throw(); } - if(response || status & AsyncStatusQueued) - { - out->cancelable(this); // Notify the request that it's cancelable - } - if(response) { // @@ -1135,6 +1136,12 @@ Ice::ConnectionI::flushAsyncBatchRequests(const OutgoingAsyncBasePtr& outAsync) } // + // Notify the request that it's cancelable with this connection. + // This will throw if the request is canceled. + // + outAsync->cancelable(this); + + // // Fill in the number of requests in the batch. // const Byte* p = reinterpret_cast<const Byte*>(&_batchRequestNum); @@ -1163,11 +1170,6 @@ Ice::ConnectionI::flushAsyncBatchRequests(const OutgoingAsyncBasePtr& outAsync) _exception->ice_throw(); } - if(status & AsyncStatusQueued) - { - outAsync->cancelable(this); // Notify the request that it's cancelable. - } - // // Reset the batch stream. // @@ -1306,21 +1308,27 @@ Ice::ConnectionI::requestCanceled(OutgoingBase* out, const Ice::LocalException& } } - // - // If the request is being sent, don't remove it from the send streams, - // it will be removed once the sending is finished. - // - if(o == _sendStreams.begin()) + if(dynamic_cast<const Ice::ConnectionTimeoutException*>(&ex)) { - o->canceled(true); // true = adopt the stream. + setState(StateClosed, ex); } else { - o->canceled(false); - _sendStreams.erase(o); + // + // If the request is being sent, don't remove it from the send streams, + // it will be removed once the sending is finished. + // + if(o == _sendStreams.begin()) + { + o->canceled(true); // true = adopt the stream. + } + else + { + o->canceled(false); + _sendStreams.erase(o); + } + out->completed(ex); } - - out->completed(ex); return; } } @@ -1330,9 +1338,17 @@ Ice::ConnectionI::requestCanceled(OutgoingBase* out, const Ice::LocalException& { if(_requestsHint != _requests.end() && _requestsHint->second == o) { - o->completed(ex); - _requests.erase(_requestsHint); - _requestsHint = _requests.end(); + if(dynamic_cast<const Ice::ConnectionTimeoutException*>(&ex)) + { + setState(StateClosed, ex); + } + else + { + o->completed(ex); + _requests.erase(_requestsHint); + _requestsHint = _requests.end(); + } + return; } else { @@ -1340,9 +1356,16 @@ Ice::ConnectionI::requestCanceled(OutgoingBase* out, const Ice::LocalException& { if(p->second == o) { - o->completed(ex); - assert(p != _requestsHint); - _requests.erase(p); + if(dynamic_cast<const Ice::ConnectionTimeoutException*>(&ex)) + { + setState(StateClosed, ex); + } + else + { + o->completed(ex); + assert(p != _requestsHint); + _requests.erase(p); + } return; // We're done. } } @@ -1381,23 +1404,29 @@ Ice::ConnectionI::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAsync, con } } - // - // If the request is being sent, don't remove it from the send streams, - // it will be removed once the sending is finished. - // - if(o == _sendStreams.begin()) + if(dynamic_cast<const Ice::ConnectionTimeoutException*>(&ex)) { - o->canceled(true); // true = adopt the stream + setState(StateClosed, ex); } else { - o->canceled(false); - _sendStreams.erase(o); - } - if(outAsync->completed(ex)) - { - sync.release(); - outAsync->invokeCompleted(); + // + // If the request is being sent, don't remove it from the send streams, + // it will be removed once the sending is finished. + // + if(o == _sendStreams.begin()) + { + o->canceled(true); // true = adopt the stream + } + else + { + o->canceled(false); + _sendStreams.erase(o); + } + if(outAsync->completed(ex)) + { + outAsync->invokeCompletedAsync(); + } } return; } @@ -1410,11 +1439,18 @@ Ice::ConnectionI::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAsync, con { if(_asyncRequestsHint->second == o) { - _asyncRequests.erase(_asyncRequestsHint); - _asyncRequestsHint = _asyncRequests.end(); - if(outAsync->completed(ex)) + if(dynamic_cast<const Ice::ConnectionTimeoutException*>(&ex)) { - outAsync->invokeCompletedAsync(); + setState(StateClosed, ex); + } + else + { + _asyncRequests.erase(_asyncRequestsHint); + _asyncRequestsHint = _asyncRequests.end(); + if(outAsync->completed(ex)) + { + outAsync->invokeCompletedAsync(); + } } return; } @@ -1424,11 +1460,18 @@ Ice::ConnectionI::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAsync, con { if(p->second.get() == o.get()) { - assert(p != _asyncRequestsHint); - _asyncRequests.erase(p); - if(outAsync->completed(ex)) + if(dynamic_cast<const Ice::ConnectionTimeoutException*>(&ex)) { - outAsync->invokeCompletedAsync(); + setState(StateClosed, ex); + } + else + { + assert(p != _asyncRequestsHint); + _asyncRequests.erase(p); + if(outAsync->completed(ex)) + { + outAsync->invokeCompletedAsync(); + } } return; } diff --git a/cpp/src/Ice/DefaultsAndOverrides.cpp b/cpp/src/Ice/DefaultsAndOverrides.cpp index 03bed195abd..6ce5a0e9e35 100644 --- a/cpp/src/Ice/DefaultsAndOverrides.cpp +++ b/cpp/src/Ice/DefaultsAndOverrides.cpp @@ -138,7 +138,7 @@ IceInternal::DefaultsAndOverrides::DefaultsAndOverrides(const PropertiesPtr& pro const_cast<int&>(defaultInvocationTimeout) = properties->getPropertyAsIntWithDefault("Ice.Default.InvocationTimeout", -1); - if(defaultInvocationTimeout < 1 && defaultInvocationTimeout != -1) + if(defaultInvocationTimeout < 1 && defaultInvocationTimeout != -1 && defaultInvocationTimeout != -2) { const_cast<Int&>(defaultInvocationTimeout) = -1; Warning out(logger); diff --git a/cpp/src/Ice/Outgoing.cpp b/cpp/src/Ice/Outgoing.cpp index 4815b9796fb..8e13a64dc45 100644 --- a/cpp/src/Ice/Outgoing.cpp +++ b/cpp/src/Ice/Outgoing.cpp @@ -208,12 +208,13 @@ Outgoing::invoke() return true; } + const int invocationTimeout = _proxy->__reference()->getInvocationTimeout(); int cnt = 0; while(true) { try { - if(_invocationTimeoutDeadline != IceUtil::Time() && _invocationTimeoutDeadline <= IceUtil::Time::now()) + if(invocationTimeout > 0 && _invocationTimeoutDeadline <= IceUtil::Time::now()) { throw Ice::InvocationTimeoutException(__FILE__, __LINE__); } @@ -228,11 +229,27 @@ Outgoing::invoke() { return true; } + + if(invocationTimeout == -2) // Use the connection timeout + { + try + { + _invocationTimeoutDeadline = IceUtil::Time(); // Reset any previously set value + + int timeout = _handler->waitForConnection()->timeout(); + if(timeout > 0) + { + _invocationTimeoutDeadline = IceUtil::Time::now() + IceUtil::Time::milliSeconds(timeout); + } + } + catch(const Ice::LocalException&) + { + } + } bool timedOut = false; { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - // // If the handler says it's not finished, we wait until we're done. // @@ -262,7 +279,14 @@ Outgoing::invoke() if(timedOut) { - _handler->requestCanceled(this, InvocationTimeoutException(__FILE__, __LINE__)); + if(invocationTimeout == -2) + { + _handler->requestCanceled(this, ConnectionTimeoutException(__FILE__, __LINE__)); + } + else + { + _handler->requestCanceled(this, InvocationTimeoutException(__FILE__, __LINE__)); + } // // Wait for the exception to propagate. It's possible the request handler ignores @@ -298,7 +322,7 @@ Outgoing::invoke() interval = IceUtil::Time::milliSeconds(_proxy->__handleException(ex, _handler, _mode, _sent, cnt)); if(interval > IceUtil::Time()) { - if(_invocationTimeoutDeadline != IceUtil::Time()) + if(invocationTimeout > 0) { IceUtil::Time deadline = _invocationTimeoutDeadline - IceUtil::Time::now(); if(deadline < interval) diff --git a/cpp/src/Ice/OutgoingAsync.cpp b/cpp/src/Ice/OutgoingAsync.cpp index c2c443f2184..fca1013252e 100644 --- a/cpp/src/Ice/OutgoingAsync.cpp +++ b/cpp/src/Ice/OutgoingAsync.cpp @@ -86,6 +86,12 @@ ProxyOutgoingAsyncBase::completed(const Exception& exc) _childObserver.detach(); } + _cachedConnection = 0; + if(_proxy->__reference()->getInvocationTimeout() == -2) + { + _instance->timer()->cancel(this); + } + // // NOTE: at this point, synchronization isn't needed, no other threads should be // calling on the callback. @@ -101,6 +107,21 @@ ProxyOutgoingAsyncBase::completed(const Exception& exc) } } + +void +ProxyOutgoingAsyncBase::cancelable(const CancellationHandlerPtr& handler) +{ + if(_proxy->__reference()->getInvocationTimeout() == -2 && _cachedConnection) + { + const int timeout = _cachedConnection->timeout(); + if(timeout > 0) + { + _instance->timer()->schedule(this, IceUtil::Time::milliSeconds(timeout)); + } + } + AsyncResult::cancelable(handler); +} + void ProxyOutgoingAsyncBase::retry() { @@ -154,7 +175,6 @@ ProxyOutgoingAsyncBase::invokeImpl(bool userThread) } else { - checkCanceled(); // Cancellation exception aren't retriable _observer.retried(); } @@ -204,7 +224,6 @@ ProxyOutgoingAsyncBase::invokeImpl(bool userThread) } else { - checkCanceled(); // Cancellation exception aren't retriable _observer.retried(); } } @@ -233,7 +252,7 @@ ProxyOutgoingAsyncBase::sent(bool done) _sent = true; if(done) { - if(_proxy->__reference()->getInvocationTimeout() > 0) + if(_proxy->__reference()->getInvocationTimeout() != -1) { _instance->timer()->cancel(this); } @@ -244,7 +263,7 @@ ProxyOutgoingAsyncBase::sent(bool done) bool ProxyOutgoingAsyncBase::finished(const Exception& ex) { - if(_proxy->__reference()->getInvocationTimeout() > 0) + if(_proxy->__reference()->getInvocationTimeout() != -1) { _instance->timer()->cancel(this); } @@ -254,7 +273,7 @@ ProxyOutgoingAsyncBase::finished(const Exception& ex) bool ProxyOutgoingAsyncBase::finished(bool ok) { - if(_proxy->__reference()->getInvocationTimeout() > 0) + if(_proxy->__reference()->getInvocationTimeout() != -1) { _instance->timer()->cancel(this); } @@ -276,12 +295,13 @@ ProxyOutgoingAsyncBase::handleException(const Exception& exc) void ProxyOutgoingAsyncBase::runTimerTask() { - try + if(_proxy->__reference()->getInvocationTimeout() == -2) { - cancel(InvocationTimeoutException(__FILE__, __LINE__)); + cancel(ConnectionTimeoutException(__FILE__, __LINE__)); } - catch(const CommunicatorDestroyedException&) + else { + cancel(InvocationTimeoutException(__FILE__, __LINE__)); } } diff --git a/cpp/src/Ice/Proxy.cpp b/cpp/src/Ice/Proxy.cpp index 13bd0120a8f..d18a38a96e5 100644 --- a/cpp/src/Ice/Proxy.cpp +++ b/cpp/src/Ice/Proxy.cpp @@ -1163,7 +1163,7 @@ IceProxy::Ice::Object::ice_getInvocationTimeout() const ObjectPrx IceProxy::Ice::Object::ice_invocationTimeout(Int newTimeout) const { - if(newTimeout < 1 && newTimeout != -1) + if(newTimeout < 1 && newTimeout != -1 && newTimeout != -2) { ostringstream s; s << "invalid value passed to ice_invocationTimeout: " << newTimeout; diff --git a/cpp/src/Ice/RetryQueue.cpp b/cpp/src/Ice/RetryQueue.cpp index fe0e03d980f..063f413c80a 100644 --- a/cpp/src/Ice/RetryQueue.cpp +++ b/cpp/src/Ice/RetryQueue.cpp @@ -44,16 +44,14 @@ IceInternal::RetryTask::requestCanceled(OutgoingBase*, const Ice::LocalException } void -IceInternal::RetryTask::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAsync, const Ice::LocalException&) +IceInternal::RetryTask::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAsync, const Ice::LocalException& ex) { if(_queue->cancel(this)) { - // - // We just retry the outgoing async now rather than marking it - // as finished. The retry will check for the cancellation - // exception and terminate appropriately the request. - // - _outAsync->retry(); + if(_outAsync->completed(ex)) + { + _outAsync->invokeCompletedAsync(); + } } } @@ -89,6 +87,7 @@ IceInternal::RetryQueue::add(const ProxyOutgoingAsyncBasePtr& out, int interval) throw CommunicatorDestroyedException(__FILE__, __LINE__); } RetryTaskPtr task = new RetryTask(this, out); + out->cancelable(task); // This will throw if the request is canceled. try { _instance->timer()->schedule(task, IceUtil::Time::milliSeconds(interval)); @@ -98,7 +97,6 @@ IceInternal::RetryQueue::add(const ProxyOutgoingAsyncBasePtr& out, int interval) throw CommunicatorDestroyedException(__FILE__, __LINE__); } _requests.insert(task); - out->cancelable(task); } void diff --git a/cpp/test/Ice/ami/AllTests.cpp b/cpp/test/Ice/ami/AllTests.cpp index fffc66f7ec8..e08d85846a6 100644 --- a/cpp/test/Ice/ami/AllTests.cpp +++ b/cpp/test/Ice/ami/AllTests.cpp @@ -2672,12 +2672,15 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) Ice::AsyncResultPtr r; Ice::ByteSeq seq; - seq.resize(1024); // Make sure the request doesn't compress too well. + seq.resize(10024); // Make sure the request doesn't compress too well. for(Ice::ByteSeq::iterator q = seq.begin(); q != seq.end(); ++q) { *q = static_cast<Ice::Byte>(IceUtilInternal::random(255)); } - while((r = p->begin_opWithPayload(seq))->sentSynchronously()); + for(int i = 0; i < 200; ++i) // 2MB + { + r = p->begin_opWithPayload(seq); + } test(!r->isSent()); diff --git a/cpp/test/Ice/proxy/AllTests.cpp b/cpp/test/Ice/proxy/AllTests.cpp index ce615ab82d8..b9371178371 100644 --- a/cpp/test/Ice/proxy/AllTests.cpp +++ b/cpp/test/Ice/proxy/AllTests.cpp @@ -501,6 +501,7 @@ allTests(const Ice::CommunicatorPtr& communicator) try { base->ice_invocationTimeout(-1); + base->ice_invocationTimeout(-2); } catch(const IceUtil::IllegalArgumentException&) { @@ -509,7 +510,7 @@ allTests(const Ice::CommunicatorPtr& communicator) try { - base->ice_invocationTimeout(-2); + base->ice_invocationTimeout(-3); test(false); } catch(const IceUtil::IllegalArgumentException&) diff --git a/cpp/test/Ice/timeout/AllTests.cpp b/cpp/test/Ice/timeout/AllTests.cpp index c5be82c13d1..f3866356dac 100644 --- a/cpp/test/Ice/timeout/AllTests.cpp +++ b/cpp/test/Ice/timeout/AllTests.cpp @@ -209,6 +209,50 @@ allTests(const Ice::CommunicatorPtr& communicator) to->begin_sleep(250, newCallback_Timeout_sleep(cb, &Callback::response, &Callback::exception)); cb->check(); } + { + // + // Backward compatible connection timeouts + // + TimeoutPrx to = TimeoutPrx::uncheckedCast(obj->ice_invocationTimeout(-2)->ice_timeout(250)); + Ice::ConnectionPtr con; + try + { + con = to->ice_getConnection(); + to->sleep(500); + test(false); + } + catch(const Ice::TimeoutException&) + { + try + { + con->getInfo(); + test(false); + } + catch(const Ice::TimeoutException&) + { + // Connection got closed as well. + } + } + + try + { + con = to->ice_getConnection(); + to->end_sleep(to->begin_sleep(500)); + test(false); + } + catch(const Ice::TimeoutException&) + { + try + { + con->getInfo(); + test(false); + } + catch(const Ice::TimeoutException&) + { + // Connection got closed as well. + } + } + } cout << "ok" << endl; cout << "testing close timeout... " << flush; diff --git a/cs/src/Ice/AsyncResult.cs b/cs/src/Ice/AsyncResult.cs index e8db66e4b0d..e5b5e875684 100644 --- a/cs/src/Ice/AsyncResult.cs +++ b/cs/src/Ice/AsyncResult.cs @@ -404,17 +404,23 @@ namespace IceInternal }, cachedConnection_); } - public void cancelable(CancellationHandler handler) + public virtual void cancelable(CancellationHandler handler) { lock(this) { - if(_cancellationException == null) + if(_cancellationException != null) { - _cancellationHandler = handler; - return; + try + { + throw _cancellationException; + } + finally + { + _cancellationException = null; + } } + _cancellationHandler = handler; } - handler.asyncRequestCanceled((OutgoingAsyncBase)this, _cancellationException); } public bool wait() @@ -631,17 +637,6 @@ namespace IceInternal _cancellationHandler.asyncRequestCanceled((OutgoingAsyncBase)this, ex); } - protected void checkCanceled() - { - lock(this) - { - if(_cancellationException != null) - { - throw _cancellationException; - } - } - } - protected virtual Ice.Instrumentation.InvocationObserver getObserver() { return observer_; diff --git a/cs/src/Ice/CollocatedRequestHandler.cs b/cs/src/Ice/CollocatedRequestHandler.cs index 632934a7d33..a0e1a354938 100644 --- a/cs/src/Ice/CollocatedRequestHandler.cs +++ b/cs/src/Ice/CollocatedRequestHandler.cs @@ -287,6 +287,8 @@ namespace IceInternal { lock(this) { + outAsync.cancelable(this); // This will throw if the request is canceled + if(_response) { requestId = ++_requestId; @@ -296,7 +298,6 @@ namespace IceInternal { _sendAsyncRequests.Add(outAsync, requestId); } - outAsync.cancelable(this); } } @@ -364,10 +365,11 @@ namespace IceInternal invokeNum = _batchRequestNum; if(_batchRequestNum > 0) { + outAsync.cancelable(this); // This will throw if the request is canceled + if(_reference.getInvocationTimeout() > 0) { _sendAsyncRequests.Add(outAsync, 0); - outAsync.cancelable(this); } Debug.Assert(!_batchStream.isEmpty()); diff --git a/cs/src/Ice/ConnectRequestHandler.cs b/cs/src/Ice/ConnectRequestHandler.cs index eed058479e7..5069903f90f 100644 --- a/cs/src/Ice/ConnectRequestHandler.cs +++ b/cs/src/Ice/ConnectRequestHandler.cs @@ -147,12 +147,16 @@ namespace IceInternal { lock(this) { + if(!_initialized) + { + outAsync.cancelable(this); // This will throw if the request is canceled + } + try { if(!initialized()) { _requests.AddLast(new Request(outAsync)); - outAsync.cancelable(this); sentCallback = null; return false; } diff --git a/cs/src/Ice/ConnectionI.cs b/cs/src/Ice/ConnectionI.cs index 6b7ce6d7510..db509426f1a 100644 --- a/cs/src/Ice/ConnectionI.cs +++ b/cs/src/Ice/ConnectionI.cs @@ -408,6 +408,12 @@ namespace Ice // _transceiver.checkSendSize(os.getBuffer(), _instance.messageSizeMax()); + // + // Notify the request that it's cancelable with this connection. + // This will throw if the request is canceled. + // + og.cancelable(this); + int requestId = 0; if(response) { @@ -444,11 +450,6 @@ namespace Ice throw _exception; } - if(response || !sent) - { - og.cancelable(this); // Notify the request that it's cancelable - } - if(response) { // @@ -717,6 +718,12 @@ namespace Ice } // + // Notify the request that it's cancelable with this connection. + // This will throw if the request is canceled. + // + outAsync.cancelable(this); + + // // Fill in the number of requests in the batch. // _batchStream.pos(IceInternal.Protocol.headerSize); @@ -743,11 +750,6 @@ namespace Ice throw _exception; } - if(!sent) - { - outAsync.cancelable(this); // Notify the request that it's cancelable. - } - // // Reset the batch stream. // @@ -850,19 +852,26 @@ namespace Ice _asyncRequests.Remove(o.requestId); } - // - // If the request is being sent, don't remove it from the send streams, - // it will be removed once the sending is finished. - // - o.canceled(); - if(o != _sendStreams.First.Value) + if(ex is Ice.ConnectionTimeoutException) { - _sendStreams.Remove(p); + setState(StateClosed, ex); } - Ice.AsyncCallback cb = outAsync.completed(ex); - if(cb != null) + else { - outAsync.invokeCompletedAsync(cb); + // + // If the request is being sent, don't remove it from the send streams, + // it will be removed once the sending is finished. + // + o.canceled(); + if(o != _sendStreams.First.Value) + { + _sendStreams.Remove(p); + } + Ice.AsyncCallback cb = outAsync.completed(ex); + if(cb != null) + { + outAsync.invokeCompletedAsync(cb); + } } return; } @@ -875,11 +884,18 @@ namespace Ice { if(kvp.Value == o) { - _asyncRequests.Remove(kvp.Key); - Ice.AsyncCallback cb = outAsync.completed(ex); - if(cb != null) + if(ex is Ice.ConnectionTimeoutException) { - outAsync.invokeCompletedAsync(cb); + setState(StateClosed, ex); + } + else + { + _asyncRequests.Remove(kvp.Key); + Ice.AsyncCallback cb = outAsync.completed(ex); + if(cb != null) + { + outAsync.invokeCompletedAsync(cb); + } } return; } diff --git a/cs/src/Ice/DefaultsAndOverrides.cs b/cs/src/Ice/DefaultsAndOverrides.cs index f4dc56e6675..944410282f9 100644 --- a/cs/src/Ice/DefaultsAndOverrides.cs +++ b/cs/src/Ice/DefaultsAndOverrides.cs @@ -182,7 +182,7 @@ namespace IceInternal } defaultInvocationTimeout = properties.getPropertyAsIntWithDefault("Ice.Default.InvocationTimeout", -1); - if(defaultInvocationTimeout < 1 && defaultInvocationTimeout != -1) + if(defaultInvocationTimeout < 1 && defaultInvocationTimeout != -1 && defaultInvocationTimeout != -2) { defaultInvocationTimeout = -1; StringBuilder msg = new StringBuilder("invalid value for Ice.Default.InvocationTimeout `"); diff --git a/cs/src/Ice/OutgoingAsync.cs b/cs/src/Ice/OutgoingAsync.cs index 9ae3a55cd20..6da49814641 100644 --- a/cs/src/Ice/OutgoingAsync.cs +++ b/cs/src/Ice/OutgoingAsync.cs @@ -138,6 +138,12 @@ namespace IceInternal childObserver_ = null; } + cachedConnection_ = null; + if(proxy_.reference__().getInvocationTimeout() == -2) + { + instance_.timer().cancel(this); + } + // // NOTE: at this point, synchronization isn't needed, no other threads should be // calling on the callback. @@ -153,6 +159,19 @@ namespace IceInternal } } + public override void cancelable(CancellationHandler handler) + { + if(proxy_.reference__().getInvocationTimeout() == -2 && cachedConnection_ != null) + { + int timeout = cachedConnection_.timeout(); + if(timeout > 0) + { + instance_.timer().schedule(this, timeout); + } + } + base.cancelable(handler); + } + public void retry() { invokeImpl(false); @@ -179,7 +198,14 @@ namespace IceInternal public void runTimerTask() { - cancel(new Ice.InvocationTimeoutException()); + if(proxy_.reference__().getInvocationTimeout() == -2) + { + cancel(new Ice.ConnectionTimeoutException()); + } + else + { + cancel(new Ice.InvocationTimeoutException()); + } } protected ProxyOutgoingAsyncBase(Ice.ObjectPrxHelperBase prx, string op, object cookie) : @@ -225,7 +251,6 @@ namespace IceInternal } else // If not called from the user thread, it's called from the retry queue { - checkCanceled(); // Cancellation exception aren't retriable if(observer_ != null) { observer_.retried(); @@ -279,7 +304,6 @@ namespace IceInternal } else if(observer_ != null) { - checkCanceled(); observer_.retried(); } } @@ -308,7 +332,7 @@ namespace IceInternal _sent = true; if(done) { - if(proxy_.reference__().getInvocationTimeout() > 0) + if(proxy_.reference__().getInvocationTimeout() != -1) { instance_.timer().cancel(this); } @@ -318,7 +342,7 @@ namespace IceInternal protected new Ice.AsyncCallback finished(Ice.Exception ex) { - if(proxy_.reference__().getInvocationTimeout() > 0) + if(proxy_.reference__().getInvocationTimeout() != -1) { instance_.timer().cancel(this); } @@ -327,7 +351,7 @@ namespace IceInternal protected new Ice.AsyncCallback finished(bool ok) { - if(proxy_.reference__().getInvocationTimeout() > 0) + if(proxy_.reference__().getInvocationTimeout() != -1) { instance_.timer().cancel(this); } @@ -491,7 +515,7 @@ namespace IceInternal public override bool invokeCollocated(CollocatedRequestHandler handler, out Ice.AsyncCallback sentCB) { // The BasicStream cannot be cached if the proxy is not a twoway or there is an invocation timeout set. - if(!proxy_.ice_isTwoway() || proxy_.reference__().getInvocationTimeout() > 0) + if(!proxy_.ice_isTwoway() || proxy_.reference__().getInvocationTimeout() != -1) { // Disable caching by marking the streams as cached! state_ |= StateCachedBuffers; diff --git a/cs/src/Ice/Proxy.cs b/cs/src/Ice/Proxy.cs index c0650815d2b..c815004d284 100644 --- a/cs/src/Ice/Proxy.cs +++ b/cs/src/Ice/Proxy.cs @@ -1632,7 +1632,7 @@ namespace Ice /// <param name="newTimeout">The new invocation timeout (in seconds).</param> public ObjectPrx ice_invocationTimeout(int newTimeout) { - if(newTimeout < 1 && newTimeout != -1) + if(newTimeout < 1 && newTimeout != -1 && newTimeout != -2) { throw new System.ArgumentException("invalid value passed to ice_invocationTimeout: " + newTimeout); } diff --git a/cs/src/Ice/RetryQueue.cs b/cs/src/Ice/RetryQueue.cs index 4379ecdc0fd..c6a4aa82816 100644 --- a/cs/src/Ice/RetryQueue.cs +++ b/cs/src/Ice/RetryQueue.cs @@ -38,12 +38,11 @@ namespace IceInternal Debug.Assert(_outAsync == outAsync); if(_retryQueue.cancel(this)) { - // - // We just retry the outgoing async now rather than marking it - // as finished. The retry will check for the cancellation - // exception and terminate appropriately the request. - // - _outAsync.retry(); + Ice.AsyncCallback cb = _outAsync.completed(ex); + if(cb != null) + { + _outAsync.invokeCompletedAsync(cb); + } } } @@ -79,9 +78,9 @@ namespace IceInternal throw new Ice.CommunicatorDestroyedException(); } RetryTask task = new RetryTask(this, outAsync); + outAsync.cancelable(task); // This will throw if the request is canceled. _instance.timer().schedule(task, interval); _requests.Add(task, null); - outAsync.cancelable(task); } } @@ -114,12 +113,13 @@ namespace IceInternal { lock(this) { - Debug.Assert(_requests.ContainsKey(task)); - _requests.Remove(task); - if(_instance == null && _requests.Count == 0) + if(_requests.Remove(task)) { - // If we are destroying the queue, destroy is probably waiting on the queue to be empty. - System.Threading.Monitor.Pulse(this); + if(_instance == null && _requests.Count == 0) + { + // If we are destroying the queue, destroy is probably waiting on the queue to be empty. + System.Threading.Monitor.Pulse(this); + } } } } diff --git a/cs/test/Ice/proxy/AllTests.cs b/cs/test/Ice/proxy/AllTests.cs index 557b8398871..faf348a50d0 100644 --- a/cs/test/Ice/proxy/AllTests.cs +++ b/cs/test/Ice/proxy/AllTests.cs @@ -515,6 +515,7 @@ public class AllTests : TestCommon.TestApp try { baseProxy.ice_invocationTimeout(-1); + baseProxy.ice_invocationTimeout(-2); } catch(System.ArgumentException) { @@ -523,7 +524,7 @@ public class AllTests : TestCommon.TestApp try { - baseProxy.ice_invocationTimeout(-2); + baseProxy.ice_invocationTimeout(-3); test(false); } catch(System.ArgumentException) diff --git a/cs/test/Ice/timeout/AllTests.cs b/cs/test/Ice/timeout/AllTests.cs index b8ebb722b44..61618a8b802 100644 --- a/cs/test/Ice/timeout/AllTests.cs +++ b/cs/test/Ice/timeout/AllTests.cs @@ -226,6 +226,50 @@ public class AllTests : TestCommon.TestApp }); cb.check(); } + { + // + // Backward compatible connection timeouts + // + Test.TimeoutPrx to = Test.TimeoutPrxHelper.uncheckedCast(obj.ice_invocationTimeout(-2).ice_timeout(250)); + Ice.Connection con = null; + try + { + con = to.ice_getConnection(); + to.sleep(500); + test(false); + } + catch(Ice.TimeoutException) + { + try + { + con.getInfo(); + test(false); + } + catch(Ice.TimeoutException) + { + // Connection got closed as well. + } + } + + try + { + con = to.ice_getConnection(); + to.end_sleep(to.begin_sleep(500)); + test(false); + } + catch(Ice.TimeoutException) + { + try + { + con.getInfo(); + test(false); + } + catch(Ice.TimeoutException) + { + // Connection got closed as well. + } + } + } WriteLine("ok"); Write("testing close timeout... "); diff --git a/java/src/Ice/ConnectionI.java b/java/src/Ice/ConnectionI.java index 3ed5e3eed93..c0b74d70bac 100644 --- a/java/src/Ice/ConnectionI.java +++ b/java/src/Ice/ConnectionI.java @@ -354,6 +354,12 @@ public final class ConnectionI extends IceInternal.EventHandler // _transceiver.checkSendSize(os.getBuffer(), _instance.messageSizeMax()); + // + // Notify the request that it's cancelable with this connection. + // This will throw if the request is canceled. + // + out.cancelable(this); + int requestId = 0; if(response) { @@ -388,11 +394,6 @@ public final class ConnectionI extends IceInternal.EventHandler throw (Ice.LocalException) _exception.fillInStackTrace(); } - if(response || (status & IceInternal.AsyncStatus.Queued) > 0) - { - out.cancelable(this); // Notify the request that it's cancelable - } - if(response) { // @@ -679,6 +680,12 @@ public final class ConnectionI extends IceInternal.EventHandler } // + // Notify the request that it's cancelable with this connection. + // This will throw if the request is canceled. + // + outAsync.cancelable(this); + + // // Fill in the number of requests in the batch. // _batchStream.pos(IceInternal.Protocol.headerSize); @@ -704,11 +711,6 @@ public final class ConnectionI extends IceInternal.EventHandler throw (Ice.LocalException) _exception.fillInStackTrace(); } - if((status & IceInternal.AsyncStatus.Queued) > 0) - { - outAsync.cancelable(this); // Notify the request that it's cancelable. - } - // // Reset the batch stream. // @@ -808,22 +810,29 @@ public final class ConnectionI extends IceInternal.EventHandler _asyncRequests.remove(o.requestId); } - // - // If the request is being sent, don't remove it from the send - // streams, it will be removed once the sending is finished. - // - // Note that since we swapped the message stream to _writeStream - // it's fine if the OutgoingAsync output stream is released (and - // as long as canceled requests cannot be retried). - // - o.canceled(); - if(o != _sendStreams.getFirst()) + if(ex instanceof ConnectionTimeoutException) { - it.remove(); + setState(StateClosed, ex); } - if(outAsync.completed(ex)) + else { - outAsync.invokeCompletedAsync(); + // + // If the request is being sent, don't remove it from the send + // streams, it will be removed once the sending is finished. + // + // Note that since we swapped the message stream to _writeStream + // it's fine if the OutgoingAsync output stream is released (and + // as long as canceled requests cannot be retried). + // + o.canceled(); + if(o != _sendStreams.getFirst()) + { + it.remove(); + } + if(outAsync.completed(ex)) + { + outAsync.invokeCompletedAsync(); + } } return; } @@ -837,11 +846,19 @@ public final class ConnectionI extends IceInternal.EventHandler { if(it2.next() == o) { - it2.remove(); - if(outAsync.completed(ex)) + if(ex instanceof ConnectionTimeoutException) { - outAsync.invokeCompletedAsync(); + setState(StateClosed, ex); + } + else + { + it2.remove(); + if(outAsync.completed(ex)) + { + outAsync.invokeCompletedAsync(); + } } + return; } } } @@ -1770,7 +1787,8 @@ public final class ConnectionI extends IceInternal.EventHandler _exception instanceof ForcedCloseConnectionException || _exception instanceof ConnectionTimeoutException || _exception instanceof CommunicatorDestroyedException || - _exception instanceof ObjectAdapterDeactivatedException || (_exception instanceof ConnectionLostException && _state >= StateClosing))) + _exception instanceof ObjectAdapterDeactivatedException || + (_exception instanceof ConnectionLostException && _state >= StateClosing))) { warning("connection exception", _exception); } @@ -1957,7 +1975,8 @@ public final class ConnectionI extends IceInternal.EventHandler _exception instanceof ForcedCloseConnectionException || _exception instanceof ConnectionTimeoutException || _exception instanceof CommunicatorDestroyedException || - _exception instanceof ObjectAdapterDeactivatedException || (_exception instanceof ConnectionLostException && _state >= StateClosing))) + _exception instanceof ObjectAdapterDeactivatedException || + (_exception instanceof ConnectionLostException && _state >= StateClosing))) { _observer.failed(_exception.ice_name()); } diff --git a/java/src/Ice/ObjectPrxHelperBase.java b/java/src/Ice/ObjectPrxHelperBase.java index 3f7bb25d9ad..2c763f56cc2 100644 --- a/java/src/Ice/ObjectPrxHelperBase.java +++ b/java/src/Ice/ObjectPrxHelperBase.java @@ -1757,7 +1757,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable public final ObjectPrx ice_invocationTimeout(int newTimeout) { - if(newTimeout < 1 && newTimeout != -1) + if(newTimeout < 1 && newTimeout != -1 && newTimeout != -2) { throw new IllegalArgumentException("invalid value passed to ice_invocationTimeout: " + newTimeout); } diff --git a/java/src/IceInternal/AsyncResultI.java b/java/src/IceInternal/AsyncResultI.java index 9d7445584fe..6c31dfb8e38 100644 --- a/java/src/IceInternal/AsyncResultI.java +++ b/java/src/IceInternal/AsyncResultI.java @@ -228,17 +228,20 @@ public class AsyncResultI implements AsyncResult }); } - public void cancelable(final CancellationHandler handler) + synchronized public void cancelable(final CancellationHandler handler) { - synchronized(this) + if(_cancellationException != null) { - if(_cancellationException == null) + try { - _cancellationHandler = handler; - return; + throw _cancellationException; + } + finally + { + _cancellationException = null; } } - handler.asyncRequestCanceled((OutgoingAsyncBase)this, _cancellationException); + _cancellationHandler = handler; } public final boolean __wait() @@ -408,17 +411,6 @@ public class AsyncResultI implements AsyncResult _cancellationHandler.asyncRequestCanceled((OutgoingAsyncBase)this, ex); } - protected void checkCanceled() - { - synchronized(this) - { - if(_cancellationException != null) - { - throw _cancellationException; - } - } - } - protected Ice.Instrumentation.InvocationObserver getObserver() { return _observer; diff --git a/java/src/IceInternal/CollocatedRequestHandler.java b/java/src/IceInternal/CollocatedRequestHandler.java index 48d7bfa5b7d..e3f5f045f58 100644 --- a/java/src/IceInternal/CollocatedRequestHandler.java +++ b/java/src/IceInternal/CollocatedRequestHandler.java @@ -311,6 +311,8 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler { synchronized(this) { + outAsync.cancelable(this); // This will throw if the request is canceled + if(_response) { requestId = ++_requestId; @@ -320,7 +322,6 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler { _sendAsyncRequests.put(outAsync, requestId); } - outAsync.cancelable(this); } } @@ -366,10 +367,11 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler invokeNum = _batchRequestNum; if(_batchRequestNum > 0) { + outAsync.cancelable(this); // This will throw if the request is canceled + if(_reference.getInstance().queueRequests() || _reference.getInvocationTimeout() > 0) { _sendAsyncRequests.put(outAsync, 0); - outAsync.cancelable(this); } assert(!_batchStream.isEmpty()); diff --git a/java/src/IceInternal/ConnectRequestHandler.java b/java/src/IceInternal/ConnectRequestHandler.java index e4c1e6b1727..a5f92ac090c 100644 --- a/java/src/IceInternal/ConnectRequestHandler.java +++ b/java/src/IceInternal/ConnectRequestHandler.java @@ -154,12 +154,16 @@ public class ConnectRequestHandler { synchronized(this) { + if(!_initialized) + { + out.cancelable(this); // This will throw if the request is canceled + } + try { if(!initialized()) { _requests.add(new Request(out)); - out.cancelable(this); return AsyncStatus.Queued; } } diff --git a/java/src/IceInternal/DefaultsAndOverrides.java b/java/src/IceInternal/DefaultsAndOverrides.java index 5dbdcc1a0dc..916ca3b899d 100644 --- a/java/src/IceInternal/DefaultsAndOverrides.java +++ b/java/src/IceInternal/DefaultsAndOverrides.java @@ -193,7 +193,7 @@ public final class DefaultsAndOverrides } intValue = properties.getPropertyAsIntWithDefault("Ice.Default.InvocationTimeout", -1); - if(intValue < 1 && intValue != -1) + if(intValue < 1 && intValue != -1 && intValue != -2) { defaultInvocationTimeout = -1; StringBuffer msg = new StringBuffer("invalid value for Ice.Default.InvocationTimeout `"); diff --git a/java/src/IceInternal/ProxyOutgoingAsyncBase.java b/java/src/IceInternal/ProxyOutgoingAsyncBase.java index 0441552fb45..29779078111 100644 --- a/java/src/IceInternal/ProxyOutgoingAsyncBase.java +++ b/java/src/IceInternal/ProxyOutgoingAsyncBase.java @@ -66,6 +66,27 @@ public abstract class ProxyOutgoingAsyncBase extends OutgoingAsyncBase invokeImpl(false); } + public void cancelable(final CancellationHandler handler) + { + if(_proxy.__reference().getInvocationTimeout() == -2 && _cachedConnection != null) + { + final int timeout = _cachedConnection.timeout(); + if(timeout > 0) + { + _future = _instance.timer().schedule( + new Runnable() + { + @Override + public void run() + { + cancel(new Ice.ConnectionTimeoutException()); + } + }, timeout, java.util.concurrent.TimeUnit.MILLISECONDS); + } + } + super.cancelable(handler); + } + public void abort(Ice.Exception ex) { assert(_childObserver == null); @@ -128,15 +149,14 @@ public abstract class ProxyOutgoingAsyncBase extends OutgoingAsyncBase { @Override public void run() - { - cancel(new Ice.InvocationTimeoutException()); - } + { + cancel(new Ice.InvocationTimeoutException()); + } }, invocationTimeout, java.util.concurrent.TimeUnit.MILLISECONDS); } } else // If not called from the user thread, it's called from the retry queue { - checkCanceled(); // Cancellation exception aren't retriable if(_observer != null) { _observer.retried(); @@ -190,7 +210,6 @@ public abstract class ProxyOutgoingAsyncBase extends OutgoingAsyncBase } else if(_observer != null) { - checkCanceled(); _observer.retried(); } } diff --git a/java/src/IceInternal/RetryQueue.java b/java/src/IceInternal/RetryQueue.java index 82831529545..28c10b7bd48 100644 --- a/java/src/IceInternal/RetryQueue.java +++ b/java/src/IceInternal/RetryQueue.java @@ -23,9 +23,9 @@ public class RetryQueue throw new Ice.CommunicatorDestroyedException(); } RetryTask task = new RetryTask(this, outAsync); + outAsync.cancelable(task); // This will throw if the request is canceled task.setFuture(_instance.timer().schedule(task, interval, java.util.concurrent.TimeUnit.MILLISECONDS)); _requests.add(task); - outAsync.cancelable(task); } synchronized public void destroy() diff --git a/java/src/IceInternal/RetryTask.java b/java/src/IceInternal/RetryTask.java index 974dc998a79..b847dc69f33 100644 --- a/java/src/IceInternal/RetryTask.java +++ b/java/src/IceInternal/RetryTask.java @@ -36,12 +36,10 @@ class RetryTask implements Runnable, CancellationHandler { if(_queue.remove(this) && _future.cancel(false)) { - // - // We just retry the outgoing async now rather than marking it - // as finished. The retry will check for the cancellation - // exception and terminate appropriately the request. - // - _outAsync.retry(); + if(_outAsync.completed(ex)) + { + _outAsync.invokeCompletedAsync(); + } } } diff --git a/java/test/Ice/proxy/AllTests.java b/java/test/Ice/proxy/AllTests.java index 45dfbe901ed..98e3c124fbb 100644 --- a/java/test/Ice/proxy/AllTests.java +++ b/java/test/Ice/proxy/AllTests.java @@ -519,6 +519,7 @@ public class AllTests try { base.ice_invocationTimeout(-1); + base.ice_invocationTimeout(-2); } catch(IllegalArgumentException e) { @@ -527,7 +528,7 @@ public class AllTests try { - base.ice_invocationTimeout(-2); + base.ice_invocationTimeout(-3); test(false); } catch(IllegalArgumentException e) diff --git a/java/test/Ice/timeout/AllTests.java b/java/test/Ice/timeout/AllTests.java index 46dd8e00452..7f0239f32c5 100644 --- a/java/test/Ice/timeout/AllTests.java +++ b/java/test/Ice/timeout/AllTests.java @@ -272,6 +272,52 @@ public class AllTests to.begin_sleep(250 * mult, cb); cb.check(); } + { + // + // Backward compatible connection timeouts + // + TimeoutPrx to = TimeoutPrxHelper.uncheckedCast(obj.ice_invocationTimeout(-2).ice_timeout(250)); + Ice.Connection con = null; + try + { + con = to.ice_getConnection(); + to.sleep(500); + test(false); + } + catch(Ice.TimeoutException ex) + { + assert(con != null); + try + { + con.getInfo(); + test(false); + } + catch(Ice.TimeoutException exc) + { + // Connection got closed as well. + } + } + + try + { + con = to.ice_getConnection(); + to.end_sleep(to.begin_sleep(500)); + test(false); + } + catch(Ice.TimeoutException ex) + { + assert(con != null); + try + { + con.getInfo(); + test(false); + } + catch(Ice.TimeoutException exc) + { + // Connection got closed as well. + } + } + } out.println("ok"); out.print("testing close timeout... "); diff --git a/js/src/Ice/AsyncResult.js b/js/src/Ice/AsyncResult.js index 5564803f64c..082b73b689a 100644 --- a/js/src/Ice/AsyncResult.js +++ b/js/src/Ice/AsyncResult.js @@ -118,19 +118,16 @@ var AsyncResult = Ice.Class(AsyncResultBase, { { if(this._cancellationException) { - handler.asyncRequestCanceled(this, this._cancellationException); - } - else - { - this._cancellationHandler = handler; - } - }, - __checkCanceled: function() - { - if(this._cancellationException) - { - throw this._cancellationException; + try + { + throw this._cancellationException; + } + finally + { + this._cancellationException = null; + } } + this._cancellationHandler = handler; }, __os: function() { diff --git a/js/src/Ice/ConnectRequestHandler.js b/js/src/Ice/ConnectRequestHandler.js index 602c71eeffe..3e7e5a559cc 100644 --- a/js/src/Ice/ConnectRequestHandler.js +++ b/js/src/Ice/ConnectRequestHandler.js @@ -155,12 +155,16 @@ var ConnectRequestHandler = Ice.Class({ }, sendAsyncRequest: function(out) { + if(!this._initialized) + { + out.__cancelable(this); // This will throw if the request is canceled + } + try { if(!this.initialized()) { this._requests.push(new Request(out)); - out.__cancelable(this); return AsyncStatus.Queued; } } diff --git a/js/src/Ice/ConnectionI.js b/js/src/Ice/ConnectionI.js index 28b2e60352a..d14417c7412 100644 --- a/js/src/Ice/ConnectionI.js +++ b/js/src/Ice/ConnectionI.js @@ -399,6 +399,12 @@ var ConnectionI = Class({ // this._transceiver.checkSendSize(os, this._instance.messageSizeMax()); + // + // Notify the request that it's cancelable with this connection. + // This will throw if the request is canceled. + // + out.__cancelable(this); // Notify the request that it's cancelable + if(response) { // @@ -437,11 +443,6 @@ var ConnectionI = Class({ } } - if(response || (status & AsyncStatus.Queued) > 0) - { - out.__cancelable(this); // Notify the request that it's cancelable - } - if(response) { // @@ -665,6 +666,12 @@ var ConnectionI = Class({ } // + // Notify the request that it's cancelable with this connection. + // This will throw if the request is canceled. + // + outAsync.__cancelable(this); + + // // Fill in the number of requests in the batch. // this._batchStream.pos = Protocol.headerSize; @@ -690,11 +697,6 @@ var ConnectionI = Class({ } } - if((status & AsyncStatus.Queued) > 0) - { - outAsync.__cancelable(this); // Notify the request that it's cancelable. - } - // // Reset the batch stream. // diff --git a/js/src/Ice/DefaultsAndOverrides.js b/js/src/Ice/DefaultsAndOverrides.js index 8f937dfec14..4009d6e9f8c 100644 --- a/js/src/Ice/DefaultsAndOverrides.js +++ b/js/src/Ice/DefaultsAndOverrides.js @@ -110,7 +110,7 @@ var DefaultsAndOverrides = function(properties, logger) { this.defaultTimeout = 60000; logger.warning("invalid value for Ice.Default.Timeout `" + properties.getProperty("Ice.Default.Timeout") + - "': defaulting to 60000"); + "': defaulting to 60000"); } this.defaultLocatorCacheTimeout = properties.getPropertyAsIntWithDefault("Ice.Default.LocatorCacheTimeout", -1); @@ -118,7 +118,7 @@ var DefaultsAndOverrides = function(properties, logger) { this.defaultLocatorCacheTimeout = -1; logger.warning("invalid value for Ice.Default.LocatorCacheTimeout `" + - properties.getProperty("Ice.Default.LocatorCacheTimeout") + "': defaulting to -1"); + properties.getProperty("Ice.Default.LocatorCacheTimeout") + "': defaulting to -1"); } this.defaultInvocationTimeout = properties.getPropertyAsIntWithDefault("Ice.Default.InvocationTimeout", -1); @@ -126,13 +126,13 @@ var DefaultsAndOverrides = function(properties, logger) { this.defaultInvocationTimeout = -1; logger.warning("invalid value for Ice.Default.InvocationTimeout `" + - properties.getProperty("Ice.Default.InvocationTimeout") + "': defaulting to -1"); + properties.getProperty("Ice.Default.InvocationTimeout") + "': defaulting to -1"); } this.defaultPreferSecure = properties.getPropertyAsIntWithDefault("Ice.Default.PreferSecure", 0) > 0; value = properties.getPropertyWithDefault("Ice.Default.EncodingVersion", - Ice.encodingVersionToString(Protocol.currentEncoding)); + Ice.encodingVersionToString(Protocol.currentEncoding)); this.defaultEncoding = Ice.stringToEncodingVersion(value); Protocol.checkSupportedEncoding(this.defaultEncoding); diff --git a/js/src/Ice/OutgoingAsync.js b/js/src/Ice/OutgoingAsync.js index 175258d69ec..cb83b4578d7 100644 --- a/js/src/Ice/OutgoingAsync.js +++ b/js/src/Ice/OutgoingAsync.js @@ -116,10 +116,6 @@ var ProxyOutgoingAsyncBase = Ice.Class(OutgoingAsyncBase, { invocationTimeout); } } - else - { - this.__checkCanceled(); - } while(true) { @@ -151,10 +147,6 @@ var ProxyOutgoingAsyncBase = Ice.Class(OutgoingAsyncBase, { this._instance.retryQueue().add(this, interval); return; } - else - { - this.__checkCanceled(); // Cancellation exception aren't retriable - } } } } diff --git a/js/src/Ice/RetryQueue.js b/js/src/Ice/RetryQueue.js index 86148cd0757..e346a4e0efb 100644 --- a/js/src/Ice/RetryQueue.js +++ b/js/src/Ice/RetryQueue.js @@ -25,11 +25,11 @@ var RetryQueue = Class({ throw new Ice.CommunicatorDestroyedException(); } var task = new RetryTask(this, outAsync); + outAsync.__cancelable(task); // This will throw if the request is canceled task.token = this._instance.timer().schedule(function() { task.run(); }, interval); - outAsync.__cancelable(task); this._requests.push(task); }, destroy: function() @@ -90,12 +90,7 @@ var RetryTask = Class({ { if(this.queue.cancel(this)) { - // - // We just retry the outgoing async now rather than marking it - // as finished. The retry will check for the cancellation - // exception and terminate appropriately the request. - // - this.outAsync.__retry(); + this.outAsync.__completedEx(ex); } } }); |