diff options
author | Matthew Newhook <matthew@zeroc.com> | 2014-08-07 14:36:07 -0230 |
---|---|---|
committer | Matthew Newhook <matthew@zeroc.com> | 2014-08-07 14:36:07 -0230 |
commit | b36ae21c88735cbd2c39c5ccde2572a8fcc4e928 (patch) | |
tree | dfd5eee6e7d61a9c6efcbaabe916639009aaa9af /java/src/IceInternal/ConnectRequestHandler.java | |
parent | Add @Override where possible, and remove trailing white space. (diff) | |
download | ice-b36ae21c88735cbd2c39c5ccde2572a8fcc4e928.tar.bz2 ice-b36ae21c88735cbd2c39c5ccde2572a8fcc4e928.tar.xz ice-b36ae21c88735cbd2c39c5ccde2572a8fcc4e928.zip |
ICE-1593 Handling thread interrupts in Java
- Added Ice.BackgroundIO property to perform all IO in a non-user
thread. This makes Ice for Java interrupt safe. This is implemented
by the QueueRequestHanbler.
- EndpointHostResolver now uses an executor instead of a thread.
- Added java/demo/Ice/interrupt and java/test/Ice/interrupt.
- Made several changes that must be ported to C++ & C#.
- InvocationTimeout exceptions can hang forever.
- Connection establishment is always asynchronous.
- RequestHandler.requestTimeout and asyncRequestTimeout have been
renamed to requestCancel and asyncRequestCancel.
Diffstat (limited to 'java/src/IceInternal/ConnectRequestHandler.java')
-rw-r--r-- | java/src/IceInternal/ConnectRequestHandler.java | 149 |
1 files changed, 84 insertions, 65 deletions
diff --git a/java/src/IceInternal/ConnectRequestHandler.java b/java/src/IceInternal/ConnectRequestHandler.java index 276395ef3ef..9cc70b6a76d 100644 --- a/java/src/IceInternal/ConnectRequestHandler.java +++ b/java/src/IceInternal/ConnectRequestHandler.java @@ -9,6 +9,8 @@ package IceInternal; +import Ice.ConnectionI; + public class ConnectRequestHandler implements RequestHandler, Reference.GetConnectionCallback, RouterInfo.AddProxyCallback { @@ -62,17 +64,7 @@ public class ConnectRequestHandler { synchronized(this) { - while(_batchRequestInProgress) - { - try - { - wait(); - } - catch(java.lang.InterruptedException ex) - { - } - } - + waitBatchRequestInProgress(); try { if(!initialized()) @@ -189,14 +181,14 @@ public class ConnectRequestHandler } @Override - public void - requestTimedOut(OutgoingMessageCallback out) + public boolean + requestCanceled(OutgoingMessageCallback out, Ice.LocalException ex) { synchronized(this) { if(_exception != null) { - return; // The request has been notified of a failure already. + return false; // The request has been notified of a failure already. } if(!initialized()) @@ -207,26 +199,26 @@ public class ConnectRequestHandler Request request = it.next(); if(request.out == out) { - out.finished(new Ice.InvocationTimeoutException()); + out.finished(ex); it.remove(); - return; + return true; } } assert(false); // The request has to be queued if it timed out and we're not initialized yet. } } - _connection.requestTimedOut(out); + return _connection.requestCanceled(out, ex); } @Override - public void - asyncRequestTimedOut(OutgoingAsyncMessageCallback outAsync) + public boolean + asyncRequestCanceled(OutgoingAsyncMessageCallback outAsync, Ice.LocalException ex) { synchronized(this) { if(_exception != null) { - return; // The request has been notified of a failure already. + return false; // The request has been notified of a failure already. } if(!initialized()) @@ -238,14 +230,14 @@ public class ConnectRequestHandler if(request.outAsync == outAsync) { it.remove(); - outAsync.__dispatchInvocationTimeout(_reference.getInstance().clientThreadPool(), null); - return; // We're done + outAsync.__dispatchInvocationCancel(ex, _reference.getInstance().clientThreadPool(), null); + return true; // We're done } } assert(false); // The request has to be queued if it timed out and we're not initialized yet. } } - _connection.asyncRequestTimedOut(outAsync); + return _connection.asyncRequestCanceled(outAsync, ex); } @Override @@ -256,37 +248,33 @@ public class ConnectRequestHandler } @Override - synchronized public Ice.ConnectionI - getConnection(boolean waitInit) - { - if(waitInit) - { - // - // Wait for the connection establishment to complete or fail. - // - while(!_initialized && _exception == null) - { - try - { - wait(); - } - catch(java.lang.InterruptedException ex) - { - } - } - } - + synchronized public ConnectionI + getConnection() { if(_exception != null) { throw (Ice.LocalException)_exception.fillInStackTrace(); } else { - assert(!waitInit || _initialized); return _connection; } } + @Override + synchronized public + ConnectionI waitForConnection() + throws InterruptedException + { + // + // Wait for the connection establishment to complete or fail. + // + while(!_initialized && _exception == null) + { + wait(); + } + return getConnection(); + } + // // Implementation of Reference.GetConnectionCallback // @@ -338,14 +326,14 @@ public class ConnectRequestHandler if(!_requests.isEmpty()) { _reference.getInstance().clientThreadPool().dispatch(new DispatchWorkItem(_connection) - { - @Override - public void - run() - { - flushRequestsWithException(); - }; - }); + { + @Override + public void + run() + { + flushRequestsWithException(); + }; + }); } notifyAll(); @@ -393,16 +381,29 @@ public class ConnectRequestHandler } else { + // + // This is similar to a mutex lock in that the flag is + // only true for a short period of time. + // + boolean interrupted = false; while(_flushing && _exception == null) { try { wait(); } - catch(java.lang.InterruptedException ex) + catch(InterruptedException ex) { + interrupted = true; } } + // + // Restore the interrupted status. + // + if(interrupted) + { + Thread.currentThread().interrupt(); + } if(_exception != null) { @@ -421,17 +422,7 @@ public class ConnectRequestHandler synchronized(this) { assert(_connection != null && !_initialized); - - while(_batchRequestInProgress) - { - try - { - wait(); - } - catch(java.lang.InterruptedException ex) - { - } - } + waitBatchRequestInProgress(); // // We set the _flushing flag to true to prevent any additional queuing. Callers @@ -566,7 +557,35 @@ public class ConnectRequestHandler } } - void + private void + waitBatchRequestInProgress() + { + // + // This is similar to a mutex lock in that the stream is + // only "locked" while the request is in progress. + // + boolean interrupted = false; + while(_batchRequestInProgress) + { + try + { + wait(); + } + catch(InterruptedException ex) + { + interrupted = true; + } + } + // + // Restore the interrupted flag if we were interrupted. + // + if(interrupted) + { + Thread.currentThread().interrupt(); + } + } + + private void flushRequestsWithException() { for(Request request : _requests) |