summaryrefslogtreecommitdiff
path: root/java/src/IceInternal/CollocatedRequestHandler.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/src/IceInternal/CollocatedRequestHandler.java')
-rw-r--r--java/src/IceInternal/CollocatedRequestHandler.java99
1 files changed, 55 insertions, 44 deletions
diff --git a/java/src/IceInternal/CollocatedRequestHandler.java b/java/src/IceInternal/CollocatedRequestHandler.java
index 1b47feccbb9..4c79b2e3c58 100644
--- a/java/src/IceInternal/CollocatedRequestHandler.java
+++ b/java/src/IceInternal/CollocatedRequestHandler.java
@@ -97,17 +97,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
synchronized public void
prepareBatchRequest(BasicStream os)
{
- while(_batchStreamInUse)
- {
- try
- {
- wait();
- }
- catch(java.lang.InterruptedException ex)
- {
- }
- }
-
+ waitStreamInUse();
if(_batchStream.isEmpty())
{
try
@@ -232,8 +222,8 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
}
@Override
- synchronized public void
- requestTimedOut(OutgoingMessageCallback out)
+ synchronized public boolean
+ requestCanceled(OutgoingMessageCallback out, Ice.LocalException ex)
{
Integer requestId = _sendRequests.get(out);
if(requestId != null)
@@ -242,8 +232,9 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
{
_requests.remove(requestId);
}
- out.finished(new Ice.InvocationTimeoutException());
+ out.finished(ex);
_sendRequests.remove(out);
+ return true;
}
else if(out instanceof Outgoing)
{
@@ -253,17 +244,18 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
{
if(e.getValue() == o)
{
- out.finished(new Ice.InvocationTimeoutException());
+ out.finished(ex);
_requests.remove(e.getKey());
- return; // We're done.
+ return true; // We're done.
}
}
}
+ return false;
}
@Override
- synchronized public void
- asyncRequestTimedOut(OutgoingAsyncMessageCallback outAsync)
+ synchronized public boolean
+ asyncRequestCanceled(OutgoingAsyncMessageCallback outAsync, Ice.LocalException ex)
{
Integer requestId = _sendAsyncRequests.get(outAsync);
if(requestId != null)
@@ -273,8 +265,8 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
_asyncRequests.remove(requestId);
}
_sendAsyncRequests.remove(outAsync);
- outAsync.__dispatchInvocationTimeout(_reference.getInstance().clientThreadPool(), null);
- return; // We're done
+ outAsync.__dispatchInvocationCancel(ex, _reference.getInstance().clientThreadPool(), null);
+ return true; // We're done
}
if(outAsync instanceof OutgoingAsync)
@@ -286,11 +278,12 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
if(e.getValue() == o)
{
_asyncRequests.remove(e.getKey());
- outAsync.__dispatchInvocationTimeout(_reference.getInstance().clientThreadPool(), null);
- return; // We're done
+ outAsync.__dispatchInvocationCancel(ex, _reference.getInstance().clientThreadPool(), null);
+ return true; // We're done
}
}
}
+ return false;
}
public void
@@ -364,17 +357,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
int invokeNum;
synchronized(this)
{
- while(_batchStreamInUse)
- {
- try
- {
- wait();
- }
- catch(java.lang.InterruptedException ex)
- {
- }
- }
-
+ waitStreamInUse();
invokeNum = _batchRequestNum;
if(_batchRequestNum > 0)
@@ -428,16 +411,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
int invokeNum;
synchronized(this)
{
- while(_batchStreamInUse)
- {
- try
- {
- wait();
- }
- catch(java.lang.InterruptedException ex)
- {
- }
- }
+ waitStreamInUse();
invokeNum = _batchRequestNum;
if(_batchRequestNum > 0)
@@ -570,7 +544,14 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
@Override
public Ice.ConnectionI
- getConnection(boolean wait)
+ getConnection()
+ {
+ return null;
+ }
+
+ @Override
+ public Ice.ConnectionI
+ waitForConnection()
{
return null;
}
@@ -720,6 +701,36 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
}
}
+ private void
+ waitStreamInUse()
+ {
+ //
+ // This is similar to a mutex lock in that the stream is
+ // only "locked" while marshaling. As such we don't permit the wait
+ // to be interrupted. Instead the interrupted status is saved and
+ // restored.
+ //
+ boolean interrupted = false;
+ while(_batchStreamInUse)
+ {
+ try
+ {
+ wait();
+ }
+ catch(InterruptedException ex)
+ {
+ interrupted = true;
+ }
+ }
+ //
+ // Restore the interrupted flag if we were interrupted.
+ //
+ if(interrupted)
+ {
+ Thread.currentThread().interrupt();
+ }
+ }
+
private final Reference _reference;
private final boolean _dispatcher;
private final boolean _response;