diff options
Diffstat (limited to 'java/src')
-rw-r--r-- | java/src/Ice/CommunicatorI.java | 3 | ||||
-rw-r--r-- | java/src/Ice/ConnectionI.java | 7 | ||||
-rw-r--r-- | java/src/Ice/ObjectAdapterI.java | 20 | ||||
-rw-r--r-- | java/src/Ice/ObjectPrxHelperBase.java | 14 | ||||
-rw-r--r-- | java/src/IceInternal/BatchOutgoingAsync.java | 2 | ||||
-rw-r--r-- | java/src/IceInternal/CollocatedRequestHandler.java | 11 | ||||
-rw-r--r-- | java/src/IceInternal/CommunicatorBatchOutgoingAsync.java | 75 | ||||
-rw-r--r-- | java/src/IceInternal/ConnectionBatchOutgoingAsync.java | 68 | ||||
-rw-r--r-- | java/src/IceInternal/GetConnectionOutgoingAsync.java | 9 | ||||
-rw-r--r-- | java/src/IceInternal/IncomingConnectionFactory.java | 3 | ||||
-rw-r--r-- | java/src/IceInternal/Instance.java | 22 | ||||
-rw-r--r-- | java/src/IceInternal/ObjectAdapterFactory.java | 5 | ||||
-rw-r--r-- | java/src/IceInternal/OutgoingAsync.java | 11 | ||||
-rw-r--r-- | java/src/IceInternal/OutgoingAsyncBase.java | 75 | ||||
-rw-r--r-- | java/src/IceInternal/OutgoingConnectionFactory.java | 112 | ||||
-rw-r--r-- | java/src/IceInternal/ProxyBatchOutgoingAsync.java | 23 | ||||
-rw-r--r-- | java/src/IceInternal/QueueRequestHandler.java | 284 |
17 files changed, 423 insertions, 321 deletions
diff --git a/java/src/Ice/CommunicatorI.java b/java/src/Ice/CommunicatorI.java index ed517e4ae9f..76d46baab6a 100644 --- a/java/src/Ice/CommunicatorI.java +++ b/java/src/Ice/CommunicatorI.java @@ -203,8 +203,7 @@ public final class CommunicatorI implements Communicator public void flushBatchRequests() { - AsyncResult r = begin_flushBatchRequests(); - end_flushBatchRequests(r); + end_flushBatchRequests(begin_flushBatchRequests()); } @Override diff --git a/java/src/Ice/ConnectionI.java b/java/src/Ice/ConnectionI.java index 38846212228..4f251b111f3 100644 --- a/java/src/Ice/ConnectionI.java +++ b/java/src/Ice/ConnectionI.java @@ -155,6 +155,11 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne @Override synchronized public void close(boolean force) { + if(Thread.interrupted()) + { + throw new Ice.OperationInterruptedException(); + } + if(force) { setState(StateClosed, new ForcedCloseConnectionException()); @@ -645,6 +650,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { result.invokeExceptionAsync(__ex); } + return result; } @@ -2567,6 +2573,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne case IceInternal.Protocol.replyMsg: { + IceInternal.TraceUtil.traceRecv(info.stream, _logger, _traceLevels); info.requestId = info.stream.readInt(); IceInternal.OutgoingAsync outAsync = _asyncRequests.remove(info.requestId); diff --git a/java/src/Ice/ObjectAdapterI.java b/java/src/Ice/ObjectAdapterI.java index 11eb4225e14..a9b21feab3e 100644 --- a/java/src/Ice/ObjectAdapterI.java +++ b/java/src/Ice/ObjectAdapterI.java @@ -143,6 +143,11 @@ public final class ObjectAdapterI implements ObjectAdapter public void waitForHold() { + if(Thread.interrupted()) + { + throw new Ice.OperationInterruptedException(); + } + while(true) { java.util.List<IceInternal.IncomingConnectionFactory> incomingConnectionFactories; @@ -218,6 +223,11 @@ public final class ObjectAdapterI implements ObjectAdapter public void deactivate() { + if(Thread.interrupted()) + { + throw new Ice.OperationInterruptedException(); + } + IceInternal.OutgoingConnectionFactory outgoingConnectionFactory; java.util.List<IceInternal.IncomingConnectionFactory> incomingConnectionFactories; IceInternal.LocatorInfo locatorInfo; @@ -310,6 +320,11 @@ public final class ObjectAdapterI implements ObjectAdapter public void waitForDeactivate() { + if(Thread.interrupted()) + { + throw new Ice.OperationInterruptedException(); + } + try { IceInternal.IncomingConnectionFactory[] incomingConnectionFactories; @@ -360,6 +375,11 @@ public final class ObjectAdapterI implements ObjectAdapter public void destroy() { + if(Thread.interrupted()) + { + throw new Ice.OperationInterruptedException(); + } + synchronized(this) { // diff --git a/java/src/Ice/ObjectPrxHelperBase.java b/java/src/Ice/ObjectPrxHelperBase.java index 82d138470b0..031a2608466 100644 --- a/java/src/Ice/ObjectPrxHelperBase.java +++ b/java/src/Ice/ObjectPrxHelperBase.java @@ -11,9 +11,13 @@ package Ice; import java.util.LinkedList; import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import Ice.Instrumentation.InvocationObserver; import IceInternal.QueueRequestHandler; +import IceInternal.RequestHandler; import IceInternal.RetryException; /** @@ -2280,6 +2284,10 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable { final InvocationObserver observer = IceInternal.ObserverHelper.get(this, "ice_getConnection"); int cnt = 0; + if(Thread.interrupted()) + { + throw new Ice.OperationInterruptedException(); + } try { while(true) @@ -2779,7 +2787,11 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable { synchronized(this) { - if(_requestHandler != handler) + if(_requestHandler == null) + { + _requestHandler = handler; + } + else if(_requestHandler != handler) { // // Update the request handler only if "previous" is the same diff --git a/java/src/IceInternal/BatchOutgoingAsync.java b/java/src/IceInternal/BatchOutgoingAsync.java index e78879afa6a..afafacaacc7 100644 --- a/java/src/IceInternal/BatchOutgoingAsync.java +++ b/java/src/IceInternal/BatchOutgoingAsync.java @@ -9,7 +9,7 @@ package IceInternal; -public class BatchOutgoingAsync extends OutgoingAsyncBase implements OutgoingAsyncMessageCallback +abstract public class BatchOutgoingAsync extends OutgoingAsyncBase implements OutgoingAsyncMessageCallback { BatchOutgoingAsync(Ice.Communicator communicator, Instance instance, String operation, CallbackBase callback) { diff --git a/java/src/IceInternal/CollocatedRequestHandler.java b/java/src/IceInternal/CollocatedRequestHandler.java index 4e87380dfeb..8de4a93ebc4 100644 --- a/java/src/IceInternal/CollocatedRequestHandler.java +++ b/java/src/IceInternal/CollocatedRequestHandler.java @@ -310,7 +310,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler void invokeAsyncRequest(OutgoingAsync outAsync, boolean synchronous) { int requestId = 0; - if(_reference.getInvocationTimeout() > 0 || _response) + if((_reference.getInstance().queueRequests() || _reference.getInvocationTimeout() > 0) || _response) { synchronized(this) { @@ -319,7 +319,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler requestId = ++_requestId; _asyncRequests.put(requestId, outAsync); } - if(_reference.getInvocationTimeout() > 0) + if(_reference.getInstance().queueRequests() || _reference.getInvocationTimeout() > 0) { _sendAsyncRequests.put(outAsync, requestId); } @@ -367,7 +367,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler invokeNum = _batchRequestNum; if(_batchRequestNum > 0) { - if(_reference.getInvocationTimeout() > 0) + if(_reference.getInstance().queueRequests() || _reference.getInvocationTimeout() > 0) { _sendAsyncRequests.put(outAsync, 0); } @@ -406,7 +406,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler private boolean sentAsync(final OutgoingAsyncMessageCallback outAsync) { - if(_reference.getInvocationTimeout() > 0) + if(_reference.getInstance().queueRequests() || _reference.getInvocationTimeout() > 0) { synchronized(this) { @@ -564,6 +564,9 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler private int _requestId; + // A map of outstanding requests that can be canceled. A request + // can be canceled if it has an invocation timeout, or we support + // interrupts. private java.util.Map<OutgoingAsyncMessageCallback, Integer> _sendAsyncRequests = new java.util.HashMap<OutgoingAsyncMessageCallback, Integer>(); diff --git a/java/src/IceInternal/CommunicatorBatchOutgoingAsync.java b/java/src/IceInternal/CommunicatorBatchOutgoingAsync.java index d5c6189064a..ee05abf896d 100644 --- a/java/src/IceInternal/CommunicatorBatchOutgoingAsync.java +++ b/java/src/IceInternal/CommunicatorBatchOutgoingAsync.java @@ -9,6 +9,13 @@ package IceInternal; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; + +import Ice.CommunicatorDestroyedException; + public class CommunicatorBatchOutgoingAsync extends OutgoingAsyncBase { public CommunicatorBatchOutgoingAsync(Ice.Communicator communicator, Instance instance, String operation, @@ -34,7 +41,7 @@ public class CommunicatorBatchOutgoingAsync extends OutgoingAsyncBase _observer = ObserverHelper.get(instance, operation); } - public void flushConnection(Ice.ConnectionI con) + public void flushConnection(final Ice.ConnectionI con) { class BatchOutgoingAsyncI extends BatchOutgoingAsync { @@ -88,7 +95,12 @@ public class CommunicatorBatchOutgoingAsync extends OutgoingAsyncBase } } } - }; + + @Override + protected void cancelRequest() + { + } + } synchronized(_monitor) { @@ -97,7 +109,59 @@ public class CommunicatorBatchOutgoingAsync extends OutgoingAsyncBase try { - int status = con.flushAsyncBatchRequests(new BatchOutgoingAsyncI()); + int status; + if(_instance.queueRequests()) + { + Future<Integer> future = _instance.getQueueExecutor().submit(new Callable<Integer>() + { + @Override + public Integer call() throws RetryException + { + return con.flushAsyncBatchRequests(new BatchOutgoingAsyncI()); + } + }); + + boolean interrupted = false; + while(true) + { + try + { + status = future.get(); + if(interrupted) + { + Thread.currentThread().interrupt(); + } + break; + } + catch(InterruptedException ex) + { + interrupted = true; + } + catch(RejectedExecutionException e) + { + throw new CommunicatorDestroyedException(); + } + catch(ExecutionException e) + { + try + { + throw e.getCause(); + } + catch(RuntimeException ex) + { + throw ex; + } + catch(Throwable ex) + { + assert(false); + } + } + } + } + else + { + status = con.flushAsyncBatchRequests(new BatchOutgoingAsyncI()); + } if((status & AsyncStatus.Sent) > 0) { _sentSynchronously = false; @@ -153,5 +217,10 @@ public class CommunicatorBatchOutgoingAsync extends OutgoingAsyncBase } } + @Override + protected void cancelRequest() + { + } + private int _useCount; } diff --git a/java/src/IceInternal/ConnectionBatchOutgoingAsync.java b/java/src/IceInternal/ConnectionBatchOutgoingAsync.java index 1b9fd2e0e3c..5a1f0a30886 100644 --- a/java/src/IceInternal/ConnectionBatchOutgoingAsync.java +++ b/java/src/IceInternal/ConnectionBatchOutgoingAsync.java @@ -9,6 +9,13 @@ package IceInternal; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; + +import Ice.CommunicatorDestroyedException; + public class ConnectionBatchOutgoingAsync extends BatchOutgoingAsync { public ConnectionBatchOutgoingAsync(Ice.ConnectionI con, Ice.Communicator communicator, Instance instance, @@ -20,7 +27,60 @@ public class ConnectionBatchOutgoingAsync extends BatchOutgoingAsync public void __invoke() { - int status = _connection.flushAsyncBatchRequests(this); + int status; + if(_instance.queueRequests()) + { + Future<Integer> future = _instance.getQueueExecutor().submit(new Callable<Integer>() + { + @Override + public Integer call() throws RetryException + { + return _connection.flushAsyncBatchRequests(ConnectionBatchOutgoingAsync.this); + } + }); + + boolean interrupted = false; + while(true) + { + try + { + status = future.get(); + if(interrupted) + { + Thread.currentThread().interrupt(); + } + break; + } + catch(InterruptedException ex) + { + interrupted = true; + } + catch(RejectedExecutionException e) + { + throw new CommunicatorDestroyedException(); + } + catch(ExecutionException e) + { + try + { + throw e.getCause(); + } + catch(RuntimeException ex) + { + throw ex; + } + catch(Throwable ex) + { + assert(false); + } + } + } + } + else + { + status = _connection.flushAsyncBatchRequests(this); + } + if((status & AsyncStatus.Sent) > 0) { _sentSynchronously = true; @@ -36,6 +96,12 @@ public class ConnectionBatchOutgoingAsync extends BatchOutgoingAsync { return _connection; } + + @Override + protected void cancelRequest() + { + _connection.asyncRequestCanceled(this, new Ice.OperationInterruptedException()); + } private Ice.ConnectionI _connection; } diff --git a/java/src/IceInternal/GetConnectionOutgoingAsync.java b/java/src/IceInternal/GetConnectionOutgoingAsync.java index 75ef2d9b60f..33613a5c078 100644 --- a/java/src/IceInternal/GetConnectionOutgoingAsync.java +++ b/java/src/IceInternal/GetConnectionOutgoingAsync.java @@ -125,6 +125,15 @@ public class GetConnectionOutgoingAsync extends OutgoingAsyncBase implements Out }); } + @Override + protected void cancelRequest() + { + if(_handler != null) + { + _handler.asyncRequestCanceled(this, new Ice.OperationInterruptedException()); + } + } + private void handleException(Ice.Exception exc) { try diff --git a/java/src/IceInternal/IncomingConnectionFactory.java b/java/src/IceInternal/IncomingConnectionFactory.java index 596731cc635..89dcf9b44ed 100644 --- a/java/src/IceInternal/IncomingConnectionFactory.java +++ b/java/src/IceInternal/IncomingConnectionFactory.java @@ -73,7 +73,7 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice public void waitUntilFinished() - throws InterruptedException + throws InterruptedException { java.util.LinkedList<Ice.ConnectionI> connections = null; @@ -117,6 +117,7 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice { c.close(true); } + throw e; } } } diff --git a/java/src/IceInternal/Instance.java b/java/src/IceInternal/Instance.java index 4c5db5043d7..a53eb841186 100644 --- a/java/src/IceInternal/Instance.java +++ b/java/src/IceInternal/Instance.java @@ -409,6 +409,11 @@ public final class Instance public synchronized Ice.ObjectPrx createAdmin(Ice.ObjectAdapter adminAdapter, Ice.Identity adminIdentity) { + if(Thread.interrupted()) + { + throw new Ice.OperationInterruptedException(); + } + boolean createAdapter = (adminAdapter == null); synchronized(this) @@ -478,6 +483,11 @@ public final class Instance public Ice.ObjectPrx getAdmin() { + if(Thread.interrupted()) + { + throw new Ice.OperationInterruptedException(); + } + Ice.ObjectAdapter adminAdapter; Ice.Identity adminIdentity; @@ -1137,6 +1147,11 @@ public final class Instance public void destroy() { + if(Thread.interrupted()) + { + throw new Ice.OperationInterruptedException(); + } + synchronized(this) { // @@ -1181,12 +1196,7 @@ public final class Instance } catch (InterruptedException e) { - // - // Restore the interrupt, otherwise the instance will be - // left in an undefined state. The thread joins below will - // interrupt which is fine. - // - Thread.currentThread().interrupt(); + throw new Ice.OperationInterruptedException(); } } diff --git a/java/src/IceInternal/ObjectAdapterFactory.java b/java/src/IceInternal/ObjectAdapterFactory.java index 5b9688a70d7..63c3e592669 100644 --- a/java/src/IceInternal/ObjectAdapterFactory.java +++ b/java/src/IceInternal/ObjectAdapterFactory.java @@ -144,6 +144,11 @@ public final class ObjectAdapterFactory public synchronized Ice.ObjectAdapter createObjectAdapter(String name, Ice.RouterPrx router) { + if(Thread.interrupted()) + { + throw new Ice.OperationInterruptedException(); + } + if(_instance == null) { throw new Ice.ObjectAdapterDeactivatedException(); diff --git a/java/src/IceInternal/OutgoingAsync.java b/java/src/IceInternal/OutgoingAsync.java index 5762b1b60e2..1898cadb945 100644 --- a/java/src/IceInternal/OutgoingAsync.java +++ b/java/src/IceInternal/OutgoingAsync.java @@ -640,6 +640,15 @@ public class OutgoingAsync extends OutgoingAsyncBase implements OutgoingAsyncMes super.invokeExceptionAsync(ex); } + + @Override + protected void cancelRequest() + { + if(_handler != null) + { + _handler.asyncRequestCanceled(this, new Ice.OperationInterruptedException()); + } + } private void handleException(Ice.Exception exc) { @@ -686,4 +695,6 @@ public class OutgoingAsync extends OutgoingAsyncBase implements OutgoingAsyncMes private static final java.util.Map<String, String> _emptyContext = new java.util.HashMap<String, String>(); + + } diff --git a/java/src/IceInternal/OutgoingAsyncBase.java b/java/src/IceInternal/OutgoingAsyncBase.java index 8be0b3072bd..1497e45460e 100644 --- a/java/src/IceInternal/OutgoingAsyncBase.java +++ b/java/src/IceInternal/OutgoingAsyncBase.java @@ -21,7 +21,7 @@ import Ice.UserException; * With this object, an application can obtain several attributes of the * invocation and discover its outcome. **/ -public class OutgoingAsyncBase implements Ice.AsyncResult +public abstract class OutgoingAsyncBase implements Ice.AsyncResult { protected OutgoingAsyncBase(Communicator communicator, IceInternal.Instance instance, String op, IceInternal.CallbackBase del) @@ -106,6 +106,10 @@ public class OutgoingAsyncBase implements Ice.AsyncResult { synchronized(_monitor) { + if(Thread.interrupted()) + { + throw new Ice.OperationInterruptedException(); + } while((_state & StateDone) == 0) { try @@ -148,6 +152,10 @@ public class OutgoingAsyncBase implements Ice.AsyncResult { synchronized(_monitor) { + if(Thread.interrupted()) + { + throw new Ice.OperationInterruptedException(); + } while((_state & StateSent) == 0 && _exception == null) { try @@ -235,36 +243,44 @@ public class OutgoingAsyncBase implements Ice.AsyncResult public final boolean __wait() { - synchronized(_monitor) + try { - if((_state & StateEndCalled) > 0) - { - throw new java.lang.IllegalArgumentException("end_ method called more than once"); - } - _state |= StateEndCalled; - while((_state & StateDone) == 0) + synchronized(_monitor) { - try + if((_state & StateEndCalled) > 0) + { + throw new java.lang.IllegalArgumentException("end_ method called more than once"); + } + + _state |= StateEndCalled; + if(Thread.interrupted()) + { + throw new InterruptedException(); + } + while((_state & StateDone) == 0) { _monitor.wait(); } - catch(InterruptedException ex) + + if(_exception != null) { - // - // Remove the EndCalled flag since it should be possible to - // call end_* again on the AsyncResult. - // - _state &= ~StateEndCalled; - throw new Ice.OperationInterruptedException(); + //throw (LocalException)_exception.fillInStackTrace(); + throw _exception; } + + return (_state & StateOK) > 0; } - if(_exception != null) - { - //throw (LocalException)_exception.fillInStackTrace(); - throw _exception; - } - return (_state & StateOK) > 0; } + catch(InterruptedException ex) + { + // This must be called outside of the monitor as the + // invocation will potentially want to lock the + // connection (which in turn may want to lock the outgoing + // to notify that the message has been sent). + cancelRequest(); + throw new Ice.OperationInterruptedException(); + } + } public final void throwUserException() @@ -405,14 +421,13 @@ public class OutgoingAsyncBase implements Ice.AsyncResult try { _instance.clientThreadPool().dispatch(new IceInternal.DispatchWorkItem(_cachedConnection) + { + @Override + public void run() { - @Override - public void - run() - { - invokeSentInternal(); - } - }); + invokeSentInternal(); + } + }); } catch(CommunicatorDestroyedException exc) { @@ -548,6 +563,8 @@ public class OutgoingAsyncBase implements Ice.AsyncResult String s = "error raised by AMI callback:\n" + IceInternal.Ex.toString(error); _instance.initializationData().logger.error(s); } + + abstract protected void cancelRequest(); protected Communicator _communicator; protected IceInternal.Instance _instance; diff --git a/java/src/IceInternal/OutgoingConnectionFactory.java b/java/src/IceInternal/OutgoingConnectionFactory.java index e2182170ffa..534888ec89e 100644 --- a/java/src/IceInternal/OutgoingConnectionFactory.java +++ b/java/src/IceInternal/OutgoingConnectionFactory.java @@ -86,88 +86,76 @@ public final class OutgoingConnectionFactory waitUntilFinished() throws InterruptedException { - try + java.util.Map<Connector, java.util.List<Ice.ConnectionI> > connections = null; + synchronized(this) { - java.util.Map<Connector, java.util.List<Ice.ConnectionI> > connections = null; - synchronized(this) + // + // First we wait until the factory is destroyed. We also + // wait until there are no pending connections + // anymore. Only then we can be sure the _connections + // contains all connections. + // + while(!_destroyed || !_pending.isEmpty() || _pendingConnectCount > 0) { - // - // First we wait until the factory is destroyed. We also - // wait until there are no pending connections - // anymore. Only then we can be sure the _connections - // contains all connections. - // - while(!_destroyed || !_pending.isEmpty() || _pendingConnectCount > 0) - { - wait(); - } - - // - // We want to wait until all connections are finished outside the - // thread synchronization. - // - connections = new java.util.HashMap<Connector, java.util.List<Ice.ConnectionI> >(_connections); + wait(); } // - // Now we wait until the destruction of each connection is finished. + // We want to wait until all connections are finished outside the + // thread synchronization. // - for(java.util.List<Ice.ConnectionI> connectionList : connections.values()) + connections = new java.util.HashMap<Connector, java.util.List<Ice.ConnectionI> >(_connections); + } + + // + // Now we wait until the destruction of each connection is finished. + // + for(java.util.List<Ice.ConnectionI> connectionList : connections.values()) + { + for(Ice.ConnectionI connection : connectionList) { - for(Ice.ConnectionI connection : connectionList) + try { - try - { - connection.waitUntilFinished(); - } - catch(InterruptedException e) + connection.waitUntilFinished(); + } + catch(InterruptedException e) + { + // + // Force close all of the connections. + // + for(java.util.List<Ice.ConnectionI> l : connections.values()) { - // - // Force close all of the connections. - // - for(java.util.List<Ice.ConnectionI> l : connections.values()) + for(Ice.ConnectionI c : l) { - for(Ice.ConnectionI c : l) - { - c.close(true); - } + c.close(true); } - throw e; } + throw e; } } + } - synchronized(this) + synchronized(this) + { + // Ensure all the connections are finished and reapable at this point. + java.util.List<Ice.ConnectionI> cons = _monitor.swapReapedConnections(); + if(cons != null) { - // Ensure all the connections are finished and reapable at this point. - java.util.List<Ice.ConnectionI> cons = _monitor.swapReapedConnections(); - if(cons != null) - { - int size = 0; - for(java.util.List<Ice.ConnectionI> connectionList : _connections.values()) - { - size += connectionList.size(); - } - assert(cons.size() == size); - _connections.clear(); - _connectionsByEndpoint.clear(); - } - else + int size = 0; + for(java.util.List<Ice.ConnectionI> connectionList : _connections.values()) { - assert(_connections.isEmpty()); - assert(_connectionsByEndpoint.isEmpty()); + size += connectionList.size(); } - _monitor.destroy(); + assert(cons.size() == size); + _connections.clear(); + _connectionsByEndpoint.clear(); + } + else + { + assert(_connections.isEmpty()); + assert(_connectionsByEndpoint.isEmpty()); } - } - catch(InterruptedException ex) - { - // Here wait() or waitUntilFinished() were interrupted. Clear the connections - // and such and continue along. - _connections.clear(); - _connectionsByEndpoint.clear(); _monitor.destroy(); - throw ex; } } diff --git a/java/src/IceInternal/ProxyBatchOutgoingAsync.java b/java/src/IceInternal/ProxyBatchOutgoingAsync.java index 009319110fe..f98bd51f997 100644 --- a/java/src/IceInternal/ProxyBatchOutgoingAsync.java +++ b/java/src/IceInternal/ProxyBatchOutgoingAsync.java @@ -22,11 +22,10 @@ public class ProxyBatchOutgoingAsync extends BatchOutgoingAsync { Protocol.checkSupportedProtocol(_proxy.__reference().getProtocol()); - RequestHandler handler = null; try { - handler = _proxy.__getRequestHandler(); - int status = handler.sendAsyncRequest(this); + _handler = _proxy.__getRequestHandler(); + int status = _handler.sendAsyncRequest(this); if((status & AsyncStatus.Sent) > 0) { _sentSynchronously = true; @@ -41,7 +40,7 @@ public class ProxyBatchOutgoingAsync extends BatchOutgoingAsync { if((_state & StateDone) == 0) { - int invocationTimeout = handler.getReference().getInvocationTimeout(); + int invocationTimeout = _handler.getReference().getInvocationTimeout(); if(invocationTimeout > 0) { _future = _instance.timer().schedule(new Runnable() @@ -52,7 +51,7 @@ public class ProxyBatchOutgoingAsync extends BatchOutgoingAsync timeout(); } }, invocationTimeout, java.util.concurrent.TimeUnit.MILLISECONDS); - _timeoutRequestHandler = handler; + _timeoutRequestHandler = _handler; } } } @@ -65,7 +64,7 @@ public class ProxyBatchOutgoingAsync extends BatchOutgoingAsync // isn't useful, there were no batch requests associated with // the proxy's request handler. // - _proxy.__setRequestHandler(handler, null); + _proxy.__setRequestHandler(_handler, null); } catch(Ice.Exception ex) { @@ -73,7 +72,7 @@ public class ProxyBatchOutgoingAsync extends BatchOutgoingAsync { _observer.failed(ex.ice_name()); } - _proxy.__setRequestHandler(handler, null); // Clear request handler + _proxy.__setRequestHandler(_handler, null); // Clear request handler throw ex; // Throw to notify the user lthat batch requests were potentially lost. } } @@ -84,5 +83,15 @@ public class ProxyBatchOutgoingAsync extends BatchOutgoingAsync return _proxy; } + @Override + protected void cancelRequest() + { + if(_handler != null) + { + _handler.asyncRequestCanceled(this, new Ice.OperationInterruptedException()); + } + } + private Ice.ObjectPrxHelperBase _proxy; + private RequestHandler _handler = null; } diff --git a/java/src/IceInternal/QueueRequestHandler.java b/java/src/IceInternal/QueueRequestHandler.java index c4c315e7f5c..5f40ea37403 100644 --- a/java/src/IceInternal/QueueRequestHandler.java +++ b/java/src/IceInternal/QueueRequestHandler.java @@ -32,48 +32,15 @@ public class QueueRequestHandler implements RequestHandler public RequestHandler connect() { - try - { - Future<Void> future = _executor.submit(new Callable<Void>() - { - @Override - public Void call() throws RetryException - { - _delegate.connect(); - return null; - } - }); - - // - // Just wait for connect() to complete, don't return the - // request handler returned by connect() since it's not - // interrupt safe. - // - future.get(); - } - catch(RejectedExecutionException e) + performCallable(new Callable<Void>() { - throw new CommunicatorDestroyedException(); - } - catch(InterruptedException e) - { - throw new Ice.OperationInterruptedException(); - } - catch(ExecutionException e) - { - try - { - throw e.getCause(); - } - catch(RuntimeException ex) + @Override + public Void call() { - throw ex; - } - catch(Throwable ex) - { - assert(false); + _delegate.connect(); + return null; } - } + }); return this; } @@ -104,7 +71,7 @@ public class QueueRequestHandler implements RequestHandler { try { - Future<Void> future = _executor.submit(new Callable<Void>() + performCallable(new Callable<Void>() { @Override public Void call() throws RetryException @@ -113,120 +80,46 @@ public class QueueRequestHandler implements RequestHandler return null; } }); - - future.get(); - } - catch(RejectedExecutionException e) - { - throw new CommunicatorDestroyedException(); } - catch(InterruptedException e) + catch(RuntimeException ex) { - throw new Ice.OperationInterruptedException(); - } - catch(ExecutionException e) - { - try - { - throw e.getCause(); - } - catch(RetryException ex) - { - throw ex; - } - catch(RuntimeException ex) - { - throw ex; - } - catch(Throwable ex) + if(ex.getCause() instanceof RetryException) { - assert(false); + throw (RetryException)ex.getCause(); } + throw ex; } + } @Override public void finishBatchRequest(final BasicStream out) { - try + performCallable(new Callable<Void>() { - Future<Void> future = _executor.submit(new Callable<Void>() - { - @Override - public Void call() - { - _delegate.finishBatchRequest(out); - return null; - } - }); - future.get(); - } - catch(RejectedExecutionException e) - { - throw new CommunicatorDestroyedException(); - } - catch(InterruptedException e) - { - throw new Ice.OperationInterruptedException(); - } - catch(ExecutionException e) - { - try - { - throw e.getCause(); - } - catch(RuntimeException ex) - { - throw ex; - } - catch(Throwable ex) + @Override + public Void call() throws RetryException { - assert(false); + _delegate.finishBatchRequest(out); + return null; } - } + }); } @Override public void abortBatchRequest() { - try + performCallable(new Callable<Void>() { - Future<Void> future = _executor.submit(new Callable<Void>() + @Override + public Void call() { - @Override - public Void call() - { - _delegate.abortBatchRequest(); - return null; - } - }); - future.get(); - } - catch(RejectedExecutionException e) - { - throw new CommunicatorDestroyedException(); - } - catch(InterruptedException e) - { - throw new Ice.OperationInterruptedException(); - } - catch(ExecutionException e) - { - try - { - throw e.getCause(); - } - catch(RuntimeException ex) - { - throw ex; - } - catch(Throwable ex) - { - assert(false); + _delegate.abortBatchRequest(); + return null; } - } + }); } @Override @@ -235,7 +128,7 @@ public class QueueRequestHandler implements RequestHandler { try { - Future<Integer> future = _executor.submit(new Callable<Integer>() + return performCallable(new Callable<Integer>() { @Override public Integer call() throws RetryException @@ -243,90 +136,29 @@ public class QueueRequestHandler implements RequestHandler return _delegate.sendAsyncRequest(out); } }); - return future.get(); - } - catch(RejectedExecutionException e) - { - throw new CommunicatorDestroyedException(); - } - catch(InterruptedException e) - { - // If the request cannot be canceled (or is itself interrupted) then - // restore the interrupt state. - try - { - if(!asyncRequestCanceled(out, new Ice.OperationInterruptedException())) - { - Thread.currentThread().interrupt(); - } - } - catch(Ice.OperationInterruptedException ex) - { - Thread.currentThread().interrupt(); - } } - catch(ExecutionException e) + catch(RuntimeException ex) { - try - { - throw e.getCause(); - } - catch(RetryException ex) - { - throw ex; - } - catch(RuntimeException ex) + if(ex.getCause() instanceof RetryException) { - throw ex; - } - catch(Throwable ex) - { - assert(false); + throw (RetryException)ex.getCause(); } + throw ex; } - return 0; } @Override public boolean asyncRequestCanceled(final OutgoingAsyncMessageCallback outAsync, final Ice.LocalException ex) { - try + return performCallable(new Callable<Boolean>() { - Future<Boolean> future = _executor.submit(new Callable<Boolean>() + @Override + public Boolean call() { - @Override - public Boolean call() - { - return _delegate.asyncRequestCanceled(outAsync, ex); - } - }); - return future.get(); - } - catch(RejectedExecutionException e) - { - throw new CommunicatorDestroyedException(); - } - catch(InterruptedException e) - { - throw new Ice.OperationInterruptedException(); - } - catch(ExecutionException e) - { - try - { - throw e.getCause(); + return _delegate.asyncRequestCanceled(outAsync, ex); } - catch(RuntimeException exc) - { - throw exc; - } - catch(Throwable exc) - { - assert(false); - } - } - return false; + }); } @Override @@ -345,11 +177,55 @@ public class QueueRequestHandler implements RequestHandler @Override public ConnectionI - waitForConnection() throws InterruptedException, RetryException + waitForConnection() + throws InterruptedException, RetryException { return _delegate.waitForConnection(); } - + + private <T> T performCallable(Callable<T> callable) { + try + { + Future<T> future = _executor.submit(callable); + boolean interrupted = false; + while(true) + { + try + { + T value = future.get(); + if(interrupted) + { + Thread.currentThread().interrupt(); + } + return value; + } + catch(InterruptedException ex) + { + interrupted = true; + } + } + } + catch(RejectedExecutionException e) + { + throw new CommunicatorDestroyedException(); + } + catch(ExecutionException e) + { + try + { + throw e.getCause(); + } + catch(RuntimeException ex) + { + throw ex; + } + catch(Throwable ex) + { + throw new RuntimeException(ex); + } + } + } + private final RequestHandler _delegate; private final ExecutorService _executor; } |