summaryrefslogtreecommitdiff
path: root/java/src/IceInternal/QueueRequestHandler.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/src/IceInternal/QueueRequestHandler.java')
-rw-r--r--java/src/IceInternal/QueueRequestHandler.java142
1 files changed, 35 insertions, 107 deletions
diff --git a/java/src/IceInternal/QueueRequestHandler.java b/java/src/IceInternal/QueueRequestHandler.java
index 6c2a5b55f2c..f9d90e3e72e 100644
--- a/java/src/IceInternal/QueueRequestHandler.java
+++ b/java/src/IceInternal/QueueRequestHandler.java
@@ -21,12 +21,9 @@ import Ice.ConnectionI;
public class QueueRequestHandler implements RequestHandler
{
public
- QueueRequestHandler(Instance instance, RequestHandler delegate) {
+ QueueRequestHandler(Instance instance, RequestHandler delegate)
+ {
_executor = instance.getQueueExecutor();
- if(_executor == null)
- {
- throw new CommunicatorDestroyedException();
- }
assert(delegate != null);
_delegate = delegate;
}
@@ -37,7 +34,8 @@ public class QueueRequestHandler implements RequestHandler
{
try
{
- Future<Void> future = _executor.submit(new Callable<Void>() {
+ Future<Void> future = _executor.submit(new Callable<Void>()
+ {
@Override
public Void call() throws RetryException
{
@@ -52,11 +50,11 @@ public class QueueRequestHandler implements RequestHandler
{
throw new CommunicatorDestroyedException();
}
- catch (InterruptedException e)
+ catch(InterruptedException e)
{
throw new Ice.OperationInterruptedException();
}
- catch (ExecutionException e)
+ catch(ExecutionException e)
{
try
{
@@ -83,7 +81,8 @@ public class QueueRequestHandler implements RequestHandler
{
try
{
- Future<Void> future = _executor.submit(new Callable<Void>() {
+ Future<Void> future = _executor.submit(new Callable<Void>()
+ {
@Override
public Void call()
{
@@ -97,11 +96,11 @@ public class QueueRequestHandler implements RequestHandler
{
throw new CommunicatorDestroyedException();
}
- catch (InterruptedException e)
+ catch(InterruptedException e)
{
throw new Ice.OperationInterruptedException();
}
- catch (ExecutionException e)
+ catch(ExecutionException e)
{
try
{
@@ -124,7 +123,8 @@ public class QueueRequestHandler implements RequestHandler
{
try
{
- Future<Void> future = _executor.submit(new Callable<Void>() {
+ Future<Void> future = _executor.submit(new Callable<Void>()
+ {
@Override
public Void call()
{
@@ -138,11 +138,11 @@ public class QueueRequestHandler implements RequestHandler
{
throw new CommunicatorDestroyedException();
}
- catch (InterruptedException e)
+ catch(InterruptedException e)
{
throw new Ice.OperationInterruptedException();
}
- catch (ExecutionException e)
+ catch(ExecutionException e)
{
try
{
@@ -160,74 +160,42 @@ public class QueueRequestHandler implements RequestHandler
}
@Override
- public boolean
- sendRequest(final OutgoingMessageCallback out) throws RetryException
+ public int
+ sendAsyncRequest(final OutgoingAsyncMessageCallback out) throws RetryException
{
try
{
- Future<Boolean> future = _executor.submit(new Callable<Boolean>() {
+ Future<Integer> future = _executor.submit(new Callable<Integer>()
+ {
@Override
- public Boolean call() throws RetryException
+ public Integer call() throws RetryException
{
- return _delegate.sendRequest(out);
+ return _delegate.sendAsyncRequest(out);
}
});
return future.get();
}
- catch (InterruptedException e)
- {
- throw new Ice.OperationInterruptedException();
- }
catch(RejectedExecutionException e)
{
throw new CommunicatorDestroyedException();
}
- catch (ExecutionException e)
+ catch(InterruptedException e)
{
+ // If the request cannot be canceled (or is itself interrupted) then
+ // restore the interrupt state.
try
{
- throw e.getCause();
- }
- catch(RetryException ex)
- {
- throw ex;
- }
- catch(RuntimeException ex)
- {
- throw ex;
+ if(!asyncRequestCanceled(out, new Ice.OperationInterruptedException()))
+ {
+ Thread.currentThread().interrupt();
+ }
}
- catch(Throwable ex)
+ catch(Ice.OperationInterruptedException ex)
{
- assert(false);
+ Thread.currentThread().interrupt();
}
}
- return false;
- }
-
- @Override
- public int
- sendAsyncRequest(final OutgoingAsyncMessageCallback out) throws RetryException
- {
- try
- {
- Future<Integer> future = _executor.submit(new Callable<Integer>() {
- @Override
- public Integer call() throws RetryException
- {
- return _delegate.sendAsyncRequest(out);
- }
- });
- return future.get();
- }
- catch(RejectedExecutionException e)
- {
- throw new CommunicatorDestroyedException();
- }
- catch (InterruptedException e)
- {
- throw new Ice.OperationInterruptedException();
- }
- catch (ExecutionException e)
+ catch(ExecutionException e)
{
try
{
@@ -251,52 +219,12 @@ public class QueueRequestHandler implements RequestHandler
@Override
public boolean
- requestCanceled(final OutgoingMessageCallback out, final Ice.LocalException ex)
- {
- try
- {
- Future<Boolean> future = _executor.submit(new Callable<Boolean>() {
- @Override
- public Boolean call()
- {
- return _delegate.requestCanceled(out, ex);
- }
- });
- return future.get();
- }
- catch(RejectedExecutionException e)
- {
- throw new CommunicatorDestroyedException();
- }
- catch (InterruptedException e)
- {
- throw new Ice.OperationInterruptedException();
- }
- catch (ExecutionException e)
- {
- try
- {
- throw e.getCause();
- }
- catch(RuntimeException exc)
- {
- throw exc;
- }
- catch(Throwable exc)
- {
- assert(false);
- }
- }
- return false;
- }
-
- @Override
- public boolean
asyncRequestCanceled(final OutgoingAsyncMessageCallback outAsync, final Ice.LocalException ex)
{
try
{
- Future<Boolean> future = _executor.submit(new Callable<Boolean>() {
+ Future<Boolean> future = _executor.submit(new Callable<Boolean>()
+ {
@Override
public Boolean call()
{
@@ -309,11 +237,11 @@ public class QueueRequestHandler implements RequestHandler
{
throw new CommunicatorDestroyedException();
}
- catch (InterruptedException e)
+ catch(InterruptedException e)
{
throw new Ice.OperationInterruptedException();
}
- catch (ExecutionException e)
+ catch(ExecutionException e)
{
try
{
@@ -347,7 +275,7 @@ public class QueueRequestHandler implements RequestHandler
@Override
public ConnectionI
- waitForConnection() throws InterruptedException
+ waitForConnection() throws InterruptedException, RetryException
{
return _delegate.waitForConnection();
}