diff options
Diffstat (limited to 'java/src/IceInternal/ConnectionBatchOutgoingAsync.java')
-rw-r--r-- | java/src/IceInternal/ConnectionBatchOutgoingAsync.java | 68 |
1 files changed, 67 insertions, 1 deletions
diff --git a/java/src/IceInternal/ConnectionBatchOutgoingAsync.java b/java/src/IceInternal/ConnectionBatchOutgoingAsync.java index 1b9fd2e0e3c..5a1f0a30886 100644 --- a/java/src/IceInternal/ConnectionBatchOutgoingAsync.java +++ b/java/src/IceInternal/ConnectionBatchOutgoingAsync.java @@ -9,6 +9,13 @@ package IceInternal; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; + +import Ice.CommunicatorDestroyedException; + public class ConnectionBatchOutgoingAsync extends BatchOutgoingAsync { public ConnectionBatchOutgoingAsync(Ice.ConnectionI con, Ice.Communicator communicator, Instance instance, @@ -20,7 +27,60 @@ public class ConnectionBatchOutgoingAsync extends BatchOutgoingAsync public void __invoke() { - int status = _connection.flushAsyncBatchRequests(this); + int status; + if(_instance.queueRequests()) + { + Future<Integer> future = _instance.getQueueExecutor().submit(new Callable<Integer>() + { + @Override + public Integer call() throws RetryException + { + return _connection.flushAsyncBatchRequests(ConnectionBatchOutgoingAsync.this); + } + }); + + boolean interrupted = false; + while(true) + { + try + { + status = future.get(); + if(interrupted) + { + Thread.currentThread().interrupt(); + } + break; + } + catch(InterruptedException ex) + { + interrupted = true; + } + catch(RejectedExecutionException e) + { + throw new CommunicatorDestroyedException(); + } + catch(ExecutionException e) + { + try + { + throw e.getCause(); + } + catch(RuntimeException ex) + { + throw ex; + } + catch(Throwable ex) + { + assert(false); + } + } + } + } + else + { + status = _connection.flushAsyncBatchRequests(this); + } + if((status & AsyncStatus.Sent) > 0) { _sentSynchronously = true; @@ -36,6 +96,12 @@ public class ConnectionBatchOutgoingAsync extends BatchOutgoingAsync { return _connection; } + + @Override + protected void cancelRequest() + { + _connection.asyncRequestCanceled(this, new Ice.OperationInterruptedException()); + } private Ice.ConnectionI _connection; } |