diff options
author | Matthew Newhook <matthew@zeroc.com> | 2014-08-07 14:36:07 -0230 |
---|---|---|
committer | Matthew Newhook <matthew@zeroc.com> | 2014-08-07 14:36:07 -0230 |
commit | b36ae21c88735cbd2c39c5ccde2572a8fcc4e928 (patch) | |
tree | dfd5eee6e7d61a9c6efcbaabe916639009aaa9af /java/src/Ice/ConnectionI.java | |
parent | Add @Override where possible, and remove trailing white space. (diff) | |
download | ice-b36ae21c88735cbd2c39c5ccde2572a8fcc4e928.tar.bz2 ice-b36ae21c88735cbd2c39c5ccde2572a8fcc4e928.tar.xz ice-b36ae21c88735cbd2c39c5ccde2572a8fcc4e928.zip |
ICE-1593 Handling thread interrupts in Java
- Added Ice.BackgroundIO property to perform all IO in a non-user
thread. This makes Ice for Java interrupt safe. This is implemented
by the QueueRequestHanbler.
- EndpointHostResolver now uses an executor instead of a thread.
- Added java/demo/Ice/interrupt and java/test/Ice/interrupt.
- Made several changes that must be ported to C++ & C#.
- InvocationTimeout exceptions can hang forever.
- Connection establishment is always asynchronous.
- RequestHandler.requestTimeout and asyncRequestTimeout have been
renamed to requestCancel and asyncRequestCancel.
Diffstat (limited to 'java/src/Ice/ConnectionI.java')
-rw-r--r-- | java/src/Ice/ConnectionI.java | 191 |
1 files changed, 99 insertions, 92 deletions
diff --git a/java/src/Ice/ConnectionI.java b/java/src/Ice/ConnectionI.java index 8d90a391253..3e8c735a99d 100644 --- a/java/src/Ice/ConnectionI.java +++ b/java/src/Ice/ConnectionI.java @@ -42,24 +42,45 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne if(!initialize(IceInternal.SocketOperation.None) || !validate(IceInternal.SocketOperation.None)) { - if(callback != null) - { - _startCallback = callback; - return; - } + _startCallback = callback; + return; + } - // - // Wait for the connection to be validated. - // + // + // We start out in holding state. + // + setState(StateHolding); + } + } + catch(Ice.LocalException ex) + { + exception(ex); + callback.connectionStartFailed(this, _exception); + return; + } + + callback.connectionStartCompleted(this); + } + + public void + startAndWait() + throws InterruptedException + { + try + { + synchronized(this) + { + if(_state >= StateClosed) // The connection might already be closed if the communicator was destroyed. + { + assert(_exception != null); + throw (Ice.LocalException)_exception.fillInStackTrace(); + } + + if(!initialize(IceInternal.SocketOperation.None) || !validate(IceInternal.SocketOperation.None)) + { while(_state <= StateNotValidated) { - try - { - wait(); - } - catch(InterruptedException ex) - { - } + wait(); } if(_state >= StateClosing) @@ -78,21 +99,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne catch(Ice.LocalException ex) { exception(ex); - if(callback != null) - { - callback.connectionStartFailed(this, _exception); - return; - } - else - { - waitUntilFinished(); - throw ex; - } - } - - if(callback != null) - { - callback.connectionStartCompleted(this); + waitUntilFinished(); + return; } } @@ -171,6 +179,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } catch(InterruptedException ex) { + throw new Ice.OperationInterruptedException(); } } @@ -208,21 +217,17 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne public synchronized void waitUntilHolding() + throws InterruptedException { while(_state < StateHolding || _dispatchCount > 0) { - try - { - wait(); - } - catch(InterruptedException ex) - { - } + wait(); } } public synchronized void waitUntilFinished() + throws InterruptedException { // // We wait indefinitely until the connection is finished and all @@ -232,13 +237,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne // while(_state < StateFinished || _dispatchCount > 0) { - try - { - wait(); - } - catch(InterruptedException ex) - { - } + wait(); } assert(_state == StateFinished); @@ -488,19 +487,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne prepareBatchRequest(IceInternal.BasicStream os) throws IceInternal.RetryException { - // - // Wait if flushing is currently in progress. - // - while(_batchStreamInUse && _exception == null) - { - try - { - wait(); - } - catch(InterruptedException ex) - { - } - } + waitBatchStreamInUse(); if(_exception != null) { @@ -692,7 +679,14 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne flushBatchRequests() { IceInternal.BatchOutgoing out = new IceInternal.BatchOutgoing(this, _instance, __flushBatchRequests_name); - out.invoke(); + try + { + out.invoke(); + } + catch(InterruptedException ex) + { + throw new Ice.OperationInterruptedException(); + } } private static final String __flushBatchRequests_name = "flushBatchRequests"; @@ -769,17 +763,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne synchronized public boolean flushBatchRequests(IceInternal.BatchOutgoing out) { - while(_batchStreamInUse && _exception == null) - { - try - { - wait(); - } - catch(InterruptedException ex) - { - } - } - + waitBatchStreamInUse(); if(_exception != null) { throw (Ice.LocalException)_exception.fillInStackTrace(); @@ -832,16 +816,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne synchronized public int flushAsyncBatchRequests(IceInternal.BatchOutgoingAsync outAsync) { - while(_batchStreamInUse && _exception == null) - { - try - { - wait(); - } - catch(InterruptedException ex) - { - } - } + waitBatchStreamInUse(); if(_exception != null) { @@ -941,8 +916,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne return _monitor != null ? _monitor.getACM() : new ACM(0, ACMClose.CloseOff, ACMHeartbeat.HeartbeatOff); } - synchronized public void - requestTimedOut(IceInternal.OutgoingMessageCallback out) + synchronized public boolean + requestCanceled(IceInternal.OutgoingMessageCallback out, Ice.LocalException ex) { java.util.Iterator<OutgoingMessage> it = _sendStreams.iterator(); while(it.hasNext()) @@ -964,8 +939,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { it.remove(); } - out.finished(new InvocationTimeoutException()); - return; // We're done. + out.finished(ex); + return true; // We're done. } } @@ -977,16 +952,17 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { if(it2.next() == o) { - o.finished(new InvocationTimeoutException()); + o.finished(ex); it2.remove(); - return; // We're done. + return true; // We're done. } } } + return false; } - public void - asyncRequestTimedOut(IceInternal.OutgoingAsyncMessageCallback outAsync) + public boolean + asyncRequestCanceled(IceInternal.OutgoingAsyncMessageCallback outAsync, Ice.LocalException ex) { java.util.Iterator<OutgoingMessage> it = _sendStreams.iterator(); while(it.hasNext()) @@ -1008,8 +984,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { it.remove(); } - outAsync.__dispatchInvocationTimeout(_threadPool, this); - return; // We're done + outAsync.__dispatchInvocationCancel(ex, _threadPool, this); + return true; // We're done } } @@ -1022,13 +998,14 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne if(it2.next() == o) { it2.remove(); - outAsync.__dispatchInvocationTimeout(_threadPool, this); - return; // We're done. + outAsync.__dispatchInvocationCancel(ex, _threadPool, this); + return true; // We're done. } } } - } + return false; + } @Override synchronized public void @@ -3092,6 +3069,35 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } } + private void waitBatchStreamInUse() + { + // + // This is similar to a mutex lock in that the flag is + // only true for a short time period. As such we don't permit the + // wait to be interrupted. Instead the interrupted status is saved\ + // and restored. + // + boolean interrupted = false; + while(_batchStreamInUse && _exception == null) + { + try + { + wait(); + } + catch (InterruptedException e) + { + interrupted = true; + } + } + // + // Restore the interrupted flag if we were interrupted. + // + if(interrupted) + { + Thread.currentThread().interrupt(); + } + } + private static class OutgoingMessage { OutgoingMessage(IceInternal.BasicStream stream, boolean compress, boolean adopt) @@ -3263,4 +3269,5 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne Ice.Instrumentation.ConnectionState.ConnectionStateClosed, // StateClosed Ice.Instrumentation.ConnectionState.ConnectionStateClosed, // StateFinished }; + } |