summaryrefslogtreecommitdiff
path: root/java/src/IceInternal/ConnectRequestHandler.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/src/IceInternal/ConnectRequestHandler.java')
-rw-r--r--java/src/IceInternal/ConnectRequestHandler.java149
1 files changed, 84 insertions, 65 deletions
diff --git a/java/src/IceInternal/ConnectRequestHandler.java b/java/src/IceInternal/ConnectRequestHandler.java
index 276395ef3ef..9cc70b6a76d 100644
--- a/java/src/IceInternal/ConnectRequestHandler.java
+++ b/java/src/IceInternal/ConnectRequestHandler.java
@@ -9,6 +9,8 @@
package IceInternal;
+import Ice.ConnectionI;
+
public class ConnectRequestHandler
implements RequestHandler, Reference.GetConnectionCallback, RouterInfo.AddProxyCallback
{
@@ -62,17 +64,7 @@ public class ConnectRequestHandler
{
synchronized(this)
{
- while(_batchRequestInProgress)
- {
- try
- {
- wait();
- }
- catch(java.lang.InterruptedException ex)
- {
- }
- }
-
+ waitBatchRequestInProgress();
try
{
if(!initialized())
@@ -189,14 +181,14 @@ public class ConnectRequestHandler
}
@Override
- public void
- requestTimedOut(OutgoingMessageCallback out)
+ public boolean
+ requestCanceled(OutgoingMessageCallback out, Ice.LocalException ex)
{
synchronized(this)
{
if(_exception != null)
{
- return; // The request has been notified of a failure already.
+ return false; // The request has been notified of a failure already.
}
if(!initialized())
@@ -207,26 +199,26 @@ public class ConnectRequestHandler
Request request = it.next();
if(request.out == out)
{
- out.finished(new Ice.InvocationTimeoutException());
+ out.finished(ex);
it.remove();
- return;
+ return true;
}
}
assert(false); // The request has to be queued if it timed out and we're not initialized yet.
}
}
- _connection.requestTimedOut(out);
+ return _connection.requestCanceled(out, ex);
}
@Override
- public void
- asyncRequestTimedOut(OutgoingAsyncMessageCallback outAsync)
+ public boolean
+ asyncRequestCanceled(OutgoingAsyncMessageCallback outAsync, Ice.LocalException ex)
{
synchronized(this)
{
if(_exception != null)
{
- return; // The request has been notified of a failure already.
+ return false; // The request has been notified of a failure already.
}
if(!initialized())
@@ -238,14 +230,14 @@ public class ConnectRequestHandler
if(request.outAsync == outAsync)
{
it.remove();
- outAsync.__dispatchInvocationTimeout(_reference.getInstance().clientThreadPool(), null);
- return; // We're done
+ outAsync.__dispatchInvocationCancel(ex, _reference.getInstance().clientThreadPool(), null);
+ return true; // We're done
}
}
assert(false); // The request has to be queued if it timed out and we're not initialized yet.
}
}
- _connection.asyncRequestTimedOut(outAsync);
+ return _connection.asyncRequestCanceled(outAsync, ex);
}
@Override
@@ -256,37 +248,33 @@ public class ConnectRequestHandler
}
@Override
- synchronized public Ice.ConnectionI
- getConnection(boolean waitInit)
- {
- if(waitInit)
- {
- //
- // Wait for the connection establishment to complete or fail.
- //
- while(!_initialized && _exception == null)
- {
- try
- {
- wait();
- }
- catch(java.lang.InterruptedException ex)
- {
- }
- }
- }
-
+ synchronized public ConnectionI
+ getConnection() {
if(_exception != null)
{
throw (Ice.LocalException)_exception.fillInStackTrace();
}
else
{
- assert(!waitInit || _initialized);
return _connection;
}
}
+ @Override
+ synchronized public
+ ConnectionI waitForConnection()
+ throws InterruptedException
+ {
+ //
+ // Wait for the connection establishment to complete or fail.
+ //
+ while(!_initialized && _exception == null)
+ {
+ wait();
+ }
+ return getConnection();
+ }
+
//
// Implementation of Reference.GetConnectionCallback
//
@@ -338,14 +326,14 @@ public class ConnectRequestHandler
if(!_requests.isEmpty())
{
_reference.getInstance().clientThreadPool().dispatch(new DispatchWorkItem(_connection)
- {
- @Override
- public void
- run()
- {
- flushRequestsWithException();
- };
- });
+ {
+ @Override
+ public void
+ run()
+ {
+ flushRequestsWithException();
+ };
+ });
}
notifyAll();
@@ -393,16 +381,29 @@ public class ConnectRequestHandler
}
else
{
+ //
+ // This is similar to a mutex lock in that the flag is
+ // only true for a short period of time.
+ //
+ boolean interrupted = false;
while(_flushing && _exception == null)
{
try
{
wait();
}
- catch(java.lang.InterruptedException ex)
+ catch(InterruptedException ex)
{
+ interrupted = true;
}
}
+ //
+ // Restore the interrupted status.
+ //
+ if(interrupted)
+ {
+ Thread.currentThread().interrupt();
+ }
if(_exception != null)
{
@@ -421,17 +422,7 @@ public class ConnectRequestHandler
synchronized(this)
{
assert(_connection != null && !_initialized);
-
- while(_batchRequestInProgress)
- {
- try
- {
- wait();
- }
- catch(java.lang.InterruptedException ex)
- {
- }
- }
+ waitBatchRequestInProgress();
//
// We set the _flushing flag to true to prevent any additional queuing. Callers
@@ -566,7 +557,35 @@ public class ConnectRequestHandler
}
}
- void
+ private void
+ waitBatchRequestInProgress()
+ {
+ //
+ // This is similar to a mutex lock in that the stream is
+ // only "locked" while the request is in progress.
+ //
+ boolean interrupted = false;
+ while(_batchRequestInProgress)
+ {
+ try
+ {
+ wait();
+ }
+ catch(InterruptedException ex)
+ {
+ interrupted = true;
+ }
+ }
+ //
+ // Restore the interrupted flag if we were interrupted.
+ //
+ if(interrupted)
+ {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ private void
flushRequestsWithException()
{
for(Request request : _requests)