summaryrefslogtreecommitdiff
path: root/java/src/IceInternal/ConnectionBatchOutgoingAsync.java
diff options
context:
space:
mode:
authorMatthew Newhook <matthew@zeroc.com>2014-09-27 16:31:46 -0700
committerMatthew Newhook <matthew@zeroc.com>2014-09-27 16:32:21 -0700
commit4951bbabdd6bd33a8e9ca0cdd46aad613a634626 (patch)
tree8634b14a258d2c9cee0e17a12af805e1af3fec76 /java/src/IceInternal/ConnectionBatchOutgoingAsync.java
parentFixed deadlock in connection binding code (ICE-5693) (diff)
downloadice-4951bbabdd6bd33a8e9ca0cdd46aad613a634626.tar.bz2
ice-4951bbabdd6bd33a8e9ca0cdd46aad613a634626.tar.xz
ice-4951bbabdd6bd33a8e9ca0cdd46aad613a634626.zip
- begin_ now never interrupts.
- All potentially blocking Ice APIs are interruption points. - Fixes to the incoming/outgoing factories and shutdown procedure - Fixed bug where connect() was from a user thread. - Added lots more tests to the interrupt test suite.
Diffstat (limited to 'java/src/IceInternal/ConnectionBatchOutgoingAsync.java')
-rw-r--r--java/src/IceInternal/ConnectionBatchOutgoingAsync.java68
1 files changed, 67 insertions, 1 deletions
diff --git a/java/src/IceInternal/ConnectionBatchOutgoingAsync.java b/java/src/IceInternal/ConnectionBatchOutgoingAsync.java
index 1b9fd2e0e3c..5a1f0a30886 100644
--- a/java/src/IceInternal/ConnectionBatchOutgoingAsync.java
+++ b/java/src/IceInternal/ConnectionBatchOutgoingAsync.java
@@ -9,6 +9,13 @@
package IceInternal;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+
+import Ice.CommunicatorDestroyedException;
+
public class ConnectionBatchOutgoingAsync extends BatchOutgoingAsync
{
public ConnectionBatchOutgoingAsync(Ice.ConnectionI con, Ice.Communicator communicator, Instance instance,
@@ -20,7 +27,60 @@ public class ConnectionBatchOutgoingAsync extends BatchOutgoingAsync
public void __invoke()
{
- int status = _connection.flushAsyncBatchRequests(this);
+ int status;
+ if(_instance.queueRequests())
+ {
+ Future<Integer> future = _instance.getQueueExecutor().submit(new Callable<Integer>()
+ {
+ @Override
+ public Integer call() throws RetryException
+ {
+ return _connection.flushAsyncBatchRequests(ConnectionBatchOutgoingAsync.this);
+ }
+ });
+
+ boolean interrupted = false;
+ while(true)
+ {
+ try
+ {
+ status = future.get();
+ if(interrupted)
+ {
+ Thread.currentThread().interrupt();
+ }
+ break;
+ }
+ catch(InterruptedException ex)
+ {
+ interrupted = true;
+ }
+ catch(RejectedExecutionException e)
+ {
+ throw new CommunicatorDestroyedException();
+ }
+ catch(ExecutionException e)
+ {
+ try
+ {
+ throw e.getCause();
+ }
+ catch(RuntimeException ex)
+ {
+ throw ex;
+ }
+ catch(Throwable ex)
+ {
+ assert(false);
+ }
+ }
+ }
+ }
+ else
+ {
+ status = _connection.flushAsyncBatchRequests(this);
+ }
+
if((status & AsyncStatus.Sent) > 0)
{
_sentSynchronously = true;
@@ -36,6 +96,12 @@ public class ConnectionBatchOutgoingAsync extends BatchOutgoingAsync
{
return _connection;
}
+
+ @Override
+ protected void cancelRequest()
+ {
+ _connection.asyncRequestCanceled(this, new Ice.OperationInterruptedException());
+ }
private Ice.ConnectionI _connection;
}