summaryrefslogtreecommitdiff
path: root/java/src
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2014-10-15 17:26:45 +0200
committerBenoit Foucher <benoit@zeroc.com>2014-10-15 17:26:45 +0200
commitf94eb5f938d33dc2ce9b09b03b5dc6ccf7bd46c2 (patch)
tree6b12ef2c59421702743048393f4757c0d1e0c504 /java/src
parentICE-5732 missing tracing in throughput demo (diff)
downloadice-f94eb5f938d33dc2ce9b09b03b5dc6ccf7bd46c2.tar.bz2
ice-f94eb5f938d33dc2ce9b09b03b5dc6ccf7bd46c2.tar.xz
ice-f94eb5f938d33dc2ce9b09b03b5dc6ccf7bd46c2.zip
Fixed ICE-5666: setting the invocation timeout to -2 provides the previous connection timeouts
Diffstat (limited to 'java/src')
-rw-r--r--java/src/Ice/ConnectionI.java75
-rw-r--r--java/src/Ice/ObjectPrxHelperBase.java2
-rw-r--r--java/src/IceInternal/AsyncResultI.java26
-rw-r--r--java/src/IceInternal/CollocatedRequestHandler.java6
-rw-r--r--java/src/IceInternal/ConnectRequestHandler.java6
-rw-r--r--java/src/IceInternal/DefaultsAndOverrides.java2
-rw-r--r--java/src/IceInternal/ProxyOutgoingAsyncBase.java29
-rw-r--r--java/src/IceInternal/RetryQueue.java2
-rw-r--r--java/src/IceInternal/RetryTask.java10
9 files changed, 96 insertions, 62 deletions
diff --git a/java/src/Ice/ConnectionI.java b/java/src/Ice/ConnectionI.java
index 3ed5e3eed93..c0b74d70bac 100644
--- a/java/src/Ice/ConnectionI.java
+++ b/java/src/Ice/ConnectionI.java
@@ -354,6 +354,12 @@ public final class ConnectionI extends IceInternal.EventHandler
//
_transceiver.checkSendSize(os.getBuffer(), _instance.messageSizeMax());
+ //
+ // Notify the request that it's cancelable with this connection.
+ // This will throw if the request is canceled.
+ //
+ out.cancelable(this);
+
int requestId = 0;
if(response)
{
@@ -388,11 +394,6 @@ public final class ConnectionI extends IceInternal.EventHandler
throw (Ice.LocalException) _exception.fillInStackTrace();
}
- if(response || (status & IceInternal.AsyncStatus.Queued) > 0)
- {
- out.cancelable(this); // Notify the request that it's cancelable
- }
-
if(response)
{
//
@@ -679,6 +680,12 @@ public final class ConnectionI extends IceInternal.EventHandler
}
//
+ // Notify the request that it's cancelable with this connection.
+ // This will throw if the request is canceled.
+ //
+ outAsync.cancelable(this);
+
+ //
// Fill in the number of requests in the batch.
//
_batchStream.pos(IceInternal.Protocol.headerSize);
@@ -704,11 +711,6 @@ public final class ConnectionI extends IceInternal.EventHandler
throw (Ice.LocalException) _exception.fillInStackTrace();
}
- if((status & IceInternal.AsyncStatus.Queued) > 0)
- {
- outAsync.cancelable(this); // Notify the request that it's cancelable.
- }
-
//
// Reset the batch stream.
//
@@ -808,22 +810,29 @@ public final class ConnectionI extends IceInternal.EventHandler
_asyncRequests.remove(o.requestId);
}
- //
- // If the request is being sent, don't remove it from the send
- // streams, it will be removed once the sending is finished.
- //
- // Note that since we swapped the message stream to _writeStream
- // it's fine if the OutgoingAsync output stream is released (and
- // as long as canceled requests cannot be retried).
- //
- o.canceled();
- if(o != _sendStreams.getFirst())
+ if(ex instanceof ConnectionTimeoutException)
{
- it.remove();
+ setState(StateClosed, ex);
}
- if(outAsync.completed(ex))
+ else
{
- outAsync.invokeCompletedAsync();
+ //
+ // If the request is being sent, don't remove it from the send
+ // streams, it will be removed once the sending is finished.
+ //
+ // Note that since we swapped the message stream to _writeStream
+ // it's fine if the OutgoingAsync output stream is released (and
+ // as long as canceled requests cannot be retried).
+ //
+ o.canceled();
+ if(o != _sendStreams.getFirst())
+ {
+ it.remove();
+ }
+ if(outAsync.completed(ex))
+ {
+ outAsync.invokeCompletedAsync();
+ }
}
return;
}
@@ -837,11 +846,19 @@ public final class ConnectionI extends IceInternal.EventHandler
{
if(it2.next() == o)
{
- it2.remove();
- if(outAsync.completed(ex))
+ if(ex instanceof ConnectionTimeoutException)
{
- outAsync.invokeCompletedAsync();
+ setState(StateClosed, ex);
+ }
+ else
+ {
+ it2.remove();
+ if(outAsync.completed(ex))
+ {
+ outAsync.invokeCompletedAsync();
+ }
}
+ return;
}
}
}
@@ -1770,7 +1787,8 @@ public final class ConnectionI extends IceInternal.EventHandler
_exception instanceof ForcedCloseConnectionException ||
_exception instanceof ConnectionTimeoutException ||
_exception instanceof CommunicatorDestroyedException ||
- _exception instanceof ObjectAdapterDeactivatedException || (_exception instanceof ConnectionLostException && _state >= StateClosing)))
+ _exception instanceof ObjectAdapterDeactivatedException ||
+ (_exception instanceof ConnectionLostException && _state >= StateClosing)))
{
warning("connection exception", _exception);
}
@@ -1957,7 +1975,8 @@ public final class ConnectionI extends IceInternal.EventHandler
_exception instanceof ForcedCloseConnectionException ||
_exception instanceof ConnectionTimeoutException ||
_exception instanceof CommunicatorDestroyedException ||
- _exception instanceof ObjectAdapterDeactivatedException || (_exception instanceof ConnectionLostException && _state >= StateClosing)))
+ _exception instanceof ObjectAdapterDeactivatedException ||
+ (_exception instanceof ConnectionLostException && _state >= StateClosing)))
{
_observer.failed(_exception.ice_name());
}
diff --git a/java/src/Ice/ObjectPrxHelperBase.java b/java/src/Ice/ObjectPrxHelperBase.java
index 3f7bb25d9ad..2c763f56cc2 100644
--- a/java/src/Ice/ObjectPrxHelperBase.java
+++ b/java/src/Ice/ObjectPrxHelperBase.java
@@ -1757,7 +1757,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
public final ObjectPrx
ice_invocationTimeout(int newTimeout)
{
- if(newTimeout < 1 && newTimeout != -1)
+ if(newTimeout < 1 && newTimeout != -1 && newTimeout != -2)
{
throw new IllegalArgumentException("invalid value passed to ice_invocationTimeout: " + newTimeout);
}
diff --git a/java/src/IceInternal/AsyncResultI.java b/java/src/IceInternal/AsyncResultI.java
index 9d7445584fe..6c31dfb8e38 100644
--- a/java/src/IceInternal/AsyncResultI.java
+++ b/java/src/IceInternal/AsyncResultI.java
@@ -228,17 +228,20 @@ public class AsyncResultI implements AsyncResult
});
}
- public void cancelable(final CancellationHandler handler)
+ synchronized public void cancelable(final CancellationHandler handler)
{
- synchronized(this)
+ if(_cancellationException != null)
{
- if(_cancellationException == null)
+ try
{
- _cancellationHandler = handler;
- return;
+ throw _cancellationException;
+ }
+ finally
+ {
+ _cancellationException = null;
}
}
- handler.asyncRequestCanceled((OutgoingAsyncBase)this, _cancellationException);
+ _cancellationHandler = handler;
}
public final boolean __wait()
@@ -408,17 +411,6 @@ public class AsyncResultI implements AsyncResult
_cancellationHandler.asyncRequestCanceled((OutgoingAsyncBase)this, ex);
}
- protected void checkCanceled()
- {
- synchronized(this)
- {
- if(_cancellationException != null)
- {
- throw _cancellationException;
- }
- }
- }
-
protected Ice.Instrumentation.InvocationObserver getObserver()
{
return _observer;
diff --git a/java/src/IceInternal/CollocatedRequestHandler.java b/java/src/IceInternal/CollocatedRequestHandler.java
index 48d7bfa5b7d..e3f5f045f58 100644
--- a/java/src/IceInternal/CollocatedRequestHandler.java
+++ b/java/src/IceInternal/CollocatedRequestHandler.java
@@ -311,6 +311,8 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
{
synchronized(this)
{
+ outAsync.cancelable(this); // This will throw if the request is canceled
+
if(_response)
{
requestId = ++_requestId;
@@ -320,7 +322,6 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
{
_sendAsyncRequests.put(outAsync, requestId);
}
- outAsync.cancelable(this);
}
}
@@ -366,10 +367,11 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
invokeNum = _batchRequestNum;
if(_batchRequestNum > 0)
{
+ outAsync.cancelable(this); // This will throw if the request is canceled
+
if(_reference.getInstance().queueRequests() || _reference.getInvocationTimeout() > 0)
{
_sendAsyncRequests.put(outAsync, 0);
- outAsync.cancelable(this);
}
assert(!_batchStream.isEmpty());
diff --git a/java/src/IceInternal/ConnectRequestHandler.java b/java/src/IceInternal/ConnectRequestHandler.java
index e4c1e6b1727..a5f92ac090c 100644
--- a/java/src/IceInternal/ConnectRequestHandler.java
+++ b/java/src/IceInternal/ConnectRequestHandler.java
@@ -154,12 +154,16 @@ public class ConnectRequestHandler
{
synchronized(this)
{
+ if(!_initialized)
+ {
+ out.cancelable(this); // This will throw if the request is canceled
+ }
+
try
{
if(!initialized())
{
_requests.add(new Request(out));
- out.cancelable(this);
return AsyncStatus.Queued;
}
}
diff --git a/java/src/IceInternal/DefaultsAndOverrides.java b/java/src/IceInternal/DefaultsAndOverrides.java
index 5dbdcc1a0dc..916ca3b899d 100644
--- a/java/src/IceInternal/DefaultsAndOverrides.java
+++ b/java/src/IceInternal/DefaultsAndOverrides.java
@@ -193,7 +193,7 @@ public final class DefaultsAndOverrides
}
intValue = properties.getPropertyAsIntWithDefault("Ice.Default.InvocationTimeout", -1);
- if(intValue < 1 && intValue != -1)
+ if(intValue < 1 && intValue != -1 && intValue != -2)
{
defaultInvocationTimeout = -1;
StringBuffer msg = new StringBuffer("invalid value for Ice.Default.InvocationTimeout `");
diff --git a/java/src/IceInternal/ProxyOutgoingAsyncBase.java b/java/src/IceInternal/ProxyOutgoingAsyncBase.java
index 0441552fb45..29779078111 100644
--- a/java/src/IceInternal/ProxyOutgoingAsyncBase.java
+++ b/java/src/IceInternal/ProxyOutgoingAsyncBase.java
@@ -66,6 +66,27 @@ public abstract class ProxyOutgoingAsyncBase extends OutgoingAsyncBase
invokeImpl(false);
}
+ public void cancelable(final CancellationHandler handler)
+ {
+ if(_proxy.__reference().getInvocationTimeout() == -2 && _cachedConnection != null)
+ {
+ final int timeout = _cachedConnection.timeout();
+ if(timeout > 0)
+ {
+ _future = _instance.timer().schedule(
+ new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ cancel(new Ice.ConnectionTimeoutException());
+ }
+ }, timeout, java.util.concurrent.TimeUnit.MILLISECONDS);
+ }
+ }
+ super.cancelable(handler);
+ }
+
public void abort(Ice.Exception ex)
{
assert(_childObserver == null);
@@ -128,15 +149,14 @@ public abstract class ProxyOutgoingAsyncBase extends OutgoingAsyncBase
{
@Override
public void run()
- {
- cancel(new Ice.InvocationTimeoutException());
- }
+ {
+ cancel(new Ice.InvocationTimeoutException());
+ }
}, invocationTimeout, java.util.concurrent.TimeUnit.MILLISECONDS);
}
}
else // If not called from the user thread, it's called from the retry queue
{
- checkCanceled(); // Cancellation exception aren't retriable
if(_observer != null)
{
_observer.retried();
@@ -190,7 +210,6 @@ public abstract class ProxyOutgoingAsyncBase extends OutgoingAsyncBase
}
else if(_observer != null)
{
- checkCanceled();
_observer.retried();
}
}
diff --git a/java/src/IceInternal/RetryQueue.java b/java/src/IceInternal/RetryQueue.java
index 82831529545..28c10b7bd48 100644
--- a/java/src/IceInternal/RetryQueue.java
+++ b/java/src/IceInternal/RetryQueue.java
@@ -23,9 +23,9 @@ public class RetryQueue
throw new Ice.CommunicatorDestroyedException();
}
RetryTask task = new RetryTask(this, outAsync);
+ outAsync.cancelable(task); // This will throw if the request is canceled
task.setFuture(_instance.timer().schedule(task, interval, java.util.concurrent.TimeUnit.MILLISECONDS));
_requests.add(task);
- outAsync.cancelable(task);
}
synchronized public void destroy()
diff --git a/java/src/IceInternal/RetryTask.java b/java/src/IceInternal/RetryTask.java
index 974dc998a79..b847dc69f33 100644
--- a/java/src/IceInternal/RetryTask.java
+++ b/java/src/IceInternal/RetryTask.java
@@ -36,12 +36,10 @@ class RetryTask implements Runnable, CancellationHandler
{
if(_queue.remove(this) && _future.cancel(false))
{
- //
- // We just retry the outgoing async now rather than marking it
- // as finished. The retry will check for the cancellation
- // exception and terminate appropriately the request.
- //
- _outAsync.retry();
+ if(_outAsync.completed(ex))
+ {
+ _outAsync.invokeCompletedAsync();
+ }
}
}