diff options
author | Matthew Newhook <matthew@zeroc.com> | 2014-09-27 16:31:46 -0700 |
---|---|---|
committer | Matthew Newhook <matthew@zeroc.com> | 2014-09-27 16:32:21 -0700 |
commit | 4951bbabdd6bd33a8e9ca0cdd46aad613a634626 (patch) | |
tree | 8634b14a258d2c9cee0e17a12af805e1af3fec76 /java/src/IceInternal/QueueRequestHandler.java | |
parent | Fixed deadlock in connection binding code (ICE-5693) (diff) | |
download | ice-4951bbabdd6bd33a8e9ca0cdd46aad613a634626.tar.bz2 ice-4951bbabdd6bd33a8e9ca0cdd46aad613a634626.tar.xz ice-4951bbabdd6bd33a8e9ca0cdd46aad613a634626.zip |
- begin_ now never interrupts.
- All potentially blocking Ice APIs are interruption points.
- Fixes to the incoming/outgoing factories and shutdown procedure
- Fixed bug where connect() was from a user thread.
- Added lots more tests to the interrupt test suite.
Diffstat (limited to 'java/src/IceInternal/QueueRequestHandler.java')
-rw-r--r-- | java/src/IceInternal/QueueRequestHandler.java | 284 |
1 files changed, 80 insertions, 204 deletions
diff --git a/java/src/IceInternal/QueueRequestHandler.java b/java/src/IceInternal/QueueRequestHandler.java index c4c315e7f5c..5f40ea37403 100644 --- a/java/src/IceInternal/QueueRequestHandler.java +++ b/java/src/IceInternal/QueueRequestHandler.java @@ -32,48 +32,15 @@ public class QueueRequestHandler implements RequestHandler public RequestHandler connect() { - try - { - Future<Void> future = _executor.submit(new Callable<Void>() - { - @Override - public Void call() throws RetryException - { - _delegate.connect(); - return null; - } - }); - - // - // Just wait for connect() to complete, don't return the - // request handler returned by connect() since it's not - // interrupt safe. - // - future.get(); - } - catch(RejectedExecutionException e) + performCallable(new Callable<Void>() { - throw new CommunicatorDestroyedException(); - } - catch(InterruptedException e) - { - throw new Ice.OperationInterruptedException(); - } - catch(ExecutionException e) - { - try - { - throw e.getCause(); - } - catch(RuntimeException ex) + @Override + public Void call() { - throw ex; - } - catch(Throwable ex) - { - assert(false); + _delegate.connect(); + return null; } - } + }); return this; } @@ -104,7 +71,7 @@ public class QueueRequestHandler implements RequestHandler { try { - Future<Void> future = _executor.submit(new Callable<Void>() + performCallable(new Callable<Void>() { @Override public Void call() throws RetryException @@ -113,120 +80,46 @@ public class QueueRequestHandler implements RequestHandler return null; } }); - - future.get(); - } - catch(RejectedExecutionException e) - { - throw new CommunicatorDestroyedException(); } - catch(InterruptedException e) + catch(RuntimeException ex) { - throw new Ice.OperationInterruptedException(); - } - catch(ExecutionException e) - { - try - { - throw e.getCause(); - } - catch(RetryException ex) - { - throw ex; - } - catch(RuntimeException ex) - { - throw ex; - } - catch(Throwable ex) + if(ex.getCause() instanceof RetryException) { - assert(false); + throw (RetryException)ex.getCause(); } + throw ex; } + } @Override public void finishBatchRequest(final BasicStream out) { - try + performCallable(new Callable<Void>() { - Future<Void> future = _executor.submit(new Callable<Void>() - { - @Override - public Void call() - { - _delegate.finishBatchRequest(out); - return null; - } - }); - future.get(); - } - catch(RejectedExecutionException e) - { - throw new CommunicatorDestroyedException(); - } - catch(InterruptedException e) - { - throw new Ice.OperationInterruptedException(); - } - catch(ExecutionException e) - { - try - { - throw e.getCause(); - } - catch(RuntimeException ex) - { - throw ex; - } - catch(Throwable ex) + @Override + public Void call() throws RetryException { - assert(false); + _delegate.finishBatchRequest(out); + return null; } - } + }); } @Override public void abortBatchRequest() { - try + performCallable(new Callable<Void>() { - Future<Void> future = _executor.submit(new Callable<Void>() + @Override + public Void call() { - @Override - public Void call() - { - _delegate.abortBatchRequest(); - return null; - } - }); - future.get(); - } - catch(RejectedExecutionException e) - { - throw new CommunicatorDestroyedException(); - } - catch(InterruptedException e) - { - throw new Ice.OperationInterruptedException(); - } - catch(ExecutionException e) - { - try - { - throw e.getCause(); - } - catch(RuntimeException ex) - { - throw ex; - } - catch(Throwable ex) - { - assert(false); + _delegate.abortBatchRequest(); + return null; } - } + }); } @Override @@ -235,7 +128,7 @@ public class QueueRequestHandler implements RequestHandler { try { - Future<Integer> future = _executor.submit(new Callable<Integer>() + return performCallable(new Callable<Integer>() { @Override public Integer call() throws RetryException @@ -243,90 +136,29 @@ public class QueueRequestHandler implements RequestHandler return _delegate.sendAsyncRequest(out); } }); - return future.get(); - } - catch(RejectedExecutionException e) - { - throw new CommunicatorDestroyedException(); - } - catch(InterruptedException e) - { - // If the request cannot be canceled (or is itself interrupted) then - // restore the interrupt state. - try - { - if(!asyncRequestCanceled(out, new Ice.OperationInterruptedException())) - { - Thread.currentThread().interrupt(); - } - } - catch(Ice.OperationInterruptedException ex) - { - Thread.currentThread().interrupt(); - } } - catch(ExecutionException e) + catch(RuntimeException ex) { - try - { - throw e.getCause(); - } - catch(RetryException ex) - { - throw ex; - } - catch(RuntimeException ex) + if(ex.getCause() instanceof RetryException) { - throw ex; - } - catch(Throwable ex) - { - assert(false); + throw (RetryException)ex.getCause(); } + throw ex; } - return 0; } @Override public boolean asyncRequestCanceled(final OutgoingAsyncMessageCallback outAsync, final Ice.LocalException ex) { - try + return performCallable(new Callable<Boolean>() { - Future<Boolean> future = _executor.submit(new Callable<Boolean>() + @Override + public Boolean call() { - @Override - public Boolean call() - { - return _delegate.asyncRequestCanceled(outAsync, ex); - } - }); - return future.get(); - } - catch(RejectedExecutionException e) - { - throw new CommunicatorDestroyedException(); - } - catch(InterruptedException e) - { - throw new Ice.OperationInterruptedException(); - } - catch(ExecutionException e) - { - try - { - throw e.getCause(); + return _delegate.asyncRequestCanceled(outAsync, ex); } - catch(RuntimeException exc) - { - throw exc; - } - catch(Throwable exc) - { - assert(false); - } - } - return false; + }); } @Override @@ -345,11 +177,55 @@ public class QueueRequestHandler implements RequestHandler @Override public ConnectionI - waitForConnection() throws InterruptedException, RetryException + waitForConnection() + throws InterruptedException, RetryException { return _delegate.waitForConnection(); } - + + private <T> T performCallable(Callable<T> callable) { + try + { + Future<T> future = _executor.submit(callable); + boolean interrupted = false; + while(true) + { + try + { + T value = future.get(); + if(interrupted) + { + Thread.currentThread().interrupt(); + } + return value; + } + catch(InterruptedException ex) + { + interrupted = true; + } + } + } + catch(RejectedExecutionException e) + { + throw new CommunicatorDestroyedException(); + } + catch(ExecutionException e) + { + try + { + throw e.getCause(); + } + catch(RuntimeException ex) + { + throw ex; + } + catch(Throwable ex) + { + throw new RuntimeException(ex); + } + } + } + private final RequestHandler _delegate; private final ExecutorService _executor; } |