summaryrefslogtreecommitdiff
path: root/java/src/IceInternal/CommunicatorBatchOutgoingAsync.java
diff options
context:
space:
mode:
authorMatthew Newhook <matthew@zeroc.com>2014-09-27 16:31:46 -0700
committerMatthew Newhook <matthew@zeroc.com>2014-09-27 16:32:21 -0700
commit4951bbabdd6bd33a8e9ca0cdd46aad613a634626 (patch)
tree8634b14a258d2c9cee0e17a12af805e1af3fec76 /java/src/IceInternal/CommunicatorBatchOutgoingAsync.java
parentFixed deadlock in connection binding code (ICE-5693) (diff)
downloadice-4951bbabdd6bd33a8e9ca0cdd46aad613a634626.tar.bz2
ice-4951bbabdd6bd33a8e9ca0cdd46aad613a634626.tar.xz
ice-4951bbabdd6bd33a8e9ca0cdd46aad613a634626.zip
- begin_ now never interrupts.
- All potentially blocking Ice APIs are interruption points. - Fixes to the incoming/outgoing factories and shutdown procedure - Fixed bug where connect() was from a user thread. - Added lots more tests to the interrupt test suite.
Diffstat (limited to 'java/src/IceInternal/CommunicatorBatchOutgoingAsync.java')
-rw-r--r--java/src/IceInternal/CommunicatorBatchOutgoingAsync.java75
1 files changed, 72 insertions, 3 deletions
diff --git a/java/src/IceInternal/CommunicatorBatchOutgoingAsync.java b/java/src/IceInternal/CommunicatorBatchOutgoingAsync.java
index d5c6189064a..ee05abf896d 100644
--- a/java/src/IceInternal/CommunicatorBatchOutgoingAsync.java
+++ b/java/src/IceInternal/CommunicatorBatchOutgoingAsync.java
@@ -9,6 +9,13 @@
package IceInternal;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+
+import Ice.CommunicatorDestroyedException;
+
public class CommunicatorBatchOutgoingAsync extends OutgoingAsyncBase
{
public CommunicatorBatchOutgoingAsync(Ice.Communicator communicator, Instance instance, String operation,
@@ -34,7 +41,7 @@ public class CommunicatorBatchOutgoingAsync extends OutgoingAsyncBase
_observer = ObserverHelper.get(instance, operation);
}
- public void flushConnection(Ice.ConnectionI con)
+ public void flushConnection(final Ice.ConnectionI con)
{
class BatchOutgoingAsyncI extends BatchOutgoingAsync
{
@@ -88,7 +95,12 @@ public class CommunicatorBatchOutgoingAsync extends OutgoingAsyncBase
}
}
}
- };
+
+ @Override
+ protected void cancelRequest()
+ {
+ }
+ }
synchronized(_monitor)
{
@@ -97,7 +109,59 @@ public class CommunicatorBatchOutgoingAsync extends OutgoingAsyncBase
try
{
- int status = con.flushAsyncBatchRequests(new BatchOutgoingAsyncI());
+ int status;
+ if(_instance.queueRequests())
+ {
+ Future<Integer> future = _instance.getQueueExecutor().submit(new Callable<Integer>()
+ {
+ @Override
+ public Integer call() throws RetryException
+ {
+ return con.flushAsyncBatchRequests(new BatchOutgoingAsyncI());
+ }
+ });
+
+ boolean interrupted = false;
+ while(true)
+ {
+ try
+ {
+ status = future.get();
+ if(interrupted)
+ {
+ Thread.currentThread().interrupt();
+ }
+ break;
+ }
+ catch(InterruptedException ex)
+ {
+ interrupted = true;
+ }
+ catch(RejectedExecutionException e)
+ {
+ throw new CommunicatorDestroyedException();
+ }
+ catch(ExecutionException e)
+ {
+ try
+ {
+ throw e.getCause();
+ }
+ catch(RuntimeException ex)
+ {
+ throw ex;
+ }
+ catch(Throwable ex)
+ {
+ assert(false);
+ }
+ }
+ }
+ }
+ else
+ {
+ status = con.flushAsyncBatchRequests(new BatchOutgoingAsyncI());
+ }
if((status & AsyncStatus.Sent) > 0)
{
_sentSynchronously = false;
@@ -153,5 +217,10 @@ public class CommunicatorBatchOutgoingAsync extends OutgoingAsyncBase
}
}
+ @Override
+ protected void cancelRequest()
+ {
+ }
+
private int _useCount;
}