diff options
author | Benoit Foucher <benoit@zeroc.com> | 2014-10-15 17:26:45 +0200 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2014-10-15 17:26:45 +0200 |
commit | f94eb5f938d33dc2ce9b09b03b5dc6ccf7bd46c2 (patch) | |
tree | 6b12ef2c59421702743048393f4757c0d1e0c504 /java | |
parent | ICE-5732 missing tracing in throughput demo (diff) | |
download | ice-f94eb5f938d33dc2ce9b09b03b5dc6ccf7bd46c2.tar.bz2 ice-f94eb5f938d33dc2ce9b09b03b5dc6ccf7bd46c2.tar.xz ice-f94eb5f938d33dc2ce9b09b03b5dc6ccf7bd46c2.zip |
Fixed ICE-5666: setting the invocation timeout to -2 provides the previous connection timeouts
Diffstat (limited to 'java')
-rw-r--r-- | java/src/Ice/ConnectionI.java | 75 | ||||
-rw-r--r-- | java/src/Ice/ObjectPrxHelperBase.java | 2 | ||||
-rw-r--r-- | java/src/IceInternal/AsyncResultI.java | 26 | ||||
-rw-r--r-- | java/src/IceInternal/CollocatedRequestHandler.java | 6 | ||||
-rw-r--r-- | java/src/IceInternal/ConnectRequestHandler.java | 6 | ||||
-rw-r--r-- | java/src/IceInternal/DefaultsAndOverrides.java | 2 | ||||
-rw-r--r-- | java/src/IceInternal/ProxyOutgoingAsyncBase.java | 29 | ||||
-rw-r--r-- | java/src/IceInternal/RetryQueue.java | 2 | ||||
-rw-r--r-- | java/src/IceInternal/RetryTask.java | 10 | ||||
-rw-r--r-- | java/test/Ice/proxy/AllTests.java | 3 | ||||
-rw-r--r-- | java/test/Ice/timeout/AllTests.java | 46 |
11 files changed, 144 insertions, 63 deletions
diff --git a/java/src/Ice/ConnectionI.java b/java/src/Ice/ConnectionI.java index 3ed5e3eed93..c0b74d70bac 100644 --- a/java/src/Ice/ConnectionI.java +++ b/java/src/Ice/ConnectionI.java @@ -354,6 +354,12 @@ public final class ConnectionI extends IceInternal.EventHandler // _transceiver.checkSendSize(os.getBuffer(), _instance.messageSizeMax()); + // + // Notify the request that it's cancelable with this connection. + // This will throw if the request is canceled. + // + out.cancelable(this); + int requestId = 0; if(response) { @@ -388,11 +394,6 @@ public final class ConnectionI extends IceInternal.EventHandler throw (Ice.LocalException) _exception.fillInStackTrace(); } - if(response || (status & IceInternal.AsyncStatus.Queued) > 0) - { - out.cancelable(this); // Notify the request that it's cancelable - } - if(response) { // @@ -679,6 +680,12 @@ public final class ConnectionI extends IceInternal.EventHandler } // + // Notify the request that it's cancelable with this connection. + // This will throw if the request is canceled. + // + outAsync.cancelable(this); + + // // Fill in the number of requests in the batch. // _batchStream.pos(IceInternal.Protocol.headerSize); @@ -704,11 +711,6 @@ public final class ConnectionI extends IceInternal.EventHandler throw (Ice.LocalException) _exception.fillInStackTrace(); } - if((status & IceInternal.AsyncStatus.Queued) > 0) - { - outAsync.cancelable(this); // Notify the request that it's cancelable. - } - // // Reset the batch stream. // @@ -808,22 +810,29 @@ public final class ConnectionI extends IceInternal.EventHandler _asyncRequests.remove(o.requestId); } - // - // If the request is being sent, don't remove it from the send - // streams, it will be removed once the sending is finished. - // - // Note that since we swapped the message stream to _writeStream - // it's fine if the OutgoingAsync output stream is released (and - // as long as canceled requests cannot be retried). - // - o.canceled(); - if(o != _sendStreams.getFirst()) + if(ex instanceof ConnectionTimeoutException) { - it.remove(); + setState(StateClosed, ex); } - if(outAsync.completed(ex)) + else { - outAsync.invokeCompletedAsync(); + // + // If the request is being sent, don't remove it from the send + // streams, it will be removed once the sending is finished. + // + // Note that since we swapped the message stream to _writeStream + // it's fine if the OutgoingAsync output stream is released (and + // as long as canceled requests cannot be retried). + // + o.canceled(); + if(o != _sendStreams.getFirst()) + { + it.remove(); + } + if(outAsync.completed(ex)) + { + outAsync.invokeCompletedAsync(); + } } return; } @@ -837,11 +846,19 @@ public final class ConnectionI extends IceInternal.EventHandler { if(it2.next() == o) { - it2.remove(); - if(outAsync.completed(ex)) + if(ex instanceof ConnectionTimeoutException) { - outAsync.invokeCompletedAsync(); + setState(StateClosed, ex); + } + else + { + it2.remove(); + if(outAsync.completed(ex)) + { + outAsync.invokeCompletedAsync(); + } } + return; } } } @@ -1770,7 +1787,8 @@ public final class ConnectionI extends IceInternal.EventHandler _exception instanceof ForcedCloseConnectionException || _exception instanceof ConnectionTimeoutException || _exception instanceof CommunicatorDestroyedException || - _exception instanceof ObjectAdapterDeactivatedException || (_exception instanceof ConnectionLostException && _state >= StateClosing))) + _exception instanceof ObjectAdapterDeactivatedException || + (_exception instanceof ConnectionLostException && _state >= StateClosing))) { warning("connection exception", _exception); } @@ -1957,7 +1975,8 @@ public final class ConnectionI extends IceInternal.EventHandler _exception instanceof ForcedCloseConnectionException || _exception instanceof ConnectionTimeoutException || _exception instanceof CommunicatorDestroyedException || - _exception instanceof ObjectAdapterDeactivatedException || (_exception instanceof ConnectionLostException && _state >= StateClosing))) + _exception instanceof ObjectAdapterDeactivatedException || + (_exception instanceof ConnectionLostException && _state >= StateClosing))) { _observer.failed(_exception.ice_name()); } diff --git a/java/src/Ice/ObjectPrxHelperBase.java b/java/src/Ice/ObjectPrxHelperBase.java index 3f7bb25d9ad..2c763f56cc2 100644 --- a/java/src/Ice/ObjectPrxHelperBase.java +++ b/java/src/Ice/ObjectPrxHelperBase.java @@ -1757,7 +1757,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable public final ObjectPrx ice_invocationTimeout(int newTimeout) { - if(newTimeout < 1 && newTimeout != -1) + if(newTimeout < 1 && newTimeout != -1 && newTimeout != -2) { throw new IllegalArgumentException("invalid value passed to ice_invocationTimeout: " + newTimeout); } diff --git a/java/src/IceInternal/AsyncResultI.java b/java/src/IceInternal/AsyncResultI.java index 9d7445584fe..6c31dfb8e38 100644 --- a/java/src/IceInternal/AsyncResultI.java +++ b/java/src/IceInternal/AsyncResultI.java @@ -228,17 +228,20 @@ public class AsyncResultI implements AsyncResult }); } - public void cancelable(final CancellationHandler handler) + synchronized public void cancelable(final CancellationHandler handler) { - synchronized(this) + if(_cancellationException != null) { - if(_cancellationException == null) + try { - _cancellationHandler = handler; - return; + throw _cancellationException; + } + finally + { + _cancellationException = null; } } - handler.asyncRequestCanceled((OutgoingAsyncBase)this, _cancellationException); + _cancellationHandler = handler; } public final boolean __wait() @@ -408,17 +411,6 @@ public class AsyncResultI implements AsyncResult _cancellationHandler.asyncRequestCanceled((OutgoingAsyncBase)this, ex); } - protected void checkCanceled() - { - synchronized(this) - { - if(_cancellationException != null) - { - throw _cancellationException; - } - } - } - protected Ice.Instrumentation.InvocationObserver getObserver() { return _observer; diff --git a/java/src/IceInternal/CollocatedRequestHandler.java b/java/src/IceInternal/CollocatedRequestHandler.java index 48d7bfa5b7d..e3f5f045f58 100644 --- a/java/src/IceInternal/CollocatedRequestHandler.java +++ b/java/src/IceInternal/CollocatedRequestHandler.java @@ -311,6 +311,8 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler { synchronized(this) { + outAsync.cancelable(this); // This will throw if the request is canceled + if(_response) { requestId = ++_requestId; @@ -320,7 +322,6 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler { _sendAsyncRequests.put(outAsync, requestId); } - outAsync.cancelable(this); } } @@ -366,10 +367,11 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler invokeNum = _batchRequestNum; if(_batchRequestNum > 0) { + outAsync.cancelable(this); // This will throw if the request is canceled + if(_reference.getInstance().queueRequests() || _reference.getInvocationTimeout() > 0) { _sendAsyncRequests.put(outAsync, 0); - outAsync.cancelable(this); } assert(!_batchStream.isEmpty()); diff --git a/java/src/IceInternal/ConnectRequestHandler.java b/java/src/IceInternal/ConnectRequestHandler.java index e4c1e6b1727..a5f92ac090c 100644 --- a/java/src/IceInternal/ConnectRequestHandler.java +++ b/java/src/IceInternal/ConnectRequestHandler.java @@ -154,12 +154,16 @@ public class ConnectRequestHandler { synchronized(this) { + if(!_initialized) + { + out.cancelable(this); // This will throw if the request is canceled + } + try { if(!initialized()) { _requests.add(new Request(out)); - out.cancelable(this); return AsyncStatus.Queued; } } diff --git a/java/src/IceInternal/DefaultsAndOverrides.java b/java/src/IceInternal/DefaultsAndOverrides.java index 5dbdcc1a0dc..916ca3b899d 100644 --- a/java/src/IceInternal/DefaultsAndOverrides.java +++ b/java/src/IceInternal/DefaultsAndOverrides.java @@ -193,7 +193,7 @@ public final class DefaultsAndOverrides } intValue = properties.getPropertyAsIntWithDefault("Ice.Default.InvocationTimeout", -1); - if(intValue < 1 && intValue != -1) + if(intValue < 1 && intValue != -1 && intValue != -2) { defaultInvocationTimeout = -1; StringBuffer msg = new StringBuffer("invalid value for Ice.Default.InvocationTimeout `"); diff --git a/java/src/IceInternal/ProxyOutgoingAsyncBase.java b/java/src/IceInternal/ProxyOutgoingAsyncBase.java index 0441552fb45..29779078111 100644 --- a/java/src/IceInternal/ProxyOutgoingAsyncBase.java +++ b/java/src/IceInternal/ProxyOutgoingAsyncBase.java @@ -66,6 +66,27 @@ public abstract class ProxyOutgoingAsyncBase extends OutgoingAsyncBase invokeImpl(false); } + public void cancelable(final CancellationHandler handler) + { + if(_proxy.__reference().getInvocationTimeout() == -2 && _cachedConnection != null) + { + final int timeout = _cachedConnection.timeout(); + if(timeout > 0) + { + _future = _instance.timer().schedule( + new Runnable() + { + @Override + public void run() + { + cancel(new Ice.ConnectionTimeoutException()); + } + }, timeout, java.util.concurrent.TimeUnit.MILLISECONDS); + } + } + super.cancelable(handler); + } + public void abort(Ice.Exception ex) { assert(_childObserver == null); @@ -128,15 +149,14 @@ public abstract class ProxyOutgoingAsyncBase extends OutgoingAsyncBase { @Override public void run() - { - cancel(new Ice.InvocationTimeoutException()); - } + { + cancel(new Ice.InvocationTimeoutException()); + } }, invocationTimeout, java.util.concurrent.TimeUnit.MILLISECONDS); } } else // If not called from the user thread, it's called from the retry queue { - checkCanceled(); // Cancellation exception aren't retriable if(_observer != null) { _observer.retried(); @@ -190,7 +210,6 @@ public abstract class ProxyOutgoingAsyncBase extends OutgoingAsyncBase } else if(_observer != null) { - checkCanceled(); _observer.retried(); } } diff --git a/java/src/IceInternal/RetryQueue.java b/java/src/IceInternal/RetryQueue.java index 82831529545..28c10b7bd48 100644 --- a/java/src/IceInternal/RetryQueue.java +++ b/java/src/IceInternal/RetryQueue.java @@ -23,9 +23,9 @@ public class RetryQueue throw new Ice.CommunicatorDestroyedException(); } RetryTask task = new RetryTask(this, outAsync); + outAsync.cancelable(task); // This will throw if the request is canceled task.setFuture(_instance.timer().schedule(task, interval, java.util.concurrent.TimeUnit.MILLISECONDS)); _requests.add(task); - outAsync.cancelable(task); } synchronized public void destroy() diff --git a/java/src/IceInternal/RetryTask.java b/java/src/IceInternal/RetryTask.java index 974dc998a79..b847dc69f33 100644 --- a/java/src/IceInternal/RetryTask.java +++ b/java/src/IceInternal/RetryTask.java @@ -36,12 +36,10 @@ class RetryTask implements Runnable, CancellationHandler { if(_queue.remove(this) && _future.cancel(false)) { - // - // We just retry the outgoing async now rather than marking it - // as finished. The retry will check for the cancellation - // exception and terminate appropriately the request. - // - _outAsync.retry(); + if(_outAsync.completed(ex)) + { + _outAsync.invokeCompletedAsync(); + } } } diff --git a/java/test/Ice/proxy/AllTests.java b/java/test/Ice/proxy/AllTests.java index 45dfbe901ed..98e3c124fbb 100644 --- a/java/test/Ice/proxy/AllTests.java +++ b/java/test/Ice/proxy/AllTests.java @@ -519,6 +519,7 @@ public class AllTests try { base.ice_invocationTimeout(-1); + base.ice_invocationTimeout(-2); } catch(IllegalArgumentException e) { @@ -527,7 +528,7 @@ public class AllTests try { - base.ice_invocationTimeout(-2); + base.ice_invocationTimeout(-3); test(false); } catch(IllegalArgumentException e) diff --git a/java/test/Ice/timeout/AllTests.java b/java/test/Ice/timeout/AllTests.java index 46dd8e00452..7f0239f32c5 100644 --- a/java/test/Ice/timeout/AllTests.java +++ b/java/test/Ice/timeout/AllTests.java @@ -272,6 +272,52 @@ public class AllTests to.begin_sleep(250 * mult, cb); cb.check(); } + { + // + // Backward compatible connection timeouts + // + TimeoutPrx to = TimeoutPrxHelper.uncheckedCast(obj.ice_invocationTimeout(-2).ice_timeout(250)); + Ice.Connection con = null; + try + { + con = to.ice_getConnection(); + to.sleep(500); + test(false); + } + catch(Ice.TimeoutException ex) + { + assert(con != null); + try + { + con.getInfo(); + test(false); + } + catch(Ice.TimeoutException exc) + { + // Connection got closed as well. + } + } + + try + { + con = to.ice_getConnection(); + to.end_sleep(to.begin_sleep(500)); + test(false); + } + catch(Ice.TimeoutException ex) + { + assert(con != null); + try + { + con.getInfo(); + test(false); + } + catch(Ice.TimeoutException exc) + { + // Connection got closed as well. + } + } + } out.println("ok"); out.print("testing close timeout... "); |