summaryrefslogtreecommitdiff
path: root/java/src
diff options
context:
space:
mode:
Diffstat (limited to 'java/src')
-rw-r--r--java/src/Ice/CommunicatorI.java3
-rw-r--r--java/src/Ice/ConnectionI.java7
-rw-r--r--java/src/Ice/ObjectAdapterI.java20
-rw-r--r--java/src/Ice/ObjectPrxHelperBase.java14
-rw-r--r--java/src/IceInternal/BatchOutgoingAsync.java2
-rw-r--r--java/src/IceInternal/CollocatedRequestHandler.java11
-rw-r--r--java/src/IceInternal/CommunicatorBatchOutgoingAsync.java75
-rw-r--r--java/src/IceInternal/ConnectionBatchOutgoingAsync.java68
-rw-r--r--java/src/IceInternal/GetConnectionOutgoingAsync.java9
-rw-r--r--java/src/IceInternal/IncomingConnectionFactory.java3
-rw-r--r--java/src/IceInternal/Instance.java22
-rw-r--r--java/src/IceInternal/ObjectAdapterFactory.java5
-rw-r--r--java/src/IceInternal/OutgoingAsync.java11
-rw-r--r--java/src/IceInternal/OutgoingAsyncBase.java75
-rw-r--r--java/src/IceInternal/OutgoingConnectionFactory.java112
-rw-r--r--java/src/IceInternal/ProxyBatchOutgoingAsync.java23
-rw-r--r--java/src/IceInternal/QueueRequestHandler.java284
17 files changed, 423 insertions, 321 deletions
diff --git a/java/src/Ice/CommunicatorI.java b/java/src/Ice/CommunicatorI.java
index ed517e4ae9f..76d46baab6a 100644
--- a/java/src/Ice/CommunicatorI.java
+++ b/java/src/Ice/CommunicatorI.java
@@ -203,8 +203,7 @@ public final class CommunicatorI implements Communicator
public void
flushBatchRequests()
{
- AsyncResult r = begin_flushBatchRequests();
- end_flushBatchRequests(r);
+ end_flushBatchRequests(begin_flushBatchRequests());
}
@Override
diff --git a/java/src/Ice/ConnectionI.java b/java/src/Ice/ConnectionI.java
index 38846212228..4f251b111f3 100644
--- a/java/src/Ice/ConnectionI.java
+++ b/java/src/Ice/ConnectionI.java
@@ -155,6 +155,11 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
@Override
synchronized public void close(boolean force)
{
+ if(Thread.interrupted())
+ {
+ throw new Ice.OperationInterruptedException();
+ }
+
if(force)
{
setState(StateClosed, new ForcedCloseConnectionException());
@@ -645,6 +650,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
{
result.invokeExceptionAsync(__ex);
}
+
return result;
}
@@ -2567,6 +2573,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
case IceInternal.Protocol.replyMsg:
{
+
IceInternal.TraceUtil.traceRecv(info.stream, _logger, _traceLevels);
info.requestId = info.stream.readInt();
IceInternal.OutgoingAsync outAsync = _asyncRequests.remove(info.requestId);
diff --git a/java/src/Ice/ObjectAdapterI.java b/java/src/Ice/ObjectAdapterI.java
index 11eb4225e14..a9b21feab3e 100644
--- a/java/src/Ice/ObjectAdapterI.java
+++ b/java/src/Ice/ObjectAdapterI.java
@@ -143,6 +143,11 @@ public final class ObjectAdapterI implements ObjectAdapter
public void
waitForHold()
{
+ if(Thread.interrupted())
+ {
+ throw new Ice.OperationInterruptedException();
+ }
+
while(true)
{
java.util.List<IceInternal.IncomingConnectionFactory> incomingConnectionFactories;
@@ -218,6 +223,11 @@ public final class ObjectAdapterI implements ObjectAdapter
public void
deactivate()
{
+ if(Thread.interrupted())
+ {
+ throw new Ice.OperationInterruptedException();
+ }
+
IceInternal.OutgoingConnectionFactory outgoingConnectionFactory;
java.util.List<IceInternal.IncomingConnectionFactory> incomingConnectionFactories;
IceInternal.LocatorInfo locatorInfo;
@@ -310,6 +320,11 @@ public final class ObjectAdapterI implements ObjectAdapter
public void
waitForDeactivate()
{
+ if(Thread.interrupted())
+ {
+ throw new Ice.OperationInterruptedException();
+ }
+
try
{
IceInternal.IncomingConnectionFactory[] incomingConnectionFactories;
@@ -360,6 +375,11 @@ public final class ObjectAdapterI implements ObjectAdapter
public void
destroy()
{
+ if(Thread.interrupted())
+ {
+ throw new Ice.OperationInterruptedException();
+ }
+
synchronized(this)
{
//
diff --git a/java/src/Ice/ObjectPrxHelperBase.java b/java/src/Ice/ObjectPrxHelperBase.java
index 82d138470b0..031a2608466 100644
--- a/java/src/Ice/ObjectPrxHelperBase.java
+++ b/java/src/Ice/ObjectPrxHelperBase.java
@@ -11,9 +11,13 @@ package Ice;
import java.util.LinkedList;
import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import Ice.Instrumentation.InvocationObserver;
import IceInternal.QueueRequestHandler;
+import IceInternal.RequestHandler;
import IceInternal.RetryException;
/**
@@ -2280,6 +2284,10 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
{
final InvocationObserver observer = IceInternal.ObserverHelper.get(this, "ice_getConnection");
int cnt = 0;
+ if(Thread.interrupted())
+ {
+ throw new Ice.OperationInterruptedException();
+ }
try
{
while(true)
@@ -2779,7 +2787,11 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
{
synchronized(this)
{
- if(_requestHandler != handler)
+ if(_requestHandler == null)
+ {
+ _requestHandler = handler;
+ }
+ else if(_requestHandler != handler)
{
//
// Update the request handler only if "previous" is the same
diff --git a/java/src/IceInternal/BatchOutgoingAsync.java b/java/src/IceInternal/BatchOutgoingAsync.java
index e78879afa6a..afafacaacc7 100644
--- a/java/src/IceInternal/BatchOutgoingAsync.java
+++ b/java/src/IceInternal/BatchOutgoingAsync.java
@@ -9,7 +9,7 @@
package IceInternal;
-public class BatchOutgoingAsync extends OutgoingAsyncBase implements OutgoingAsyncMessageCallback
+abstract public class BatchOutgoingAsync extends OutgoingAsyncBase implements OutgoingAsyncMessageCallback
{
BatchOutgoingAsync(Ice.Communicator communicator, Instance instance, String operation, CallbackBase callback)
{
diff --git a/java/src/IceInternal/CollocatedRequestHandler.java b/java/src/IceInternal/CollocatedRequestHandler.java
index 4e87380dfeb..8de4a93ebc4 100644
--- a/java/src/IceInternal/CollocatedRequestHandler.java
+++ b/java/src/IceInternal/CollocatedRequestHandler.java
@@ -310,7 +310,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
void invokeAsyncRequest(OutgoingAsync outAsync, boolean synchronous)
{
int requestId = 0;
- if(_reference.getInvocationTimeout() > 0 || _response)
+ if((_reference.getInstance().queueRequests() || _reference.getInvocationTimeout() > 0) || _response)
{
synchronized(this)
{
@@ -319,7 +319,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
requestId = ++_requestId;
_asyncRequests.put(requestId, outAsync);
}
- if(_reference.getInvocationTimeout() > 0)
+ if(_reference.getInstance().queueRequests() || _reference.getInvocationTimeout() > 0)
{
_sendAsyncRequests.put(outAsync, requestId);
}
@@ -367,7 +367,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
invokeNum = _batchRequestNum;
if(_batchRequestNum > 0)
{
- if(_reference.getInvocationTimeout() > 0)
+ if(_reference.getInstance().queueRequests() || _reference.getInvocationTimeout() > 0)
{
_sendAsyncRequests.put(outAsync, 0);
}
@@ -406,7 +406,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
private boolean
sentAsync(final OutgoingAsyncMessageCallback outAsync)
{
- if(_reference.getInvocationTimeout() > 0)
+ if(_reference.getInstance().queueRequests() || _reference.getInvocationTimeout() > 0)
{
synchronized(this)
{
@@ -564,6 +564,9 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
private int _requestId;
+ // A map of outstanding requests that can be canceled. A request
+ // can be canceled if it has an invocation timeout, or we support
+ // interrupts.
private java.util.Map<OutgoingAsyncMessageCallback, Integer> _sendAsyncRequests =
new java.util.HashMap<OutgoingAsyncMessageCallback, Integer>();
diff --git a/java/src/IceInternal/CommunicatorBatchOutgoingAsync.java b/java/src/IceInternal/CommunicatorBatchOutgoingAsync.java
index d5c6189064a..ee05abf896d 100644
--- a/java/src/IceInternal/CommunicatorBatchOutgoingAsync.java
+++ b/java/src/IceInternal/CommunicatorBatchOutgoingAsync.java
@@ -9,6 +9,13 @@
package IceInternal;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+
+import Ice.CommunicatorDestroyedException;
+
public class CommunicatorBatchOutgoingAsync extends OutgoingAsyncBase
{
public CommunicatorBatchOutgoingAsync(Ice.Communicator communicator, Instance instance, String operation,
@@ -34,7 +41,7 @@ public class CommunicatorBatchOutgoingAsync extends OutgoingAsyncBase
_observer = ObserverHelper.get(instance, operation);
}
- public void flushConnection(Ice.ConnectionI con)
+ public void flushConnection(final Ice.ConnectionI con)
{
class BatchOutgoingAsyncI extends BatchOutgoingAsync
{
@@ -88,7 +95,12 @@ public class CommunicatorBatchOutgoingAsync extends OutgoingAsyncBase
}
}
}
- };
+
+ @Override
+ protected void cancelRequest()
+ {
+ }
+ }
synchronized(_monitor)
{
@@ -97,7 +109,59 @@ public class CommunicatorBatchOutgoingAsync extends OutgoingAsyncBase
try
{
- int status = con.flushAsyncBatchRequests(new BatchOutgoingAsyncI());
+ int status;
+ if(_instance.queueRequests())
+ {
+ Future<Integer> future = _instance.getQueueExecutor().submit(new Callable<Integer>()
+ {
+ @Override
+ public Integer call() throws RetryException
+ {
+ return con.flushAsyncBatchRequests(new BatchOutgoingAsyncI());
+ }
+ });
+
+ boolean interrupted = false;
+ while(true)
+ {
+ try
+ {
+ status = future.get();
+ if(interrupted)
+ {
+ Thread.currentThread().interrupt();
+ }
+ break;
+ }
+ catch(InterruptedException ex)
+ {
+ interrupted = true;
+ }
+ catch(RejectedExecutionException e)
+ {
+ throw new CommunicatorDestroyedException();
+ }
+ catch(ExecutionException e)
+ {
+ try
+ {
+ throw e.getCause();
+ }
+ catch(RuntimeException ex)
+ {
+ throw ex;
+ }
+ catch(Throwable ex)
+ {
+ assert(false);
+ }
+ }
+ }
+ }
+ else
+ {
+ status = con.flushAsyncBatchRequests(new BatchOutgoingAsyncI());
+ }
if((status & AsyncStatus.Sent) > 0)
{
_sentSynchronously = false;
@@ -153,5 +217,10 @@ public class CommunicatorBatchOutgoingAsync extends OutgoingAsyncBase
}
}
+ @Override
+ protected void cancelRequest()
+ {
+ }
+
private int _useCount;
}
diff --git a/java/src/IceInternal/ConnectionBatchOutgoingAsync.java b/java/src/IceInternal/ConnectionBatchOutgoingAsync.java
index 1b9fd2e0e3c..5a1f0a30886 100644
--- a/java/src/IceInternal/ConnectionBatchOutgoingAsync.java
+++ b/java/src/IceInternal/ConnectionBatchOutgoingAsync.java
@@ -9,6 +9,13 @@
package IceInternal;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+
+import Ice.CommunicatorDestroyedException;
+
public class ConnectionBatchOutgoingAsync extends BatchOutgoingAsync
{
public ConnectionBatchOutgoingAsync(Ice.ConnectionI con, Ice.Communicator communicator, Instance instance,
@@ -20,7 +27,60 @@ public class ConnectionBatchOutgoingAsync extends BatchOutgoingAsync
public void __invoke()
{
- int status = _connection.flushAsyncBatchRequests(this);
+ int status;
+ if(_instance.queueRequests())
+ {
+ Future<Integer> future = _instance.getQueueExecutor().submit(new Callable<Integer>()
+ {
+ @Override
+ public Integer call() throws RetryException
+ {
+ return _connection.flushAsyncBatchRequests(ConnectionBatchOutgoingAsync.this);
+ }
+ });
+
+ boolean interrupted = false;
+ while(true)
+ {
+ try
+ {
+ status = future.get();
+ if(interrupted)
+ {
+ Thread.currentThread().interrupt();
+ }
+ break;
+ }
+ catch(InterruptedException ex)
+ {
+ interrupted = true;
+ }
+ catch(RejectedExecutionException e)
+ {
+ throw new CommunicatorDestroyedException();
+ }
+ catch(ExecutionException e)
+ {
+ try
+ {
+ throw e.getCause();
+ }
+ catch(RuntimeException ex)
+ {
+ throw ex;
+ }
+ catch(Throwable ex)
+ {
+ assert(false);
+ }
+ }
+ }
+ }
+ else
+ {
+ status = _connection.flushAsyncBatchRequests(this);
+ }
+
if((status & AsyncStatus.Sent) > 0)
{
_sentSynchronously = true;
@@ -36,6 +96,12 @@ public class ConnectionBatchOutgoingAsync extends BatchOutgoingAsync
{
return _connection;
}
+
+ @Override
+ protected void cancelRequest()
+ {
+ _connection.asyncRequestCanceled(this, new Ice.OperationInterruptedException());
+ }
private Ice.ConnectionI _connection;
}
diff --git a/java/src/IceInternal/GetConnectionOutgoingAsync.java b/java/src/IceInternal/GetConnectionOutgoingAsync.java
index 75ef2d9b60f..33613a5c078 100644
--- a/java/src/IceInternal/GetConnectionOutgoingAsync.java
+++ b/java/src/IceInternal/GetConnectionOutgoingAsync.java
@@ -125,6 +125,15 @@ public class GetConnectionOutgoingAsync extends OutgoingAsyncBase implements Out
});
}
+ @Override
+ protected void cancelRequest()
+ {
+ if(_handler != null)
+ {
+ _handler.asyncRequestCanceled(this, new Ice.OperationInterruptedException());
+ }
+ }
+
private void handleException(Ice.Exception exc)
{
try
diff --git a/java/src/IceInternal/IncomingConnectionFactory.java b/java/src/IceInternal/IncomingConnectionFactory.java
index 596731cc635..89dcf9b44ed 100644
--- a/java/src/IceInternal/IncomingConnectionFactory.java
+++ b/java/src/IceInternal/IncomingConnectionFactory.java
@@ -73,7 +73,7 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice
public void
waitUntilFinished()
- throws InterruptedException
+ throws InterruptedException
{
java.util.LinkedList<Ice.ConnectionI> connections = null;
@@ -117,6 +117,7 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice
{
c.close(true);
}
+ throw e;
}
}
}
diff --git a/java/src/IceInternal/Instance.java b/java/src/IceInternal/Instance.java
index 4c5db5043d7..a53eb841186 100644
--- a/java/src/IceInternal/Instance.java
+++ b/java/src/IceInternal/Instance.java
@@ -409,6 +409,11 @@ public final class Instance
public synchronized Ice.ObjectPrx
createAdmin(Ice.ObjectAdapter adminAdapter, Ice.Identity adminIdentity)
{
+ if(Thread.interrupted())
+ {
+ throw new Ice.OperationInterruptedException();
+ }
+
boolean createAdapter = (adminAdapter == null);
synchronized(this)
@@ -478,6 +483,11 @@ public final class Instance
public Ice.ObjectPrx
getAdmin()
{
+ if(Thread.interrupted())
+ {
+ throw new Ice.OperationInterruptedException();
+ }
+
Ice.ObjectAdapter adminAdapter;
Ice.Identity adminIdentity;
@@ -1137,6 +1147,11 @@ public final class Instance
public void
destroy()
{
+ if(Thread.interrupted())
+ {
+ throw new Ice.OperationInterruptedException();
+ }
+
synchronized(this)
{
//
@@ -1181,12 +1196,7 @@ public final class Instance
}
catch (InterruptedException e)
{
- //
- // Restore the interrupt, otherwise the instance will be
- // left in an undefined state. The thread joins below will
- // interrupt which is fine.
- //
- Thread.currentThread().interrupt();
+ throw new Ice.OperationInterruptedException();
}
}
diff --git a/java/src/IceInternal/ObjectAdapterFactory.java b/java/src/IceInternal/ObjectAdapterFactory.java
index 5b9688a70d7..63c3e592669 100644
--- a/java/src/IceInternal/ObjectAdapterFactory.java
+++ b/java/src/IceInternal/ObjectAdapterFactory.java
@@ -144,6 +144,11 @@ public final class ObjectAdapterFactory
public synchronized Ice.ObjectAdapter
createObjectAdapter(String name, Ice.RouterPrx router)
{
+ if(Thread.interrupted())
+ {
+ throw new Ice.OperationInterruptedException();
+ }
+
if(_instance == null)
{
throw new Ice.ObjectAdapterDeactivatedException();
diff --git a/java/src/IceInternal/OutgoingAsync.java b/java/src/IceInternal/OutgoingAsync.java
index 5762b1b60e2..1898cadb945 100644
--- a/java/src/IceInternal/OutgoingAsync.java
+++ b/java/src/IceInternal/OutgoingAsync.java
@@ -640,6 +640,15 @@ public class OutgoingAsync extends OutgoingAsyncBase implements OutgoingAsyncMes
super.invokeExceptionAsync(ex);
}
+
+ @Override
+ protected void cancelRequest()
+ {
+ if(_handler != null)
+ {
+ _handler.asyncRequestCanceled(this, new Ice.OperationInterruptedException());
+ }
+ }
private void handleException(Ice.Exception exc)
{
@@ -686,4 +695,6 @@ public class OutgoingAsync extends OutgoingAsyncBase implements OutgoingAsyncMes
private static final java.util.Map<String, String> _emptyContext = new java.util.HashMap<String, String>();
+
+
}
diff --git a/java/src/IceInternal/OutgoingAsyncBase.java b/java/src/IceInternal/OutgoingAsyncBase.java
index 8be0b3072bd..1497e45460e 100644
--- a/java/src/IceInternal/OutgoingAsyncBase.java
+++ b/java/src/IceInternal/OutgoingAsyncBase.java
@@ -21,7 +21,7 @@ import Ice.UserException;
* With this object, an application can obtain several attributes of the
* invocation and discover its outcome.
**/
-public class OutgoingAsyncBase implements Ice.AsyncResult
+public abstract class OutgoingAsyncBase implements Ice.AsyncResult
{
protected OutgoingAsyncBase(Communicator communicator, IceInternal.Instance instance, String op,
IceInternal.CallbackBase del)
@@ -106,6 +106,10 @@ public class OutgoingAsyncBase implements Ice.AsyncResult
{
synchronized(_monitor)
{
+ if(Thread.interrupted())
+ {
+ throw new Ice.OperationInterruptedException();
+ }
while((_state & StateDone) == 0)
{
try
@@ -148,6 +152,10 @@ public class OutgoingAsyncBase implements Ice.AsyncResult
{
synchronized(_monitor)
{
+ if(Thread.interrupted())
+ {
+ throw new Ice.OperationInterruptedException();
+ }
while((_state & StateSent) == 0 && _exception == null)
{
try
@@ -235,36 +243,44 @@ public class OutgoingAsyncBase implements Ice.AsyncResult
public final boolean __wait()
{
- synchronized(_monitor)
+ try
{
- if((_state & StateEndCalled) > 0)
- {
- throw new java.lang.IllegalArgumentException("end_ method called more than once");
- }
- _state |= StateEndCalled;
- while((_state & StateDone) == 0)
+ synchronized(_monitor)
{
- try
+ if((_state & StateEndCalled) > 0)
+ {
+ throw new java.lang.IllegalArgumentException("end_ method called more than once");
+ }
+
+ _state |= StateEndCalled;
+ if(Thread.interrupted())
+ {
+ throw new InterruptedException();
+ }
+ while((_state & StateDone) == 0)
{
_monitor.wait();
}
- catch(InterruptedException ex)
+
+ if(_exception != null)
{
- //
- // Remove the EndCalled flag since it should be possible to
- // call end_* again on the AsyncResult.
- //
- _state &= ~StateEndCalled;
- throw new Ice.OperationInterruptedException();
+ //throw (LocalException)_exception.fillInStackTrace();
+ throw _exception;
}
+
+ return (_state & StateOK) > 0;
}
- if(_exception != null)
- {
- //throw (LocalException)_exception.fillInStackTrace();
- throw _exception;
- }
- return (_state & StateOK) > 0;
}
+ catch(InterruptedException ex)
+ {
+ // This must be called outside of the monitor as the
+ // invocation will potentially want to lock the
+ // connection (which in turn may want to lock the outgoing
+ // to notify that the message has been sent).
+ cancelRequest();
+ throw new Ice.OperationInterruptedException();
+ }
+
}
public final void throwUserException()
@@ -405,14 +421,13 @@ public class OutgoingAsyncBase implements Ice.AsyncResult
try
{
_instance.clientThreadPool().dispatch(new IceInternal.DispatchWorkItem(_cachedConnection)
+ {
+ @Override
+ public void run()
{
- @Override
- public void
- run()
- {
- invokeSentInternal();
- }
- });
+ invokeSentInternal();
+ }
+ });
}
catch(CommunicatorDestroyedException exc)
{
@@ -548,6 +563,8 @@ public class OutgoingAsyncBase implements Ice.AsyncResult
String s = "error raised by AMI callback:\n" + IceInternal.Ex.toString(error);
_instance.initializationData().logger.error(s);
}
+
+ abstract protected void cancelRequest();
protected Communicator _communicator;
protected IceInternal.Instance _instance;
diff --git a/java/src/IceInternal/OutgoingConnectionFactory.java b/java/src/IceInternal/OutgoingConnectionFactory.java
index e2182170ffa..534888ec89e 100644
--- a/java/src/IceInternal/OutgoingConnectionFactory.java
+++ b/java/src/IceInternal/OutgoingConnectionFactory.java
@@ -86,88 +86,76 @@ public final class OutgoingConnectionFactory
waitUntilFinished()
throws InterruptedException
{
- try
+ java.util.Map<Connector, java.util.List<Ice.ConnectionI> > connections = null;
+ synchronized(this)
{
- java.util.Map<Connector, java.util.List<Ice.ConnectionI> > connections = null;
- synchronized(this)
+ //
+ // First we wait until the factory is destroyed. We also
+ // wait until there are no pending connections
+ // anymore. Only then we can be sure the _connections
+ // contains all connections.
+ //
+ while(!_destroyed || !_pending.isEmpty() || _pendingConnectCount > 0)
{
- //
- // First we wait until the factory is destroyed. We also
- // wait until there are no pending connections
- // anymore. Only then we can be sure the _connections
- // contains all connections.
- //
- while(!_destroyed || !_pending.isEmpty() || _pendingConnectCount > 0)
- {
- wait();
- }
-
- //
- // We want to wait until all connections are finished outside the
- // thread synchronization.
- //
- connections = new java.util.HashMap<Connector, java.util.List<Ice.ConnectionI> >(_connections);
+ wait();
}
//
- // Now we wait until the destruction of each connection is finished.
+ // We want to wait until all connections are finished outside the
+ // thread synchronization.
//
- for(java.util.List<Ice.ConnectionI> connectionList : connections.values())
+ connections = new java.util.HashMap<Connector, java.util.List<Ice.ConnectionI> >(_connections);
+ }
+
+ //
+ // Now we wait until the destruction of each connection is finished.
+ //
+ for(java.util.List<Ice.ConnectionI> connectionList : connections.values())
+ {
+ for(Ice.ConnectionI connection : connectionList)
{
- for(Ice.ConnectionI connection : connectionList)
+ try
{
- try
- {
- connection.waitUntilFinished();
- }
- catch(InterruptedException e)
+ connection.waitUntilFinished();
+ }
+ catch(InterruptedException e)
+ {
+ //
+ // Force close all of the connections.
+ //
+ for(java.util.List<Ice.ConnectionI> l : connections.values())
{
- //
- // Force close all of the connections.
- //
- for(java.util.List<Ice.ConnectionI> l : connections.values())
+ for(Ice.ConnectionI c : l)
{
- for(Ice.ConnectionI c : l)
- {
- c.close(true);
- }
+ c.close(true);
}
- throw e;
}
+ throw e;
}
}
+ }
- synchronized(this)
+ synchronized(this)
+ {
+ // Ensure all the connections are finished and reapable at this point.
+ java.util.List<Ice.ConnectionI> cons = _monitor.swapReapedConnections();
+ if(cons != null)
{
- // Ensure all the connections are finished and reapable at this point.
- java.util.List<Ice.ConnectionI> cons = _monitor.swapReapedConnections();
- if(cons != null)
- {
- int size = 0;
- for(java.util.List<Ice.ConnectionI> connectionList : _connections.values())
- {
- size += connectionList.size();
- }
- assert(cons.size() == size);
- _connections.clear();
- _connectionsByEndpoint.clear();
- }
- else
+ int size = 0;
+ for(java.util.List<Ice.ConnectionI> connectionList : _connections.values())
{
- assert(_connections.isEmpty());
- assert(_connectionsByEndpoint.isEmpty());
+ size += connectionList.size();
}
- _monitor.destroy();
+ assert(cons.size() == size);
+ _connections.clear();
+ _connectionsByEndpoint.clear();
+ }
+ else
+ {
+ assert(_connections.isEmpty());
+ assert(_connectionsByEndpoint.isEmpty());
}
- }
- catch(InterruptedException ex)
- {
- // Here wait() or waitUntilFinished() were interrupted. Clear the connections
- // and such and continue along.
- _connections.clear();
- _connectionsByEndpoint.clear();
_monitor.destroy();
- throw ex;
}
}
diff --git a/java/src/IceInternal/ProxyBatchOutgoingAsync.java b/java/src/IceInternal/ProxyBatchOutgoingAsync.java
index 009319110fe..f98bd51f997 100644
--- a/java/src/IceInternal/ProxyBatchOutgoingAsync.java
+++ b/java/src/IceInternal/ProxyBatchOutgoingAsync.java
@@ -22,11 +22,10 @@ public class ProxyBatchOutgoingAsync extends BatchOutgoingAsync
{
Protocol.checkSupportedProtocol(_proxy.__reference().getProtocol());
- RequestHandler handler = null;
try
{
- handler = _proxy.__getRequestHandler();
- int status = handler.sendAsyncRequest(this);
+ _handler = _proxy.__getRequestHandler();
+ int status = _handler.sendAsyncRequest(this);
if((status & AsyncStatus.Sent) > 0)
{
_sentSynchronously = true;
@@ -41,7 +40,7 @@ public class ProxyBatchOutgoingAsync extends BatchOutgoingAsync
{
if((_state & StateDone) == 0)
{
- int invocationTimeout = handler.getReference().getInvocationTimeout();
+ int invocationTimeout = _handler.getReference().getInvocationTimeout();
if(invocationTimeout > 0)
{
_future = _instance.timer().schedule(new Runnable()
@@ -52,7 +51,7 @@ public class ProxyBatchOutgoingAsync extends BatchOutgoingAsync
timeout();
}
}, invocationTimeout, java.util.concurrent.TimeUnit.MILLISECONDS);
- _timeoutRequestHandler = handler;
+ _timeoutRequestHandler = _handler;
}
}
}
@@ -65,7 +64,7 @@ public class ProxyBatchOutgoingAsync extends BatchOutgoingAsync
// isn't useful, there were no batch requests associated with
// the proxy's request handler.
//
- _proxy.__setRequestHandler(handler, null);
+ _proxy.__setRequestHandler(_handler, null);
}
catch(Ice.Exception ex)
{
@@ -73,7 +72,7 @@ public class ProxyBatchOutgoingAsync extends BatchOutgoingAsync
{
_observer.failed(ex.ice_name());
}
- _proxy.__setRequestHandler(handler, null); // Clear request handler
+ _proxy.__setRequestHandler(_handler, null); // Clear request handler
throw ex; // Throw to notify the user lthat batch requests were potentially lost.
}
}
@@ -84,5 +83,15 @@ public class ProxyBatchOutgoingAsync extends BatchOutgoingAsync
return _proxy;
}
+ @Override
+ protected void cancelRequest()
+ {
+ if(_handler != null)
+ {
+ _handler.asyncRequestCanceled(this, new Ice.OperationInterruptedException());
+ }
+ }
+
private Ice.ObjectPrxHelperBase _proxy;
+ private RequestHandler _handler = null;
}
diff --git a/java/src/IceInternal/QueueRequestHandler.java b/java/src/IceInternal/QueueRequestHandler.java
index c4c315e7f5c..5f40ea37403 100644
--- a/java/src/IceInternal/QueueRequestHandler.java
+++ b/java/src/IceInternal/QueueRequestHandler.java
@@ -32,48 +32,15 @@ public class QueueRequestHandler implements RequestHandler
public RequestHandler
connect()
{
- try
- {
- Future<Void> future = _executor.submit(new Callable<Void>()
- {
- @Override
- public Void call() throws RetryException
- {
- _delegate.connect();
- return null;
- }
- });
-
- //
- // Just wait for connect() to complete, don't return the
- // request handler returned by connect() since it's not
- // interrupt safe.
- //
- future.get();
- }
- catch(RejectedExecutionException e)
+ performCallable(new Callable<Void>()
{
- throw new CommunicatorDestroyedException();
- }
- catch(InterruptedException e)
- {
- throw new Ice.OperationInterruptedException();
- }
- catch(ExecutionException e)
- {
- try
- {
- throw e.getCause();
- }
- catch(RuntimeException ex)
+ @Override
+ public Void call()
{
- throw ex;
- }
- catch(Throwable ex)
- {
- assert(false);
+ _delegate.connect();
+ return null;
}
- }
+ });
return this;
}
@@ -104,7 +71,7 @@ public class QueueRequestHandler implements RequestHandler
{
try
{
- Future<Void> future = _executor.submit(new Callable<Void>()
+ performCallable(new Callable<Void>()
{
@Override
public Void call() throws RetryException
@@ -113,120 +80,46 @@ public class QueueRequestHandler implements RequestHandler
return null;
}
});
-
- future.get();
- }
- catch(RejectedExecutionException e)
- {
- throw new CommunicatorDestroyedException();
}
- catch(InterruptedException e)
+ catch(RuntimeException ex)
{
- throw new Ice.OperationInterruptedException();
- }
- catch(ExecutionException e)
- {
- try
- {
- throw e.getCause();
- }
- catch(RetryException ex)
- {
- throw ex;
- }
- catch(RuntimeException ex)
- {
- throw ex;
- }
- catch(Throwable ex)
+ if(ex.getCause() instanceof RetryException)
{
- assert(false);
+ throw (RetryException)ex.getCause();
}
+ throw ex;
}
+
}
@Override
public void
finishBatchRequest(final BasicStream out)
{
- try
+ performCallable(new Callable<Void>()
{
- Future<Void> future = _executor.submit(new Callable<Void>()
- {
- @Override
- public Void call()
- {
- _delegate.finishBatchRequest(out);
- return null;
- }
- });
- future.get();
- }
- catch(RejectedExecutionException e)
- {
- throw new CommunicatorDestroyedException();
- }
- catch(InterruptedException e)
- {
- throw new Ice.OperationInterruptedException();
- }
- catch(ExecutionException e)
- {
- try
- {
- throw e.getCause();
- }
- catch(RuntimeException ex)
- {
- throw ex;
- }
- catch(Throwable ex)
+ @Override
+ public Void call() throws RetryException
{
- assert(false);
+ _delegate.finishBatchRequest(out);
+ return null;
}
- }
+ });
}
@Override
public void
abortBatchRequest()
{
- try
+ performCallable(new Callable<Void>()
{
- Future<Void> future = _executor.submit(new Callable<Void>()
+ @Override
+ public Void call()
{
- @Override
- public Void call()
- {
- _delegate.abortBatchRequest();
- return null;
- }
- });
- future.get();
- }
- catch(RejectedExecutionException e)
- {
- throw new CommunicatorDestroyedException();
- }
- catch(InterruptedException e)
- {
- throw new Ice.OperationInterruptedException();
- }
- catch(ExecutionException e)
- {
- try
- {
- throw e.getCause();
- }
- catch(RuntimeException ex)
- {
- throw ex;
- }
- catch(Throwable ex)
- {
- assert(false);
+ _delegate.abortBatchRequest();
+ return null;
}
- }
+ });
}
@Override
@@ -235,7 +128,7 @@ public class QueueRequestHandler implements RequestHandler
{
try
{
- Future<Integer> future = _executor.submit(new Callable<Integer>()
+ return performCallable(new Callable<Integer>()
{
@Override
public Integer call() throws RetryException
@@ -243,90 +136,29 @@ public class QueueRequestHandler implements RequestHandler
return _delegate.sendAsyncRequest(out);
}
});
- return future.get();
- }
- catch(RejectedExecutionException e)
- {
- throw new CommunicatorDestroyedException();
- }
- catch(InterruptedException e)
- {
- // If the request cannot be canceled (or is itself interrupted) then
- // restore the interrupt state.
- try
- {
- if(!asyncRequestCanceled(out, new Ice.OperationInterruptedException()))
- {
- Thread.currentThread().interrupt();
- }
- }
- catch(Ice.OperationInterruptedException ex)
- {
- Thread.currentThread().interrupt();
- }
}
- catch(ExecutionException e)
+ catch(RuntimeException ex)
{
- try
- {
- throw e.getCause();
- }
- catch(RetryException ex)
- {
- throw ex;
- }
- catch(RuntimeException ex)
+ if(ex.getCause() instanceof RetryException)
{
- throw ex;
- }
- catch(Throwable ex)
- {
- assert(false);
+ throw (RetryException)ex.getCause();
}
+ throw ex;
}
- return 0;
}
@Override
public boolean
asyncRequestCanceled(final OutgoingAsyncMessageCallback outAsync, final Ice.LocalException ex)
{
- try
+ return performCallable(new Callable<Boolean>()
{
- Future<Boolean> future = _executor.submit(new Callable<Boolean>()
+ @Override
+ public Boolean call()
{
- @Override
- public Boolean call()
- {
- return _delegate.asyncRequestCanceled(outAsync, ex);
- }
- });
- return future.get();
- }
- catch(RejectedExecutionException e)
- {
- throw new CommunicatorDestroyedException();
- }
- catch(InterruptedException e)
- {
- throw new Ice.OperationInterruptedException();
- }
- catch(ExecutionException e)
- {
- try
- {
- throw e.getCause();
+ return _delegate.asyncRequestCanceled(outAsync, ex);
}
- catch(RuntimeException exc)
- {
- throw exc;
- }
- catch(Throwable exc)
- {
- assert(false);
- }
- }
- return false;
+ });
}
@Override
@@ -345,11 +177,55 @@ public class QueueRequestHandler implements RequestHandler
@Override
public ConnectionI
- waitForConnection() throws InterruptedException, RetryException
+ waitForConnection()
+ throws InterruptedException, RetryException
{
return _delegate.waitForConnection();
}
-
+
+ private <T> T performCallable(Callable<T> callable) {
+ try
+ {
+ Future<T> future = _executor.submit(callable);
+ boolean interrupted = false;
+ while(true)
+ {
+ try
+ {
+ T value = future.get();
+ if(interrupted)
+ {
+ Thread.currentThread().interrupt();
+ }
+ return value;
+ }
+ catch(InterruptedException ex)
+ {
+ interrupted = true;
+ }
+ }
+ }
+ catch(RejectedExecutionException e)
+ {
+ throw new CommunicatorDestroyedException();
+ }
+ catch(ExecutionException e)
+ {
+ try
+ {
+ throw e.getCause();
+ }
+ catch(RuntimeException ex)
+ {
+ throw ex;
+ }
+ catch(Throwable ex)
+ {
+ throw new RuntimeException(ex);
+ }
+ }
+ }
+
private final RequestHandler _delegate;
private final ExecutorService _executor;
}