summaryrefslogtreecommitdiff
path: root/java/src/IceInternal/QueueRequestHandler.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/src/IceInternal/QueueRequestHandler.java')
-rw-r--r--java/src/IceInternal/QueueRequestHandler.java284
1 files changed, 80 insertions, 204 deletions
diff --git a/java/src/IceInternal/QueueRequestHandler.java b/java/src/IceInternal/QueueRequestHandler.java
index c4c315e7f5c..5f40ea37403 100644
--- a/java/src/IceInternal/QueueRequestHandler.java
+++ b/java/src/IceInternal/QueueRequestHandler.java
@@ -32,48 +32,15 @@ public class QueueRequestHandler implements RequestHandler
public RequestHandler
connect()
{
- try
- {
- Future<Void> future = _executor.submit(new Callable<Void>()
- {
- @Override
- public Void call() throws RetryException
- {
- _delegate.connect();
- return null;
- }
- });
-
- //
- // Just wait for connect() to complete, don't return the
- // request handler returned by connect() since it's not
- // interrupt safe.
- //
- future.get();
- }
- catch(RejectedExecutionException e)
+ performCallable(new Callable<Void>()
{
- throw new CommunicatorDestroyedException();
- }
- catch(InterruptedException e)
- {
- throw new Ice.OperationInterruptedException();
- }
- catch(ExecutionException e)
- {
- try
- {
- throw e.getCause();
- }
- catch(RuntimeException ex)
+ @Override
+ public Void call()
{
- throw ex;
- }
- catch(Throwable ex)
- {
- assert(false);
+ _delegate.connect();
+ return null;
}
- }
+ });
return this;
}
@@ -104,7 +71,7 @@ public class QueueRequestHandler implements RequestHandler
{
try
{
- Future<Void> future = _executor.submit(new Callable<Void>()
+ performCallable(new Callable<Void>()
{
@Override
public Void call() throws RetryException
@@ -113,120 +80,46 @@ public class QueueRequestHandler implements RequestHandler
return null;
}
});
-
- future.get();
- }
- catch(RejectedExecutionException e)
- {
- throw new CommunicatorDestroyedException();
}
- catch(InterruptedException e)
+ catch(RuntimeException ex)
{
- throw new Ice.OperationInterruptedException();
- }
- catch(ExecutionException e)
- {
- try
- {
- throw e.getCause();
- }
- catch(RetryException ex)
- {
- throw ex;
- }
- catch(RuntimeException ex)
- {
- throw ex;
- }
- catch(Throwable ex)
+ if(ex.getCause() instanceof RetryException)
{
- assert(false);
+ throw (RetryException)ex.getCause();
}
+ throw ex;
}
+
}
@Override
public void
finishBatchRequest(final BasicStream out)
{
- try
+ performCallable(new Callable<Void>()
{
- Future<Void> future = _executor.submit(new Callable<Void>()
- {
- @Override
- public Void call()
- {
- _delegate.finishBatchRequest(out);
- return null;
- }
- });
- future.get();
- }
- catch(RejectedExecutionException e)
- {
- throw new CommunicatorDestroyedException();
- }
- catch(InterruptedException e)
- {
- throw new Ice.OperationInterruptedException();
- }
- catch(ExecutionException e)
- {
- try
- {
- throw e.getCause();
- }
- catch(RuntimeException ex)
- {
- throw ex;
- }
- catch(Throwable ex)
+ @Override
+ public Void call() throws RetryException
{
- assert(false);
+ _delegate.finishBatchRequest(out);
+ return null;
}
- }
+ });
}
@Override
public void
abortBatchRequest()
{
- try
+ performCallable(new Callable<Void>()
{
- Future<Void> future = _executor.submit(new Callable<Void>()
+ @Override
+ public Void call()
{
- @Override
- public Void call()
- {
- _delegate.abortBatchRequest();
- return null;
- }
- });
- future.get();
- }
- catch(RejectedExecutionException e)
- {
- throw new CommunicatorDestroyedException();
- }
- catch(InterruptedException e)
- {
- throw new Ice.OperationInterruptedException();
- }
- catch(ExecutionException e)
- {
- try
- {
- throw e.getCause();
- }
- catch(RuntimeException ex)
- {
- throw ex;
- }
- catch(Throwable ex)
- {
- assert(false);
+ _delegate.abortBatchRequest();
+ return null;
}
- }
+ });
}
@Override
@@ -235,7 +128,7 @@ public class QueueRequestHandler implements RequestHandler
{
try
{
- Future<Integer> future = _executor.submit(new Callable<Integer>()
+ return performCallable(new Callable<Integer>()
{
@Override
public Integer call() throws RetryException
@@ -243,90 +136,29 @@ public class QueueRequestHandler implements RequestHandler
return _delegate.sendAsyncRequest(out);
}
});
- return future.get();
- }
- catch(RejectedExecutionException e)
- {
- throw new CommunicatorDestroyedException();
- }
- catch(InterruptedException e)
- {
- // If the request cannot be canceled (or is itself interrupted) then
- // restore the interrupt state.
- try
- {
- if(!asyncRequestCanceled(out, new Ice.OperationInterruptedException()))
- {
- Thread.currentThread().interrupt();
- }
- }
- catch(Ice.OperationInterruptedException ex)
- {
- Thread.currentThread().interrupt();
- }
}
- catch(ExecutionException e)
+ catch(RuntimeException ex)
{
- try
- {
- throw e.getCause();
- }
- catch(RetryException ex)
- {
- throw ex;
- }
- catch(RuntimeException ex)
+ if(ex.getCause() instanceof RetryException)
{
- throw ex;
- }
- catch(Throwable ex)
- {
- assert(false);
+ throw (RetryException)ex.getCause();
}
+ throw ex;
}
- return 0;
}
@Override
public boolean
asyncRequestCanceled(final OutgoingAsyncMessageCallback outAsync, final Ice.LocalException ex)
{
- try
+ return performCallable(new Callable<Boolean>()
{
- Future<Boolean> future = _executor.submit(new Callable<Boolean>()
+ @Override
+ public Boolean call()
{
- @Override
- public Boolean call()
- {
- return _delegate.asyncRequestCanceled(outAsync, ex);
- }
- });
- return future.get();
- }
- catch(RejectedExecutionException e)
- {
- throw new CommunicatorDestroyedException();
- }
- catch(InterruptedException e)
- {
- throw new Ice.OperationInterruptedException();
- }
- catch(ExecutionException e)
- {
- try
- {
- throw e.getCause();
+ return _delegate.asyncRequestCanceled(outAsync, ex);
}
- catch(RuntimeException exc)
- {
- throw exc;
- }
- catch(Throwable exc)
- {
- assert(false);
- }
- }
- return false;
+ });
}
@Override
@@ -345,11 +177,55 @@ public class QueueRequestHandler implements RequestHandler
@Override
public ConnectionI
- waitForConnection() throws InterruptedException, RetryException
+ waitForConnection()
+ throws InterruptedException, RetryException
{
return _delegate.waitForConnection();
}
-
+
+ private <T> T performCallable(Callable<T> callable) {
+ try
+ {
+ Future<T> future = _executor.submit(callable);
+ boolean interrupted = false;
+ while(true)
+ {
+ try
+ {
+ T value = future.get();
+ if(interrupted)
+ {
+ Thread.currentThread().interrupt();
+ }
+ return value;
+ }
+ catch(InterruptedException ex)
+ {
+ interrupted = true;
+ }
+ }
+ }
+ catch(RejectedExecutionException e)
+ {
+ throw new CommunicatorDestroyedException();
+ }
+ catch(ExecutionException e)
+ {
+ try
+ {
+ throw e.getCause();
+ }
+ catch(RuntimeException ex)
+ {
+ throw ex;
+ }
+ catch(Throwable ex)
+ {
+ throw new RuntimeException(ex);
+ }
+ }
+ }
+
private final RequestHandler _delegate;
private final ExecutorService _executor;
}