summaryrefslogtreecommitdiff
path: root/java/src/Ice/ConnectionI.java
diff options
context:
space:
mode:
authorMatthew Newhook <matthew@zeroc.com>2014-08-07 14:36:07 -0230
committerMatthew Newhook <matthew@zeroc.com>2014-08-07 14:36:07 -0230
commitb36ae21c88735cbd2c39c5ccde2572a8fcc4e928 (patch)
treedfd5eee6e7d61a9c6efcbaabe916639009aaa9af /java/src/Ice/ConnectionI.java
parentAdd @Override where possible, and remove trailing white space. (diff)
downloadice-b36ae21c88735cbd2c39c5ccde2572a8fcc4e928.tar.bz2
ice-b36ae21c88735cbd2c39c5ccde2572a8fcc4e928.tar.xz
ice-b36ae21c88735cbd2c39c5ccde2572a8fcc4e928.zip
ICE-1593 Handling thread interrupts in Java
- Added Ice.BackgroundIO property to perform all IO in a non-user thread. This makes Ice for Java interrupt safe. This is implemented by the QueueRequestHanbler. - EndpointHostResolver now uses an executor instead of a thread. - Added java/demo/Ice/interrupt and java/test/Ice/interrupt. - Made several changes that must be ported to C++ & C#. - InvocationTimeout exceptions can hang forever. - Connection establishment is always asynchronous. - RequestHandler.requestTimeout and asyncRequestTimeout have been renamed to requestCancel and asyncRequestCancel.
Diffstat (limited to 'java/src/Ice/ConnectionI.java')
-rw-r--r--java/src/Ice/ConnectionI.java191
1 files changed, 99 insertions, 92 deletions
diff --git a/java/src/Ice/ConnectionI.java b/java/src/Ice/ConnectionI.java
index 8d90a391253..3e8c735a99d 100644
--- a/java/src/Ice/ConnectionI.java
+++ b/java/src/Ice/ConnectionI.java
@@ -42,24 +42,45 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
if(!initialize(IceInternal.SocketOperation.None) || !validate(IceInternal.SocketOperation.None))
{
- if(callback != null)
- {
- _startCallback = callback;
- return;
- }
+ _startCallback = callback;
+ return;
+ }
- //
- // Wait for the connection to be validated.
- //
+ //
+ // We start out in holding state.
+ //
+ setState(StateHolding);
+ }
+ }
+ catch(Ice.LocalException ex)
+ {
+ exception(ex);
+ callback.connectionStartFailed(this, _exception);
+ return;
+ }
+
+ callback.connectionStartCompleted(this);
+ }
+
+ public void
+ startAndWait()
+ throws InterruptedException
+ {
+ try
+ {
+ synchronized(this)
+ {
+ if(_state >= StateClosed) // The connection might already be closed if the communicator was destroyed.
+ {
+ assert(_exception != null);
+ throw (Ice.LocalException)_exception.fillInStackTrace();
+ }
+
+ if(!initialize(IceInternal.SocketOperation.None) || !validate(IceInternal.SocketOperation.None))
+ {
while(_state <= StateNotValidated)
{
- try
- {
- wait();
- }
- catch(InterruptedException ex)
- {
- }
+ wait();
}
if(_state >= StateClosing)
@@ -78,21 +99,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
catch(Ice.LocalException ex)
{
exception(ex);
- if(callback != null)
- {
- callback.connectionStartFailed(this, _exception);
- return;
- }
- else
- {
- waitUntilFinished();
- throw ex;
- }
- }
-
- if(callback != null)
- {
- callback.connectionStartCompleted(this);
+ waitUntilFinished();
+ return;
}
}
@@ -171,6 +179,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
catch(InterruptedException ex)
{
+ throw new Ice.OperationInterruptedException();
}
}
@@ -208,21 +217,17 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
public synchronized void
waitUntilHolding()
+ throws InterruptedException
{
while(_state < StateHolding || _dispatchCount > 0)
{
- try
- {
- wait();
- }
- catch(InterruptedException ex)
- {
- }
+ wait();
}
}
public synchronized void
waitUntilFinished()
+ throws InterruptedException
{
//
// We wait indefinitely until the connection is finished and all
@@ -232,13 +237,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
//
while(_state < StateFinished || _dispatchCount > 0)
{
- try
- {
- wait();
- }
- catch(InterruptedException ex)
- {
- }
+ wait();
}
assert(_state == StateFinished);
@@ -488,19 +487,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
prepareBatchRequest(IceInternal.BasicStream os)
throws IceInternal.RetryException
{
- //
- // Wait if flushing is currently in progress.
- //
- while(_batchStreamInUse && _exception == null)
- {
- try
- {
- wait();
- }
- catch(InterruptedException ex)
- {
- }
- }
+ waitBatchStreamInUse();
if(_exception != null)
{
@@ -692,7 +679,14 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
flushBatchRequests()
{
IceInternal.BatchOutgoing out = new IceInternal.BatchOutgoing(this, _instance, __flushBatchRequests_name);
- out.invoke();
+ try
+ {
+ out.invoke();
+ }
+ catch(InterruptedException ex)
+ {
+ throw new Ice.OperationInterruptedException();
+ }
}
private static final String __flushBatchRequests_name = "flushBatchRequests";
@@ -769,17 +763,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
synchronized public boolean
flushBatchRequests(IceInternal.BatchOutgoing out)
{
- while(_batchStreamInUse && _exception == null)
- {
- try
- {
- wait();
- }
- catch(InterruptedException ex)
- {
- }
- }
-
+ waitBatchStreamInUse();
if(_exception != null)
{
throw (Ice.LocalException)_exception.fillInStackTrace();
@@ -832,16 +816,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
synchronized public int
flushAsyncBatchRequests(IceInternal.BatchOutgoingAsync outAsync)
{
- while(_batchStreamInUse && _exception == null)
- {
- try
- {
- wait();
- }
- catch(InterruptedException ex)
- {
- }
- }
+ waitBatchStreamInUse();
if(_exception != null)
{
@@ -941,8 +916,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
return _monitor != null ? _monitor.getACM() : new ACM(0, ACMClose.CloseOff, ACMHeartbeat.HeartbeatOff);
}
- synchronized public void
- requestTimedOut(IceInternal.OutgoingMessageCallback out)
+ synchronized public boolean
+ requestCanceled(IceInternal.OutgoingMessageCallback out, Ice.LocalException ex)
{
java.util.Iterator<OutgoingMessage> it = _sendStreams.iterator();
while(it.hasNext())
@@ -964,8 +939,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
{
it.remove();
}
- out.finished(new InvocationTimeoutException());
- return; // We're done.
+ out.finished(ex);
+ return true; // We're done.
}
}
@@ -977,16 +952,17 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
{
if(it2.next() == o)
{
- o.finished(new InvocationTimeoutException());
+ o.finished(ex);
it2.remove();
- return; // We're done.
+ return true; // We're done.
}
}
}
+ return false;
}
- public void
- asyncRequestTimedOut(IceInternal.OutgoingAsyncMessageCallback outAsync)
+ public boolean
+ asyncRequestCanceled(IceInternal.OutgoingAsyncMessageCallback outAsync, Ice.LocalException ex)
{
java.util.Iterator<OutgoingMessage> it = _sendStreams.iterator();
while(it.hasNext())
@@ -1008,8 +984,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
{
it.remove();
}
- outAsync.__dispatchInvocationTimeout(_threadPool, this);
- return; // We're done
+ outAsync.__dispatchInvocationCancel(ex, _threadPool, this);
+ return true; // We're done
}
}
@@ -1022,13 +998,14 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
if(it2.next() == o)
{
it2.remove();
- outAsync.__dispatchInvocationTimeout(_threadPool, this);
- return; // We're done.
+ outAsync.__dispatchInvocationCancel(ex, _threadPool, this);
+ return true; // We're done.
}
}
}
- }
+ return false;
+ }
@Override
synchronized public void
@@ -3092,6 +3069,35 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
}
+ private void waitBatchStreamInUse()
+ {
+ //
+ // This is similar to a mutex lock in that the flag is
+ // only true for a short time period. As such we don't permit the
+ // wait to be interrupted. Instead the interrupted status is saved\
+ // and restored.
+ //
+ boolean interrupted = false;
+ while(_batchStreamInUse && _exception == null)
+ {
+ try
+ {
+ wait();
+ }
+ catch (InterruptedException e)
+ {
+ interrupted = true;
+ }
+ }
+ //
+ // Restore the interrupted flag if we were interrupted.
+ //
+ if(interrupted)
+ {
+ Thread.currentThread().interrupt();
+ }
+ }
+
private static class OutgoingMessage
{
OutgoingMessage(IceInternal.BasicStream stream, boolean compress, boolean adopt)
@@ -3263,4 +3269,5 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
Ice.Instrumentation.ConnectionState.ConnectionStateClosed, // StateClosed
Ice.Instrumentation.ConnectionState.ConnectionStateClosed, // StateFinished
};
+
}