diff options
author | Matthew Newhook <matthew@zeroc.com> | 2014-09-27 16:31:46 -0700 |
---|---|---|
committer | Matthew Newhook <matthew@zeroc.com> | 2014-09-27 16:32:21 -0700 |
commit | 4951bbabdd6bd33a8e9ca0cdd46aad613a634626 (patch) | |
tree | 8634b14a258d2c9cee0e17a12af805e1af3fec76 /java/src/IceInternal/CommunicatorBatchOutgoingAsync.java | |
parent | Fixed deadlock in connection binding code (ICE-5693) (diff) | |
download | ice-4951bbabdd6bd33a8e9ca0cdd46aad613a634626.tar.bz2 ice-4951bbabdd6bd33a8e9ca0cdd46aad613a634626.tar.xz ice-4951bbabdd6bd33a8e9ca0cdd46aad613a634626.zip |
- begin_ now never interrupts.
- All potentially blocking Ice APIs are interruption points.
- Fixes to the incoming/outgoing factories and shutdown procedure
- Fixed bug where connect() was from a user thread.
- Added lots more tests to the interrupt test suite.
Diffstat (limited to 'java/src/IceInternal/CommunicatorBatchOutgoingAsync.java')
-rw-r--r-- | java/src/IceInternal/CommunicatorBatchOutgoingAsync.java | 75 |
1 files changed, 72 insertions, 3 deletions
diff --git a/java/src/IceInternal/CommunicatorBatchOutgoingAsync.java b/java/src/IceInternal/CommunicatorBatchOutgoingAsync.java index d5c6189064a..ee05abf896d 100644 --- a/java/src/IceInternal/CommunicatorBatchOutgoingAsync.java +++ b/java/src/IceInternal/CommunicatorBatchOutgoingAsync.java @@ -9,6 +9,13 @@ package IceInternal; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; + +import Ice.CommunicatorDestroyedException; + public class CommunicatorBatchOutgoingAsync extends OutgoingAsyncBase { public CommunicatorBatchOutgoingAsync(Ice.Communicator communicator, Instance instance, String operation, @@ -34,7 +41,7 @@ public class CommunicatorBatchOutgoingAsync extends OutgoingAsyncBase _observer = ObserverHelper.get(instance, operation); } - public void flushConnection(Ice.ConnectionI con) + public void flushConnection(final Ice.ConnectionI con) { class BatchOutgoingAsyncI extends BatchOutgoingAsync { @@ -88,7 +95,12 @@ public class CommunicatorBatchOutgoingAsync extends OutgoingAsyncBase } } } - }; + + @Override + protected void cancelRequest() + { + } + } synchronized(_monitor) { @@ -97,7 +109,59 @@ public class CommunicatorBatchOutgoingAsync extends OutgoingAsyncBase try { - int status = con.flushAsyncBatchRequests(new BatchOutgoingAsyncI()); + int status; + if(_instance.queueRequests()) + { + Future<Integer> future = _instance.getQueueExecutor().submit(new Callable<Integer>() + { + @Override + public Integer call() throws RetryException + { + return con.flushAsyncBatchRequests(new BatchOutgoingAsyncI()); + } + }); + + boolean interrupted = false; + while(true) + { + try + { + status = future.get(); + if(interrupted) + { + Thread.currentThread().interrupt(); + } + break; + } + catch(InterruptedException ex) + { + interrupted = true; + } + catch(RejectedExecutionException e) + { + throw new CommunicatorDestroyedException(); + } + catch(ExecutionException e) + { + try + { + throw e.getCause(); + } + catch(RuntimeException ex) + { + throw ex; + } + catch(Throwable ex) + { + assert(false); + } + } + } + } + else + { + status = con.flushAsyncBatchRequests(new BatchOutgoingAsyncI()); + } if((status & AsyncStatus.Sent) > 0) { _sentSynchronously = false; @@ -153,5 +217,10 @@ public class CommunicatorBatchOutgoingAsync extends OutgoingAsyncBase } } + @Override + protected void cancelRequest() + { + } + private int _useCount; } |