summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cpp/include/Ice/AsyncResult.h2
-rw-r--r--cpp/include/Ice/OutgoingAsync.h1
-rw-r--r--cpp/src/Ice/AsyncResult.cpp25
-rw-r--r--cpp/src/Ice/CollocatedRequestHandler.cpp8
-rw-r--r--cpp/src/Ice/ConnectRequestHandler.cpp6
-rw-r--r--cpp/src/Ice/ConnectionI.cpp137
-rw-r--r--cpp/src/Ice/DefaultsAndOverrides.cpp2
-rw-r--r--cpp/src/Ice/Outgoing.cpp32
-rw-r--r--cpp/src/Ice/OutgoingAsync.cpp36
-rw-r--r--cpp/src/Ice/Proxy.cpp2
-rw-r--r--cpp/src/Ice/RetryQueue.cpp14
-rw-r--r--cpp/test/Ice/ami/AllTests.cpp7
-rw-r--r--cpp/test/Ice/proxy/AllTests.cpp3
-rw-r--r--cpp/test/Ice/timeout/AllTests.cpp44
-rw-r--r--cs/src/Ice/AsyncResult.cs27
-rw-r--r--cs/src/Ice/CollocatedRequestHandler.cs6
-rw-r--r--cs/src/Ice/ConnectRequestHandler.cs6
-rw-r--r--cs/src/Ice/ConnectionI.cs64
-rw-r--r--cs/src/Ice/DefaultsAndOverrides.cs2
-rw-r--r--cs/src/Ice/OutgoingAsync.cs38
-rw-r--r--cs/src/Ice/Proxy.cs2
-rw-r--r--cs/src/Ice/RetryQueue.cs24
-rw-r--r--cs/test/Ice/proxy/AllTests.cs3
-rw-r--r--cs/test/Ice/timeout/AllTests.cs44
-rw-r--r--java/src/Ice/ConnectionI.java75
-rw-r--r--java/src/Ice/ObjectPrxHelperBase.java2
-rw-r--r--java/src/IceInternal/AsyncResultI.java26
-rw-r--r--java/src/IceInternal/CollocatedRequestHandler.java6
-rw-r--r--java/src/IceInternal/ConnectRequestHandler.java6
-rw-r--r--java/src/IceInternal/DefaultsAndOverrides.java2
-rw-r--r--java/src/IceInternal/ProxyOutgoingAsyncBase.java29
-rw-r--r--java/src/IceInternal/RetryQueue.java2
-rw-r--r--java/src/IceInternal/RetryTask.java10
-rw-r--r--java/test/Ice/proxy/AllTests.java3
-rw-r--r--java/test/Ice/timeout/AllTests.java46
-rw-r--r--js/src/Ice/AsyncResult.js21
-rw-r--r--js/src/Ice/ConnectRequestHandler.js6
-rw-r--r--js/src/Ice/ConnectionI.js22
-rw-r--r--js/src/Ice/DefaultsAndOverrides.js8
-rw-r--r--js/src/Ice/OutgoingAsync.js8
-rw-r--r--js/src/Ice/RetryQueue.js9
41 files changed, 554 insertions, 262 deletions
diff --git a/cpp/include/Ice/AsyncResult.h b/cpp/include/Ice/AsyncResult.h
index 5452c5de970..c94d4f01a19 100644
--- a/cpp/include/Ice/AsyncResult.h
+++ b/cpp/include/Ice/AsyncResult.h
@@ -106,7 +106,7 @@ protected:
void invokeCompleted();
void cancel(const LocalException&);
- void cancelable(const IceInternal::CancellationHandlerPtr&);
+ virtual void cancelable(const IceInternal::CancellationHandlerPtr&);
void checkCanceled();
void warning(const std::exception&) const;
diff --git a/cpp/include/Ice/OutgoingAsync.h b/cpp/include/Ice/OutgoingAsync.h
index b00c97d68f9..8ea4b54e34c 100644
--- a/cpp/include/Ice/OutgoingAsync.h
+++ b/cpp/include/Ice/OutgoingAsync.h
@@ -92,6 +92,7 @@ public:
using OutgoingAsyncBase::sent;
virtual bool completed(const Ice::Exception&);
+ virtual void cancelable(const CancellationHandlerPtr&);
void retry();
void abort(const Ice::Exception&);
diff --git a/cpp/src/Ice/AsyncResult.cpp b/cpp/src/Ice/AsyncResult.cpp
index 03abc55e020..dc1b5689339 100644
--- a/cpp/src/Ice/AsyncResult.cpp
+++ b/cpp/src/Ice/AsyncResult.cpp
@@ -420,25 +420,20 @@ AsyncResult::cancel(const Ice::LocalException& ex)
void
AsyncResult::cancelable(const CancellationHandlerPtr& handler)
{
- {
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
- if(!_cancellationException.get())
- {
- _cancellationHandler = handler;
- return;
- }
- }
- handler->asyncRequestCanceled(OutgoingAsyncBasePtr::dynamicCast(this), *_cancellationException.get());
-}
-
-void
-AsyncResult::checkCanceled()
-{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
if(_cancellationException.get())
{
- _cancellationException->ice_throw();
+ try
+ {
+ _cancellationException->ice_throw();
+ }
+ catch(const Ice::Exception&)
+ {
+ _cancellationException.reset(0);
+ throw;
+ }
}
+ _cancellationHandler = handler;
}
void
diff --git a/cpp/src/Ice/CollocatedRequestHandler.cpp b/cpp/src/Ice/CollocatedRequestHandler.cpp
index 3fc321735cb..f97fadf6c02 100644
--- a/cpp/src/Ice/CollocatedRequestHandler.cpp
+++ b/cpp/src/Ice/CollocatedRequestHandler.cpp
@@ -390,6 +390,9 @@ CollocatedRequestHandler::invokeAsyncRequest(OutgoingAsync* outAsync)
if(_reference->getInvocationTimeout() > 0 || _response)
{
Lock sync(*this);
+
+ outAsync->cancelable(this); // This will throw if the request is canceled
+
if(_response)
{
requestId = ++_requestId;
@@ -399,7 +402,6 @@ CollocatedRequestHandler::invokeAsyncRequest(OutgoingAsync* outAsync)
{
_sendAsyncRequests.insert(make_pair(outAsync, requestId));
}
- outAsync->cancelable(this);
}
outAsync->attachCollocatedObserver(_adapter, requestId);
@@ -479,11 +481,11 @@ CollocatedRequestHandler::invokeAsyncBatchRequests(OutgoingAsyncBase* outAsync)
invokeNum = _batchRequestNum;
if(_batchRequestNum > 0)
{
+ outAsync->cancelable(this); // This will throw if the request is canceled
+
if(_reference->getInvocationTimeout() > 0)
{
_sendAsyncRequests.insert(make_pair(outAsync, 0));
-
- outAsync->cancelable(this);
}
assert(!_batchStream.b.empty());
diff --git a/cpp/src/Ice/ConnectRequestHandler.cpp b/cpp/src/Ice/ConnectRequestHandler.cpp
index 7d7f4a9291c..4922d010a4e 100644
--- a/cpp/src/Ice/ConnectRequestHandler.cpp
+++ b/cpp/src/Ice/ConnectRequestHandler.cpp
@@ -229,6 +229,11 @@ ConnectRequestHandler::sendAsyncRequest(const OutgoingAsyncBasePtr& out)
{
{
Lock sync(*this);
+ if(!_initialized)
+ {
+ out->cancelable(this); // This will throw if the request is canceled
+ }
+
try
{
if(!initialized())
@@ -236,7 +241,6 @@ ConnectRequestHandler::sendAsyncRequest(const OutgoingAsyncBasePtr& out)
Request req;
req.outAsync = out;
_requests.push_back(req);
- out->cancelable(this);
return AsyncStatusQueued;
}
}
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp
index 01df99084be..eb2a03cfc34 100644
--- a/cpp/src/Ice/ConnectionI.cpp
+++ b/cpp/src/Ice/ConnectionI.cpp
@@ -711,6 +711,12 @@ Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncPtr& out, bool compress, b
//
_transceiver->checkSendSize(*os, _instance->messageSizeMax());
+ //
+ // Notify the request that it's cancelable with this connection.
+ // This will throw if the request is canceled.
+ //
+ out->cancelable(this);
+
Int requestId = 0;
if(response)
{
@@ -750,11 +756,6 @@ Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncPtr& out, bool compress, b
_exception->ice_throw();
}
- if(response || status & AsyncStatusQueued)
- {
- out->cancelable(this); // Notify the request that it's cancelable
- }
-
if(response)
{
//
@@ -1135,6 +1136,12 @@ Ice::ConnectionI::flushAsyncBatchRequests(const OutgoingAsyncBasePtr& outAsync)
}
//
+ // Notify the request that it's cancelable with this connection.
+ // This will throw if the request is canceled.
+ //
+ outAsync->cancelable(this);
+
+ //
// Fill in the number of requests in the batch.
//
const Byte* p = reinterpret_cast<const Byte*>(&_batchRequestNum);
@@ -1163,11 +1170,6 @@ Ice::ConnectionI::flushAsyncBatchRequests(const OutgoingAsyncBasePtr& outAsync)
_exception->ice_throw();
}
- if(status & AsyncStatusQueued)
- {
- outAsync->cancelable(this); // Notify the request that it's cancelable.
- }
-
//
// Reset the batch stream.
//
@@ -1306,21 +1308,27 @@ Ice::ConnectionI::requestCanceled(OutgoingBase* out, const Ice::LocalException&
}
}
- //
- // If the request is being sent, don't remove it from the send streams,
- // it will be removed once the sending is finished.
- //
- if(o == _sendStreams.begin())
+ if(dynamic_cast<const Ice::ConnectionTimeoutException*>(&ex))
{
- o->canceled(true); // true = adopt the stream.
+ setState(StateClosed, ex);
}
else
{
- o->canceled(false);
- _sendStreams.erase(o);
+ //
+ // If the request is being sent, don't remove it from the send streams,
+ // it will be removed once the sending is finished.
+ //
+ if(o == _sendStreams.begin())
+ {
+ o->canceled(true); // true = adopt the stream.
+ }
+ else
+ {
+ o->canceled(false);
+ _sendStreams.erase(o);
+ }
+ out->completed(ex);
}
-
- out->completed(ex);
return;
}
}
@@ -1330,9 +1338,17 @@ Ice::ConnectionI::requestCanceled(OutgoingBase* out, const Ice::LocalException&
{
if(_requestsHint != _requests.end() && _requestsHint->second == o)
{
- o->completed(ex);
- _requests.erase(_requestsHint);
- _requestsHint = _requests.end();
+ if(dynamic_cast<const Ice::ConnectionTimeoutException*>(&ex))
+ {
+ setState(StateClosed, ex);
+ }
+ else
+ {
+ o->completed(ex);
+ _requests.erase(_requestsHint);
+ _requestsHint = _requests.end();
+ }
+ return;
}
else
{
@@ -1340,9 +1356,16 @@ Ice::ConnectionI::requestCanceled(OutgoingBase* out, const Ice::LocalException&
{
if(p->second == o)
{
- o->completed(ex);
- assert(p != _requestsHint);
- _requests.erase(p);
+ if(dynamic_cast<const Ice::ConnectionTimeoutException*>(&ex))
+ {
+ setState(StateClosed, ex);
+ }
+ else
+ {
+ o->completed(ex);
+ assert(p != _requestsHint);
+ _requests.erase(p);
+ }
return; // We're done.
}
}
@@ -1381,23 +1404,29 @@ Ice::ConnectionI::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAsync, con
}
}
- //
- // If the request is being sent, don't remove it from the send streams,
- // it will be removed once the sending is finished.
- //
- if(o == _sendStreams.begin())
+ if(dynamic_cast<const Ice::ConnectionTimeoutException*>(&ex))
{
- o->canceled(true); // true = adopt the stream
+ setState(StateClosed, ex);
}
else
{
- o->canceled(false);
- _sendStreams.erase(o);
- }
- if(outAsync->completed(ex))
- {
- sync.release();
- outAsync->invokeCompleted();
+ //
+ // If the request is being sent, don't remove it from the send streams,
+ // it will be removed once the sending is finished.
+ //
+ if(o == _sendStreams.begin())
+ {
+ o->canceled(true); // true = adopt the stream
+ }
+ else
+ {
+ o->canceled(false);
+ _sendStreams.erase(o);
+ }
+ if(outAsync->completed(ex))
+ {
+ outAsync->invokeCompletedAsync();
+ }
}
return;
}
@@ -1410,11 +1439,18 @@ Ice::ConnectionI::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAsync, con
{
if(_asyncRequestsHint->second == o)
{
- _asyncRequests.erase(_asyncRequestsHint);
- _asyncRequestsHint = _asyncRequests.end();
- if(outAsync->completed(ex))
+ if(dynamic_cast<const Ice::ConnectionTimeoutException*>(&ex))
{
- outAsync->invokeCompletedAsync();
+ setState(StateClosed, ex);
+ }
+ else
+ {
+ _asyncRequests.erase(_asyncRequestsHint);
+ _asyncRequestsHint = _asyncRequests.end();
+ if(outAsync->completed(ex))
+ {
+ outAsync->invokeCompletedAsync();
+ }
}
return;
}
@@ -1424,11 +1460,18 @@ Ice::ConnectionI::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAsync, con
{
if(p->second.get() == o.get())
{
- assert(p != _asyncRequestsHint);
- _asyncRequests.erase(p);
- if(outAsync->completed(ex))
+ if(dynamic_cast<const Ice::ConnectionTimeoutException*>(&ex))
{
- outAsync->invokeCompletedAsync();
+ setState(StateClosed, ex);
+ }
+ else
+ {
+ assert(p != _asyncRequestsHint);
+ _asyncRequests.erase(p);
+ if(outAsync->completed(ex))
+ {
+ outAsync->invokeCompletedAsync();
+ }
}
return;
}
diff --git a/cpp/src/Ice/DefaultsAndOverrides.cpp b/cpp/src/Ice/DefaultsAndOverrides.cpp
index 03bed195abd..6ce5a0e9e35 100644
--- a/cpp/src/Ice/DefaultsAndOverrides.cpp
+++ b/cpp/src/Ice/DefaultsAndOverrides.cpp
@@ -138,7 +138,7 @@ IceInternal::DefaultsAndOverrides::DefaultsAndOverrides(const PropertiesPtr& pro
const_cast<int&>(defaultInvocationTimeout) =
properties->getPropertyAsIntWithDefault("Ice.Default.InvocationTimeout", -1);
- if(defaultInvocationTimeout < 1 && defaultInvocationTimeout != -1)
+ if(defaultInvocationTimeout < 1 && defaultInvocationTimeout != -1 && defaultInvocationTimeout != -2)
{
const_cast<Int&>(defaultInvocationTimeout) = -1;
Warning out(logger);
diff --git a/cpp/src/Ice/Outgoing.cpp b/cpp/src/Ice/Outgoing.cpp
index 4815b9796fb..8e13a64dc45 100644
--- a/cpp/src/Ice/Outgoing.cpp
+++ b/cpp/src/Ice/Outgoing.cpp
@@ -208,12 +208,13 @@ Outgoing::invoke()
return true;
}
+ const int invocationTimeout = _proxy->__reference()->getInvocationTimeout();
int cnt = 0;
while(true)
{
try
{
- if(_invocationTimeoutDeadline != IceUtil::Time() && _invocationTimeoutDeadline <= IceUtil::Time::now())
+ if(invocationTimeout > 0 && _invocationTimeoutDeadline <= IceUtil::Time::now())
{
throw Ice::InvocationTimeoutException(__FILE__, __LINE__);
}
@@ -228,11 +229,27 @@ Outgoing::invoke()
{
return true;
}
+
+ if(invocationTimeout == -2) // Use the connection timeout
+ {
+ try
+ {
+ _invocationTimeoutDeadline = IceUtil::Time(); // Reset any previously set value
+
+ int timeout = _handler->waitForConnection()->timeout();
+ if(timeout > 0)
+ {
+ _invocationTimeoutDeadline = IceUtil::Time::now() + IceUtil::Time::milliSeconds(timeout);
+ }
+ }
+ catch(const Ice::LocalException&)
+ {
+ }
+ }
bool timedOut = false;
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
-
//
// If the handler says it's not finished, we wait until we're done.
//
@@ -262,7 +279,14 @@ Outgoing::invoke()
if(timedOut)
{
- _handler->requestCanceled(this, InvocationTimeoutException(__FILE__, __LINE__));
+ if(invocationTimeout == -2)
+ {
+ _handler->requestCanceled(this, ConnectionTimeoutException(__FILE__, __LINE__));
+ }
+ else
+ {
+ _handler->requestCanceled(this, InvocationTimeoutException(__FILE__, __LINE__));
+ }
//
// Wait for the exception to propagate. It's possible the request handler ignores
@@ -298,7 +322,7 @@ Outgoing::invoke()
interval = IceUtil::Time::milliSeconds(_proxy->__handleException(ex, _handler, _mode, _sent, cnt));
if(interval > IceUtil::Time())
{
- if(_invocationTimeoutDeadline != IceUtil::Time())
+ if(invocationTimeout > 0)
{
IceUtil::Time deadline = _invocationTimeoutDeadline - IceUtil::Time::now();
if(deadline < interval)
diff --git a/cpp/src/Ice/OutgoingAsync.cpp b/cpp/src/Ice/OutgoingAsync.cpp
index c2c443f2184..fca1013252e 100644
--- a/cpp/src/Ice/OutgoingAsync.cpp
+++ b/cpp/src/Ice/OutgoingAsync.cpp
@@ -86,6 +86,12 @@ ProxyOutgoingAsyncBase::completed(const Exception& exc)
_childObserver.detach();
}
+ _cachedConnection = 0;
+ if(_proxy->__reference()->getInvocationTimeout() == -2)
+ {
+ _instance->timer()->cancel(this);
+ }
+
//
// NOTE: at this point, synchronization isn't needed, no other threads should be
// calling on the callback.
@@ -101,6 +107,21 @@ ProxyOutgoingAsyncBase::completed(const Exception& exc)
}
}
+
+void
+ProxyOutgoingAsyncBase::cancelable(const CancellationHandlerPtr& handler)
+{
+ if(_proxy->__reference()->getInvocationTimeout() == -2 && _cachedConnection)
+ {
+ const int timeout = _cachedConnection->timeout();
+ if(timeout > 0)
+ {
+ _instance->timer()->schedule(this, IceUtil::Time::milliSeconds(timeout));
+ }
+ }
+ AsyncResult::cancelable(handler);
+}
+
void
ProxyOutgoingAsyncBase::retry()
{
@@ -154,7 +175,6 @@ ProxyOutgoingAsyncBase::invokeImpl(bool userThread)
}
else
{
- checkCanceled(); // Cancellation exception aren't retriable
_observer.retried();
}
@@ -204,7 +224,6 @@ ProxyOutgoingAsyncBase::invokeImpl(bool userThread)
}
else
{
- checkCanceled(); // Cancellation exception aren't retriable
_observer.retried();
}
}
@@ -233,7 +252,7 @@ ProxyOutgoingAsyncBase::sent(bool done)
_sent = true;
if(done)
{
- if(_proxy->__reference()->getInvocationTimeout() > 0)
+ if(_proxy->__reference()->getInvocationTimeout() != -1)
{
_instance->timer()->cancel(this);
}
@@ -244,7 +263,7 @@ ProxyOutgoingAsyncBase::sent(bool done)
bool
ProxyOutgoingAsyncBase::finished(const Exception& ex)
{
- if(_proxy->__reference()->getInvocationTimeout() > 0)
+ if(_proxy->__reference()->getInvocationTimeout() != -1)
{
_instance->timer()->cancel(this);
}
@@ -254,7 +273,7 @@ ProxyOutgoingAsyncBase::finished(const Exception& ex)
bool
ProxyOutgoingAsyncBase::finished(bool ok)
{
- if(_proxy->__reference()->getInvocationTimeout() > 0)
+ if(_proxy->__reference()->getInvocationTimeout() != -1)
{
_instance->timer()->cancel(this);
}
@@ -276,12 +295,13 @@ ProxyOutgoingAsyncBase::handleException(const Exception& exc)
void
ProxyOutgoingAsyncBase::runTimerTask()
{
- try
+ if(_proxy->__reference()->getInvocationTimeout() == -2)
{
- cancel(InvocationTimeoutException(__FILE__, __LINE__));
+ cancel(ConnectionTimeoutException(__FILE__, __LINE__));
}
- catch(const CommunicatorDestroyedException&)
+ else
{
+ cancel(InvocationTimeoutException(__FILE__, __LINE__));
}
}
diff --git a/cpp/src/Ice/Proxy.cpp b/cpp/src/Ice/Proxy.cpp
index 13bd0120a8f..d18a38a96e5 100644
--- a/cpp/src/Ice/Proxy.cpp
+++ b/cpp/src/Ice/Proxy.cpp
@@ -1163,7 +1163,7 @@ IceProxy::Ice::Object::ice_getInvocationTimeout() const
ObjectPrx
IceProxy::Ice::Object::ice_invocationTimeout(Int newTimeout) const
{
- if(newTimeout < 1 && newTimeout != -1)
+ if(newTimeout < 1 && newTimeout != -1 && newTimeout != -2)
{
ostringstream s;
s << "invalid value passed to ice_invocationTimeout: " << newTimeout;
diff --git a/cpp/src/Ice/RetryQueue.cpp b/cpp/src/Ice/RetryQueue.cpp
index fe0e03d980f..063f413c80a 100644
--- a/cpp/src/Ice/RetryQueue.cpp
+++ b/cpp/src/Ice/RetryQueue.cpp
@@ -44,16 +44,14 @@ IceInternal::RetryTask::requestCanceled(OutgoingBase*, const Ice::LocalException
}
void
-IceInternal::RetryTask::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAsync, const Ice::LocalException&)
+IceInternal::RetryTask::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAsync, const Ice::LocalException& ex)
{
if(_queue->cancel(this))
{
- //
- // We just retry the outgoing async now rather than marking it
- // as finished. The retry will check for the cancellation
- // exception and terminate appropriately the request.
- //
- _outAsync->retry();
+ if(_outAsync->completed(ex))
+ {
+ _outAsync->invokeCompletedAsync();
+ }
}
}
@@ -89,6 +87,7 @@ IceInternal::RetryQueue::add(const ProxyOutgoingAsyncBasePtr& out, int interval)
throw CommunicatorDestroyedException(__FILE__, __LINE__);
}
RetryTaskPtr task = new RetryTask(this, out);
+ out->cancelable(task); // This will throw if the request is canceled.
try
{
_instance->timer()->schedule(task, IceUtil::Time::milliSeconds(interval));
@@ -98,7 +97,6 @@ IceInternal::RetryQueue::add(const ProxyOutgoingAsyncBasePtr& out, int interval)
throw CommunicatorDestroyedException(__FILE__, __LINE__);
}
_requests.insert(task);
- out->cancelable(task);
}
void
diff --git a/cpp/test/Ice/ami/AllTests.cpp b/cpp/test/Ice/ami/AllTests.cpp
index fffc66f7ec8..e08d85846a6 100644
--- a/cpp/test/Ice/ami/AllTests.cpp
+++ b/cpp/test/Ice/ami/AllTests.cpp
@@ -2672,12 +2672,15 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
Ice::AsyncResultPtr r;
Ice::ByteSeq seq;
- seq.resize(1024); // Make sure the request doesn't compress too well.
+ seq.resize(10024); // Make sure the request doesn't compress too well.
for(Ice::ByteSeq::iterator q = seq.begin(); q != seq.end(); ++q)
{
*q = static_cast<Ice::Byte>(IceUtilInternal::random(255));
}
- while((r = p->begin_opWithPayload(seq))->sentSynchronously());
+ for(int i = 0; i < 200; ++i) // 2MB
+ {
+ r = p->begin_opWithPayload(seq);
+ }
test(!r->isSent());
diff --git a/cpp/test/Ice/proxy/AllTests.cpp b/cpp/test/Ice/proxy/AllTests.cpp
index ce615ab82d8..b9371178371 100644
--- a/cpp/test/Ice/proxy/AllTests.cpp
+++ b/cpp/test/Ice/proxy/AllTests.cpp
@@ -501,6 +501,7 @@ allTests(const Ice::CommunicatorPtr& communicator)
try
{
base->ice_invocationTimeout(-1);
+ base->ice_invocationTimeout(-2);
}
catch(const IceUtil::IllegalArgumentException&)
{
@@ -509,7 +510,7 @@ allTests(const Ice::CommunicatorPtr& communicator)
try
{
- base->ice_invocationTimeout(-2);
+ base->ice_invocationTimeout(-3);
test(false);
}
catch(const IceUtil::IllegalArgumentException&)
diff --git a/cpp/test/Ice/timeout/AllTests.cpp b/cpp/test/Ice/timeout/AllTests.cpp
index c5be82c13d1..f3866356dac 100644
--- a/cpp/test/Ice/timeout/AllTests.cpp
+++ b/cpp/test/Ice/timeout/AllTests.cpp
@@ -209,6 +209,50 @@ allTests(const Ice::CommunicatorPtr& communicator)
to->begin_sleep(250, newCallback_Timeout_sleep(cb, &Callback::response, &Callback::exception));
cb->check();
}
+ {
+ //
+ // Backward compatible connection timeouts
+ //
+ TimeoutPrx to = TimeoutPrx::uncheckedCast(obj->ice_invocationTimeout(-2)->ice_timeout(250));
+ Ice::ConnectionPtr con;
+ try
+ {
+ con = to->ice_getConnection();
+ to->sleep(500);
+ test(false);
+ }
+ catch(const Ice::TimeoutException&)
+ {
+ try
+ {
+ con->getInfo();
+ test(false);
+ }
+ catch(const Ice::TimeoutException&)
+ {
+ // Connection got closed as well.
+ }
+ }
+
+ try
+ {
+ con = to->ice_getConnection();
+ to->end_sleep(to->begin_sleep(500));
+ test(false);
+ }
+ catch(const Ice::TimeoutException&)
+ {
+ try
+ {
+ con->getInfo();
+ test(false);
+ }
+ catch(const Ice::TimeoutException&)
+ {
+ // Connection got closed as well.
+ }
+ }
+ }
cout << "ok" << endl;
cout << "testing close timeout... " << flush;
diff --git a/cs/src/Ice/AsyncResult.cs b/cs/src/Ice/AsyncResult.cs
index e8db66e4b0d..e5b5e875684 100644
--- a/cs/src/Ice/AsyncResult.cs
+++ b/cs/src/Ice/AsyncResult.cs
@@ -404,17 +404,23 @@ namespace IceInternal
}, cachedConnection_);
}
- public void cancelable(CancellationHandler handler)
+ public virtual void cancelable(CancellationHandler handler)
{
lock(this)
{
- if(_cancellationException == null)
+ if(_cancellationException != null)
{
- _cancellationHandler = handler;
- return;
+ try
+ {
+ throw _cancellationException;
+ }
+ finally
+ {
+ _cancellationException = null;
+ }
}
+ _cancellationHandler = handler;
}
- handler.asyncRequestCanceled((OutgoingAsyncBase)this, _cancellationException);
}
public bool wait()
@@ -631,17 +637,6 @@ namespace IceInternal
_cancellationHandler.asyncRequestCanceled((OutgoingAsyncBase)this, ex);
}
- protected void checkCanceled()
- {
- lock(this)
- {
- if(_cancellationException != null)
- {
- throw _cancellationException;
- }
- }
- }
-
protected virtual Ice.Instrumentation.InvocationObserver getObserver()
{
return observer_;
diff --git a/cs/src/Ice/CollocatedRequestHandler.cs b/cs/src/Ice/CollocatedRequestHandler.cs
index 632934a7d33..a0e1a354938 100644
--- a/cs/src/Ice/CollocatedRequestHandler.cs
+++ b/cs/src/Ice/CollocatedRequestHandler.cs
@@ -287,6 +287,8 @@ namespace IceInternal
{
lock(this)
{
+ outAsync.cancelable(this); // This will throw if the request is canceled
+
if(_response)
{
requestId = ++_requestId;
@@ -296,7 +298,6 @@ namespace IceInternal
{
_sendAsyncRequests.Add(outAsync, requestId);
}
- outAsync.cancelable(this);
}
}
@@ -364,10 +365,11 @@ namespace IceInternal
invokeNum = _batchRequestNum;
if(_batchRequestNum > 0)
{
+ outAsync.cancelable(this); // This will throw if the request is canceled
+
if(_reference.getInvocationTimeout() > 0)
{
_sendAsyncRequests.Add(outAsync, 0);
- outAsync.cancelable(this);
}
Debug.Assert(!_batchStream.isEmpty());
diff --git a/cs/src/Ice/ConnectRequestHandler.cs b/cs/src/Ice/ConnectRequestHandler.cs
index eed058479e7..5069903f90f 100644
--- a/cs/src/Ice/ConnectRequestHandler.cs
+++ b/cs/src/Ice/ConnectRequestHandler.cs
@@ -147,12 +147,16 @@ namespace IceInternal
{
lock(this)
{
+ if(!_initialized)
+ {
+ outAsync.cancelable(this); // This will throw if the request is canceled
+ }
+
try
{
if(!initialized())
{
_requests.AddLast(new Request(outAsync));
- outAsync.cancelable(this);
sentCallback = null;
return false;
}
diff --git a/cs/src/Ice/ConnectionI.cs b/cs/src/Ice/ConnectionI.cs
index 6b7ce6d7510..db509426f1a 100644
--- a/cs/src/Ice/ConnectionI.cs
+++ b/cs/src/Ice/ConnectionI.cs
@@ -408,6 +408,12 @@ namespace Ice
//
_transceiver.checkSendSize(os.getBuffer(), _instance.messageSizeMax());
+ //
+ // Notify the request that it's cancelable with this connection.
+ // This will throw if the request is canceled.
+ //
+ og.cancelable(this);
+
int requestId = 0;
if(response)
{
@@ -444,11 +450,6 @@ namespace Ice
throw _exception;
}
- if(response || !sent)
- {
- og.cancelable(this); // Notify the request that it's cancelable
- }
-
if(response)
{
//
@@ -717,6 +718,12 @@ namespace Ice
}
//
+ // Notify the request that it's cancelable with this connection.
+ // This will throw if the request is canceled.
+ //
+ outAsync.cancelable(this);
+
+ //
// Fill in the number of requests in the batch.
//
_batchStream.pos(IceInternal.Protocol.headerSize);
@@ -743,11 +750,6 @@ namespace Ice
throw _exception;
}
- if(!sent)
- {
- outAsync.cancelable(this); // Notify the request that it's cancelable.
- }
-
//
// Reset the batch stream.
//
@@ -850,19 +852,26 @@ namespace Ice
_asyncRequests.Remove(o.requestId);
}
- //
- // If the request is being sent, don't remove it from the send streams,
- // it will be removed once the sending is finished.
- //
- o.canceled();
- if(o != _sendStreams.First.Value)
+ if(ex is Ice.ConnectionTimeoutException)
{
- _sendStreams.Remove(p);
+ setState(StateClosed, ex);
}
- Ice.AsyncCallback cb = outAsync.completed(ex);
- if(cb != null)
+ else
{
- outAsync.invokeCompletedAsync(cb);
+ //
+ // If the request is being sent, don't remove it from the send streams,
+ // it will be removed once the sending is finished.
+ //
+ o.canceled();
+ if(o != _sendStreams.First.Value)
+ {
+ _sendStreams.Remove(p);
+ }
+ Ice.AsyncCallback cb = outAsync.completed(ex);
+ if(cb != null)
+ {
+ outAsync.invokeCompletedAsync(cb);
+ }
}
return;
}
@@ -875,11 +884,18 @@ namespace Ice
{
if(kvp.Value == o)
{
- _asyncRequests.Remove(kvp.Key);
- Ice.AsyncCallback cb = outAsync.completed(ex);
- if(cb != null)
+ if(ex is Ice.ConnectionTimeoutException)
{
- outAsync.invokeCompletedAsync(cb);
+ setState(StateClosed, ex);
+ }
+ else
+ {
+ _asyncRequests.Remove(kvp.Key);
+ Ice.AsyncCallback cb = outAsync.completed(ex);
+ if(cb != null)
+ {
+ outAsync.invokeCompletedAsync(cb);
+ }
}
return;
}
diff --git a/cs/src/Ice/DefaultsAndOverrides.cs b/cs/src/Ice/DefaultsAndOverrides.cs
index f4dc56e6675..944410282f9 100644
--- a/cs/src/Ice/DefaultsAndOverrides.cs
+++ b/cs/src/Ice/DefaultsAndOverrides.cs
@@ -182,7 +182,7 @@ namespace IceInternal
}
defaultInvocationTimeout = properties.getPropertyAsIntWithDefault("Ice.Default.InvocationTimeout", -1);
- if(defaultInvocationTimeout < 1 && defaultInvocationTimeout != -1)
+ if(defaultInvocationTimeout < 1 && defaultInvocationTimeout != -1 && defaultInvocationTimeout != -2)
{
defaultInvocationTimeout = -1;
StringBuilder msg = new StringBuilder("invalid value for Ice.Default.InvocationTimeout `");
diff --git a/cs/src/Ice/OutgoingAsync.cs b/cs/src/Ice/OutgoingAsync.cs
index 9ae3a55cd20..6da49814641 100644
--- a/cs/src/Ice/OutgoingAsync.cs
+++ b/cs/src/Ice/OutgoingAsync.cs
@@ -138,6 +138,12 @@ namespace IceInternal
childObserver_ = null;
}
+ cachedConnection_ = null;
+ if(proxy_.reference__().getInvocationTimeout() == -2)
+ {
+ instance_.timer().cancel(this);
+ }
+
//
// NOTE: at this point, synchronization isn't needed, no other threads should be
// calling on the callback.
@@ -153,6 +159,19 @@ namespace IceInternal
}
}
+ public override void cancelable(CancellationHandler handler)
+ {
+ if(proxy_.reference__().getInvocationTimeout() == -2 && cachedConnection_ != null)
+ {
+ int timeout = cachedConnection_.timeout();
+ if(timeout > 0)
+ {
+ instance_.timer().schedule(this, timeout);
+ }
+ }
+ base.cancelable(handler);
+ }
+
public void retry()
{
invokeImpl(false);
@@ -179,7 +198,14 @@ namespace IceInternal
public void runTimerTask()
{
- cancel(new Ice.InvocationTimeoutException());
+ if(proxy_.reference__().getInvocationTimeout() == -2)
+ {
+ cancel(new Ice.ConnectionTimeoutException());
+ }
+ else
+ {
+ cancel(new Ice.InvocationTimeoutException());
+ }
}
protected ProxyOutgoingAsyncBase(Ice.ObjectPrxHelperBase prx, string op, object cookie) :
@@ -225,7 +251,6 @@ namespace IceInternal
}
else // If not called from the user thread, it's called from the retry queue
{
- checkCanceled(); // Cancellation exception aren't retriable
if(observer_ != null)
{
observer_.retried();
@@ -279,7 +304,6 @@ namespace IceInternal
}
else if(observer_ != null)
{
- checkCanceled();
observer_.retried();
}
}
@@ -308,7 +332,7 @@ namespace IceInternal
_sent = true;
if(done)
{
- if(proxy_.reference__().getInvocationTimeout() > 0)
+ if(proxy_.reference__().getInvocationTimeout() != -1)
{
instance_.timer().cancel(this);
}
@@ -318,7 +342,7 @@ namespace IceInternal
protected new Ice.AsyncCallback finished(Ice.Exception ex)
{
- if(proxy_.reference__().getInvocationTimeout() > 0)
+ if(proxy_.reference__().getInvocationTimeout() != -1)
{
instance_.timer().cancel(this);
}
@@ -327,7 +351,7 @@ namespace IceInternal
protected new Ice.AsyncCallback finished(bool ok)
{
- if(proxy_.reference__().getInvocationTimeout() > 0)
+ if(proxy_.reference__().getInvocationTimeout() != -1)
{
instance_.timer().cancel(this);
}
@@ -491,7 +515,7 @@ namespace IceInternal
public override bool invokeCollocated(CollocatedRequestHandler handler, out Ice.AsyncCallback sentCB)
{
// The BasicStream cannot be cached if the proxy is not a twoway or there is an invocation timeout set.
- if(!proxy_.ice_isTwoway() || proxy_.reference__().getInvocationTimeout() > 0)
+ if(!proxy_.ice_isTwoway() || proxy_.reference__().getInvocationTimeout() != -1)
{
// Disable caching by marking the streams as cached!
state_ |= StateCachedBuffers;
diff --git a/cs/src/Ice/Proxy.cs b/cs/src/Ice/Proxy.cs
index c0650815d2b..c815004d284 100644
--- a/cs/src/Ice/Proxy.cs
+++ b/cs/src/Ice/Proxy.cs
@@ -1632,7 +1632,7 @@ namespace Ice
/// <param name="newTimeout">The new invocation timeout (in seconds).</param>
public ObjectPrx ice_invocationTimeout(int newTimeout)
{
- if(newTimeout < 1 && newTimeout != -1)
+ if(newTimeout < 1 && newTimeout != -1 && newTimeout != -2)
{
throw new System.ArgumentException("invalid value passed to ice_invocationTimeout: " + newTimeout);
}
diff --git a/cs/src/Ice/RetryQueue.cs b/cs/src/Ice/RetryQueue.cs
index 4379ecdc0fd..c6a4aa82816 100644
--- a/cs/src/Ice/RetryQueue.cs
+++ b/cs/src/Ice/RetryQueue.cs
@@ -38,12 +38,11 @@ namespace IceInternal
Debug.Assert(_outAsync == outAsync);
if(_retryQueue.cancel(this))
{
- //
- // We just retry the outgoing async now rather than marking it
- // as finished. The retry will check for the cancellation
- // exception and terminate appropriately the request.
- //
- _outAsync.retry();
+ Ice.AsyncCallback cb = _outAsync.completed(ex);
+ if(cb != null)
+ {
+ _outAsync.invokeCompletedAsync(cb);
+ }
}
}
@@ -79,9 +78,9 @@ namespace IceInternal
throw new Ice.CommunicatorDestroyedException();
}
RetryTask task = new RetryTask(this, outAsync);
+ outAsync.cancelable(task); // This will throw if the request is canceled.
_instance.timer().schedule(task, interval);
_requests.Add(task, null);
- outAsync.cancelable(task);
}
}
@@ -114,12 +113,13 @@ namespace IceInternal
{
lock(this)
{
- Debug.Assert(_requests.ContainsKey(task));
- _requests.Remove(task);
- if(_instance == null && _requests.Count == 0)
+ if(_requests.Remove(task))
{
- // If we are destroying the queue, destroy is probably waiting on the queue to be empty.
- System.Threading.Monitor.Pulse(this);
+ if(_instance == null && _requests.Count == 0)
+ {
+ // If we are destroying the queue, destroy is probably waiting on the queue to be empty.
+ System.Threading.Monitor.Pulse(this);
+ }
}
}
}
diff --git a/cs/test/Ice/proxy/AllTests.cs b/cs/test/Ice/proxy/AllTests.cs
index 557b8398871..faf348a50d0 100644
--- a/cs/test/Ice/proxy/AllTests.cs
+++ b/cs/test/Ice/proxy/AllTests.cs
@@ -515,6 +515,7 @@ public class AllTests : TestCommon.TestApp
try
{
baseProxy.ice_invocationTimeout(-1);
+ baseProxy.ice_invocationTimeout(-2);
}
catch(System.ArgumentException)
{
@@ -523,7 +524,7 @@ public class AllTests : TestCommon.TestApp
try
{
- baseProxy.ice_invocationTimeout(-2);
+ baseProxy.ice_invocationTimeout(-3);
test(false);
}
catch(System.ArgumentException)
diff --git a/cs/test/Ice/timeout/AllTests.cs b/cs/test/Ice/timeout/AllTests.cs
index b8ebb722b44..61618a8b802 100644
--- a/cs/test/Ice/timeout/AllTests.cs
+++ b/cs/test/Ice/timeout/AllTests.cs
@@ -226,6 +226,50 @@ public class AllTests : TestCommon.TestApp
});
cb.check();
}
+ {
+ //
+ // Backward compatible connection timeouts
+ //
+ Test.TimeoutPrx to = Test.TimeoutPrxHelper.uncheckedCast(obj.ice_invocationTimeout(-2).ice_timeout(250));
+ Ice.Connection con = null;
+ try
+ {
+ con = to.ice_getConnection();
+ to.sleep(500);
+ test(false);
+ }
+ catch(Ice.TimeoutException)
+ {
+ try
+ {
+ con.getInfo();
+ test(false);
+ }
+ catch(Ice.TimeoutException)
+ {
+ // Connection got closed as well.
+ }
+ }
+
+ try
+ {
+ con = to.ice_getConnection();
+ to.end_sleep(to.begin_sleep(500));
+ test(false);
+ }
+ catch(Ice.TimeoutException)
+ {
+ try
+ {
+ con.getInfo();
+ test(false);
+ }
+ catch(Ice.TimeoutException)
+ {
+ // Connection got closed as well.
+ }
+ }
+ }
WriteLine("ok");
Write("testing close timeout... ");
diff --git a/java/src/Ice/ConnectionI.java b/java/src/Ice/ConnectionI.java
index 3ed5e3eed93..c0b74d70bac 100644
--- a/java/src/Ice/ConnectionI.java
+++ b/java/src/Ice/ConnectionI.java
@@ -354,6 +354,12 @@ public final class ConnectionI extends IceInternal.EventHandler
//
_transceiver.checkSendSize(os.getBuffer(), _instance.messageSizeMax());
+ //
+ // Notify the request that it's cancelable with this connection.
+ // This will throw if the request is canceled.
+ //
+ out.cancelable(this);
+
int requestId = 0;
if(response)
{
@@ -388,11 +394,6 @@ public final class ConnectionI extends IceInternal.EventHandler
throw (Ice.LocalException) _exception.fillInStackTrace();
}
- if(response || (status & IceInternal.AsyncStatus.Queued) > 0)
- {
- out.cancelable(this); // Notify the request that it's cancelable
- }
-
if(response)
{
//
@@ -679,6 +680,12 @@ public final class ConnectionI extends IceInternal.EventHandler
}
//
+ // Notify the request that it's cancelable with this connection.
+ // This will throw if the request is canceled.
+ //
+ outAsync.cancelable(this);
+
+ //
// Fill in the number of requests in the batch.
//
_batchStream.pos(IceInternal.Protocol.headerSize);
@@ -704,11 +711,6 @@ public final class ConnectionI extends IceInternal.EventHandler
throw (Ice.LocalException) _exception.fillInStackTrace();
}
- if((status & IceInternal.AsyncStatus.Queued) > 0)
- {
- outAsync.cancelable(this); // Notify the request that it's cancelable.
- }
-
//
// Reset the batch stream.
//
@@ -808,22 +810,29 @@ public final class ConnectionI extends IceInternal.EventHandler
_asyncRequests.remove(o.requestId);
}
- //
- // If the request is being sent, don't remove it from the send
- // streams, it will be removed once the sending is finished.
- //
- // Note that since we swapped the message stream to _writeStream
- // it's fine if the OutgoingAsync output stream is released (and
- // as long as canceled requests cannot be retried).
- //
- o.canceled();
- if(o != _sendStreams.getFirst())
+ if(ex instanceof ConnectionTimeoutException)
{
- it.remove();
+ setState(StateClosed, ex);
}
- if(outAsync.completed(ex))
+ else
{
- outAsync.invokeCompletedAsync();
+ //
+ // If the request is being sent, don't remove it from the send
+ // streams, it will be removed once the sending is finished.
+ //
+ // Note that since we swapped the message stream to _writeStream
+ // it's fine if the OutgoingAsync output stream is released (and
+ // as long as canceled requests cannot be retried).
+ //
+ o.canceled();
+ if(o != _sendStreams.getFirst())
+ {
+ it.remove();
+ }
+ if(outAsync.completed(ex))
+ {
+ outAsync.invokeCompletedAsync();
+ }
}
return;
}
@@ -837,11 +846,19 @@ public final class ConnectionI extends IceInternal.EventHandler
{
if(it2.next() == o)
{
- it2.remove();
- if(outAsync.completed(ex))
+ if(ex instanceof ConnectionTimeoutException)
{
- outAsync.invokeCompletedAsync();
+ setState(StateClosed, ex);
+ }
+ else
+ {
+ it2.remove();
+ if(outAsync.completed(ex))
+ {
+ outAsync.invokeCompletedAsync();
+ }
}
+ return;
}
}
}
@@ -1770,7 +1787,8 @@ public final class ConnectionI extends IceInternal.EventHandler
_exception instanceof ForcedCloseConnectionException ||
_exception instanceof ConnectionTimeoutException ||
_exception instanceof CommunicatorDestroyedException ||
- _exception instanceof ObjectAdapterDeactivatedException || (_exception instanceof ConnectionLostException && _state >= StateClosing)))
+ _exception instanceof ObjectAdapterDeactivatedException ||
+ (_exception instanceof ConnectionLostException && _state >= StateClosing)))
{
warning("connection exception", _exception);
}
@@ -1957,7 +1975,8 @@ public final class ConnectionI extends IceInternal.EventHandler
_exception instanceof ForcedCloseConnectionException ||
_exception instanceof ConnectionTimeoutException ||
_exception instanceof CommunicatorDestroyedException ||
- _exception instanceof ObjectAdapterDeactivatedException || (_exception instanceof ConnectionLostException && _state >= StateClosing)))
+ _exception instanceof ObjectAdapterDeactivatedException ||
+ (_exception instanceof ConnectionLostException && _state >= StateClosing)))
{
_observer.failed(_exception.ice_name());
}
diff --git a/java/src/Ice/ObjectPrxHelperBase.java b/java/src/Ice/ObjectPrxHelperBase.java
index 3f7bb25d9ad..2c763f56cc2 100644
--- a/java/src/Ice/ObjectPrxHelperBase.java
+++ b/java/src/Ice/ObjectPrxHelperBase.java
@@ -1757,7 +1757,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
public final ObjectPrx
ice_invocationTimeout(int newTimeout)
{
- if(newTimeout < 1 && newTimeout != -1)
+ if(newTimeout < 1 && newTimeout != -1 && newTimeout != -2)
{
throw new IllegalArgumentException("invalid value passed to ice_invocationTimeout: " + newTimeout);
}
diff --git a/java/src/IceInternal/AsyncResultI.java b/java/src/IceInternal/AsyncResultI.java
index 9d7445584fe..6c31dfb8e38 100644
--- a/java/src/IceInternal/AsyncResultI.java
+++ b/java/src/IceInternal/AsyncResultI.java
@@ -228,17 +228,20 @@ public class AsyncResultI implements AsyncResult
});
}
- public void cancelable(final CancellationHandler handler)
+ synchronized public void cancelable(final CancellationHandler handler)
{
- synchronized(this)
+ if(_cancellationException != null)
{
- if(_cancellationException == null)
+ try
{
- _cancellationHandler = handler;
- return;
+ throw _cancellationException;
+ }
+ finally
+ {
+ _cancellationException = null;
}
}
- handler.asyncRequestCanceled((OutgoingAsyncBase)this, _cancellationException);
+ _cancellationHandler = handler;
}
public final boolean __wait()
@@ -408,17 +411,6 @@ public class AsyncResultI implements AsyncResult
_cancellationHandler.asyncRequestCanceled((OutgoingAsyncBase)this, ex);
}
- protected void checkCanceled()
- {
- synchronized(this)
- {
- if(_cancellationException != null)
- {
- throw _cancellationException;
- }
- }
- }
-
protected Ice.Instrumentation.InvocationObserver getObserver()
{
return _observer;
diff --git a/java/src/IceInternal/CollocatedRequestHandler.java b/java/src/IceInternal/CollocatedRequestHandler.java
index 48d7bfa5b7d..e3f5f045f58 100644
--- a/java/src/IceInternal/CollocatedRequestHandler.java
+++ b/java/src/IceInternal/CollocatedRequestHandler.java
@@ -311,6 +311,8 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
{
synchronized(this)
{
+ outAsync.cancelable(this); // This will throw if the request is canceled
+
if(_response)
{
requestId = ++_requestId;
@@ -320,7 +322,6 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
{
_sendAsyncRequests.put(outAsync, requestId);
}
- outAsync.cancelable(this);
}
}
@@ -366,10 +367,11 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
invokeNum = _batchRequestNum;
if(_batchRequestNum > 0)
{
+ outAsync.cancelable(this); // This will throw if the request is canceled
+
if(_reference.getInstance().queueRequests() || _reference.getInvocationTimeout() > 0)
{
_sendAsyncRequests.put(outAsync, 0);
- outAsync.cancelable(this);
}
assert(!_batchStream.isEmpty());
diff --git a/java/src/IceInternal/ConnectRequestHandler.java b/java/src/IceInternal/ConnectRequestHandler.java
index e4c1e6b1727..a5f92ac090c 100644
--- a/java/src/IceInternal/ConnectRequestHandler.java
+++ b/java/src/IceInternal/ConnectRequestHandler.java
@@ -154,12 +154,16 @@ public class ConnectRequestHandler
{
synchronized(this)
{
+ if(!_initialized)
+ {
+ out.cancelable(this); // This will throw if the request is canceled
+ }
+
try
{
if(!initialized())
{
_requests.add(new Request(out));
- out.cancelable(this);
return AsyncStatus.Queued;
}
}
diff --git a/java/src/IceInternal/DefaultsAndOverrides.java b/java/src/IceInternal/DefaultsAndOverrides.java
index 5dbdcc1a0dc..916ca3b899d 100644
--- a/java/src/IceInternal/DefaultsAndOverrides.java
+++ b/java/src/IceInternal/DefaultsAndOverrides.java
@@ -193,7 +193,7 @@ public final class DefaultsAndOverrides
}
intValue = properties.getPropertyAsIntWithDefault("Ice.Default.InvocationTimeout", -1);
- if(intValue < 1 && intValue != -1)
+ if(intValue < 1 && intValue != -1 && intValue != -2)
{
defaultInvocationTimeout = -1;
StringBuffer msg = new StringBuffer("invalid value for Ice.Default.InvocationTimeout `");
diff --git a/java/src/IceInternal/ProxyOutgoingAsyncBase.java b/java/src/IceInternal/ProxyOutgoingAsyncBase.java
index 0441552fb45..29779078111 100644
--- a/java/src/IceInternal/ProxyOutgoingAsyncBase.java
+++ b/java/src/IceInternal/ProxyOutgoingAsyncBase.java
@@ -66,6 +66,27 @@ public abstract class ProxyOutgoingAsyncBase extends OutgoingAsyncBase
invokeImpl(false);
}
+ public void cancelable(final CancellationHandler handler)
+ {
+ if(_proxy.__reference().getInvocationTimeout() == -2 && _cachedConnection != null)
+ {
+ final int timeout = _cachedConnection.timeout();
+ if(timeout > 0)
+ {
+ _future = _instance.timer().schedule(
+ new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ cancel(new Ice.ConnectionTimeoutException());
+ }
+ }, timeout, java.util.concurrent.TimeUnit.MILLISECONDS);
+ }
+ }
+ super.cancelable(handler);
+ }
+
public void abort(Ice.Exception ex)
{
assert(_childObserver == null);
@@ -128,15 +149,14 @@ public abstract class ProxyOutgoingAsyncBase extends OutgoingAsyncBase
{
@Override
public void run()
- {
- cancel(new Ice.InvocationTimeoutException());
- }
+ {
+ cancel(new Ice.InvocationTimeoutException());
+ }
}, invocationTimeout, java.util.concurrent.TimeUnit.MILLISECONDS);
}
}
else // If not called from the user thread, it's called from the retry queue
{
- checkCanceled(); // Cancellation exception aren't retriable
if(_observer != null)
{
_observer.retried();
@@ -190,7 +210,6 @@ public abstract class ProxyOutgoingAsyncBase extends OutgoingAsyncBase
}
else if(_observer != null)
{
- checkCanceled();
_observer.retried();
}
}
diff --git a/java/src/IceInternal/RetryQueue.java b/java/src/IceInternal/RetryQueue.java
index 82831529545..28c10b7bd48 100644
--- a/java/src/IceInternal/RetryQueue.java
+++ b/java/src/IceInternal/RetryQueue.java
@@ -23,9 +23,9 @@ public class RetryQueue
throw new Ice.CommunicatorDestroyedException();
}
RetryTask task = new RetryTask(this, outAsync);
+ outAsync.cancelable(task); // This will throw if the request is canceled
task.setFuture(_instance.timer().schedule(task, interval, java.util.concurrent.TimeUnit.MILLISECONDS));
_requests.add(task);
- outAsync.cancelable(task);
}
synchronized public void destroy()
diff --git a/java/src/IceInternal/RetryTask.java b/java/src/IceInternal/RetryTask.java
index 974dc998a79..b847dc69f33 100644
--- a/java/src/IceInternal/RetryTask.java
+++ b/java/src/IceInternal/RetryTask.java
@@ -36,12 +36,10 @@ class RetryTask implements Runnable, CancellationHandler
{
if(_queue.remove(this) && _future.cancel(false))
{
- //
- // We just retry the outgoing async now rather than marking it
- // as finished. The retry will check for the cancellation
- // exception and terminate appropriately the request.
- //
- _outAsync.retry();
+ if(_outAsync.completed(ex))
+ {
+ _outAsync.invokeCompletedAsync();
+ }
}
}
diff --git a/java/test/Ice/proxy/AllTests.java b/java/test/Ice/proxy/AllTests.java
index 45dfbe901ed..98e3c124fbb 100644
--- a/java/test/Ice/proxy/AllTests.java
+++ b/java/test/Ice/proxy/AllTests.java
@@ -519,6 +519,7 @@ public class AllTests
try
{
base.ice_invocationTimeout(-1);
+ base.ice_invocationTimeout(-2);
}
catch(IllegalArgumentException e)
{
@@ -527,7 +528,7 @@ public class AllTests
try
{
- base.ice_invocationTimeout(-2);
+ base.ice_invocationTimeout(-3);
test(false);
}
catch(IllegalArgumentException e)
diff --git a/java/test/Ice/timeout/AllTests.java b/java/test/Ice/timeout/AllTests.java
index 46dd8e00452..7f0239f32c5 100644
--- a/java/test/Ice/timeout/AllTests.java
+++ b/java/test/Ice/timeout/AllTests.java
@@ -272,6 +272,52 @@ public class AllTests
to.begin_sleep(250 * mult, cb);
cb.check();
}
+ {
+ //
+ // Backward compatible connection timeouts
+ //
+ TimeoutPrx to = TimeoutPrxHelper.uncheckedCast(obj.ice_invocationTimeout(-2).ice_timeout(250));
+ Ice.Connection con = null;
+ try
+ {
+ con = to.ice_getConnection();
+ to.sleep(500);
+ test(false);
+ }
+ catch(Ice.TimeoutException ex)
+ {
+ assert(con != null);
+ try
+ {
+ con.getInfo();
+ test(false);
+ }
+ catch(Ice.TimeoutException exc)
+ {
+ // Connection got closed as well.
+ }
+ }
+
+ try
+ {
+ con = to.ice_getConnection();
+ to.end_sleep(to.begin_sleep(500));
+ test(false);
+ }
+ catch(Ice.TimeoutException ex)
+ {
+ assert(con != null);
+ try
+ {
+ con.getInfo();
+ test(false);
+ }
+ catch(Ice.TimeoutException exc)
+ {
+ // Connection got closed as well.
+ }
+ }
+ }
out.println("ok");
out.print("testing close timeout... ");
diff --git a/js/src/Ice/AsyncResult.js b/js/src/Ice/AsyncResult.js
index 5564803f64c..082b73b689a 100644
--- a/js/src/Ice/AsyncResult.js
+++ b/js/src/Ice/AsyncResult.js
@@ -118,19 +118,16 @@ var AsyncResult = Ice.Class(AsyncResultBase, {
{
if(this._cancellationException)
{
- handler.asyncRequestCanceled(this, this._cancellationException);
- }
- else
- {
- this._cancellationHandler = handler;
- }
- },
- __checkCanceled: function()
- {
- if(this._cancellationException)
- {
- throw this._cancellationException;
+ try
+ {
+ throw this._cancellationException;
+ }
+ finally
+ {
+ this._cancellationException = null;
+ }
}
+ this._cancellationHandler = handler;
},
__os: function()
{
diff --git a/js/src/Ice/ConnectRequestHandler.js b/js/src/Ice/ConnectRequestHandler.js
index 602c71eeffe..3e7e5a559cc 100644
--- a/js/src/Ice/ConnectRequestHandler.js
+++ b/js/src/Ice/ConnectRequestHandler.js
@@ -155,12 +155,16 @@ var ConnectRequestHandler = Ice.Class({
},
sendAsyncRequest: function(out)
{
+ if(!this._initialized)
+ {
+ out.__cancelable(this); // This will throw if the request is canceled
+ }
+
try
{
if(!this.initialized())
{
this._requests.push(new Request(out));
- out.__cancelable(this);
return AsyncStatus.Queued;
}
}
diff --git a/js/src/Ice/ConnectionI.js b/js/src/Ice/ConnectionI.js
index 28b2e60352a..d14417c7412 100644
--- a/js/src/Ice/ConnectionI.js
+++ b/js/src/Ice/ConnectionI.js
@@ -399,6 +399,12 @@ var ConnectionI = Class({
//
this._transceiver.checkSendSize(os, this._instance.messageSizeMax());
+ //
+ // Notify the request that it's cancelable with this connection.
+ // This will throw if the request is canceled.
+ //
+ out.__cancelable(this); // Notify the request that it's cancelable
+
if(response)
{
//
@@ -437,11 +443,6 @@ var ConnectionI = Class({
}
}
- if(response || (status & AsyncStatus.Queued) > 0)
- {
- out.__cancelable(this); // Notify the request that it's cancelable
- }
-
if(response)
{
//
@@ -665,6 +666,12 @@ var ConnectionI = Class({
}
//
+ // Notify the request that it's cancelable with this connection.
+ // This will throw if the request is canceled.
+ //
+ outAsync.__cancelable(this);
+
+ //
// Fill in the number of requests in the batch.
//
this._batchStream.pos = Protocol.headerSize;
@@ -690,11 +697,6 @@ var ConnectionI = Class({
}
}
- if((status & AsyncStatus.Queued) > 0)
- {
- outAsync.__cancelable(this); // Notify the request that it's cancelable.
- }
-
//
// Reset the batch stream.
//
diff --git a/js/src/Ice/DefaultsAndOverrides.js b/js/src/Ice/DefaultsAndOverrides.js
index 8f937dfec14..4009d6e9f8c 100644
--- a/js/src/Ice/DefaultsAndOverrides.js
+++ b/js/src/Ice/DefaultsAndOverrides.js
@@ -110,7 +110,7 @@ var DefaultsAndOverrides = function(properties, logger)
{
this.defaultTimeout = 60000;
logger.warning("invalid value for Ice.Default.Timeout `" + properties.getProperty("Ice.Default.Timeout") +
- "': defaulting to 60000");
+ "': defaulting to 60000");
}
this.defaultLocatorCacheTimeout = properties.getPropertyAsIntWithDefault("Ice.Default.LocatorCacheTimeout", -1);
@@ -118,7 +118,7 @@ var DefaultsAndOverrides = function(properties, logger)
{
this.defaultLocatorCacheTimeout = -1;
logger.warning("invalid value for Ice.Default.LocatorCacheTimeout `" +
- properties.getProperty("Ice.Default.LocatorCacheTimeout") + "': defaulting to -1");
+ properties.getProperty("Ice.Default.LocatorCacheTimeout") + "': defaulting to -1");
}
this.defaultInvocationTimeout = properties.getPropertyAsIntWithDefault("Ice.Default.InvocationTimeout", -1);
@@ -126,13 +126,13 @@ var DefaultsAndOverrides = function(properties, logger)
{
this.defaultInvocationTimeout = -1;
logger.warning("invalid value for Ice.Default.InvocationTimeout `" +
- properties.getProperty("Ice.Default.InvocationTimeout") + "': defaulting to -1");
+ properties.getProperty("Ice.Default.InvocationTimeout") + "': defaulting to -1");
}
this.defaultPreferSecure = properties.getPropertyAsIntWithDefault("Ice.Default.PreferSecure", 0) > 0;
value = properties.getPropertyWithDefault("Ice.Default.EncodingVersion",
- Ice.encodingVersionToString(Protocol.currentEncoding));
+ Ice.encodingVersionToString(Protocol.currentEncoding));
this.defaultEncoding = Ice.stringToEncodingVersion(value);
Protocol.checkSupportedEncoding(this.defaultEncoding);
diff --git a/js/src/Ice/OutgoingAsync.js b/js/src/Ice/OutgoingAsync.js
index 175258d69ec..cb83b4578d7 100644
--- a/js/src/Ice/OutgoingAsync.js
+++ b/js/src/Ice/OutgoingAsync.js
@@ -116,10 +116,6 @@ var ProxyOutgoingAsyncBase = Ice.Class(OutgoingAsyncBase, {
invocationTimeout);
}
}
- else
- {
- this.__checkCanceled();
- }
while(true)
{
@@ -151,10 +147,6 @@ var ProxyOutgoingAsyncBase = Ice.Class(OutgoingAsyncBase, {
this._instance.retryQueue().add(this, interval);
return;
}
- else
- {
- this.__checkCanceled(); // Cancellation exception aren't retriable
- }
}
}
}
diff --git a/js/src/Ice/RetryQueue.js b/js/src/Ice/RetryQueue.js
index 86148cd0757..e346a4e0efb 100644
--- a/js/src/Ice/RetryQueue.js
+++ b/js/src/Ice/RetryQueue.js
@@ -25,11 +25,11 @@ var RetryQueue = Class({
throw new Ice.CommunicatorDestroyedException();
}
var task = new RetryTask(this, outAsync);
+ outAsync.__cancelable(task); // This will throw if the request is canceled
task.token = this._instance.timer().schedule(function()
{
task.run();
}, interval);
- outAsync.__cancelable(task);
this._requests.push(task);
},
destroy: function()
@@ -90,12 +90,7 @@ var RetryTask = Class({
{
if(this.queue.cancel(this))
{
- //
- // We just retry the outgoing async now rather than marking it
- // as finished. The retry will check for the cancellation
- // exception and terminate appropriately the request.
- //
- this.outAsync.__retry();
+ this.outAsync.__completedEx(ex);
}
}
});