diff options
Diffstat (limited to 'java/src/IceInternal/CollocatedRequestHandler.java')
-rw-r--r-- | java/src/IceInternal/CollocatedRequestHandler.java | 99 |
1 files changed, 55 insertions, 44 deletions
diff --git a/java/src/IceInternal/CollocatedRequestHandler.java b/java/src/IceInternal/CollocatedRequestHandler.java index 1b47feccbb9..4c79b2e3c58 100644 --- a/java/src/IceInternal/CollocatedRequestHandler.java +++ b/java/src/IceInternal/CollocatedRequestHandler.java @@ -97,17 +97,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler synchronized public void prepareBatchRequest(BasicStream os) { - while(_batchStreamInUse) - { - try - { - wait(); - } - catch(java.lang.InterruptedException ex) - { - } - } - + waitStreamInUse(); if(_batchStream.isEmpty()) { try @@ -232,8 +222,8 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler } @Override - synchronized public void - requestTimedOut(OutgoingMessageCallback out) + synchronized public boolean + requestCanceled(OutgoingMessageCallback out, Ice.LocalException ex) { Integer requestId = _sendRequests.get(out); if(requestId != null) @@ -242,8 +232,9 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler { _requests.remove(requestId); } - out.finished(new Ice.InvocationTimeoutException()); + out.finished(ex); _sendRequests.remove(out); + return true; } else if(out instanceof Outgoing) { @@ -253,17 +244,18 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler { if(e.getValue() == o) { - out.finished(new Ice.InvocationTimeoutException()); + out.finished(ex); _requests.remove(e.getKey()); - return; // We're done. + return true; // We're done. } } } + return false; } @Override - synchronized public void - asyncRequestTimedOut(OutgoingAsyncMessageCallback outAsync) + synchronized public boolean + asyncRequestCanceled(OutgoingAsyncMessageCallback outAsync, Ice.LocalException ex) { Integer requestId = _sendAsyncRequests.get(outAsync); if(requestId != null) @@ -273,8 +265,8 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler _asyncRequests.remove(requestId); } _sendAsyncRequests.remove(outAsync); - outAsync.__dispatchInvocationTimeout(_reference.getInstance().clientThreadPool(), null); - return; // We're done + outAsync.__dispatchInvocationCancel(ex, _reference.getInstance().clientThreadPool(), null); + return true; // We're done } if(outAsync instanceof OutgoingAsync) @@ -286,11 +278,12 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler if(e.getValue() == o) { _asyncRequests.remove(e.getKey()); - outAsync.__dispatchInvocationTimeout(_reference.getInstance().clientThreadPool(), null); - return; // We're done + outAsync.__dispatchInvocationCancel(ex, _reference.getInstance().clientThreadPool(), null); + return true; // We're done } } } + return false; } public void @@ -364,17 +357,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler int invokeNum; synchronized(this) { - while(_batchStreamInUse) - { - try - { - wait(); - } - catch(java.lang.InterruptedException ex) - { - } - } - + waitStreamInUse(); invokeNum = _batchRequestNum; if(_batchRequestNum > 0) @@ -428,16 +411,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler int invokeNum; synchronized(this) { - while(_batchStreamInUse) - { - try - { - wait(); - } - catch(java.lang.InterruptedException ex) - { - } - } + waitStreamInUse(); invokeNum = _batchRequestNum; if(_batchRequestNum > 0) @@ -570,7 +544,14 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler @Override public Ice.ConnectionI - getConnection(boolean wait) + getConnection() + { + return null; + } + + @Override + public Ice.ConnectionI + waitForConnection() { return null; } @@ -720,6 +701,36 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler } } + private void + waitStreamInUse() + { + // + // This is similar to a mutex lock in that the stream is + // only "locked" while marshaling. As such we don't permit the wait + // to be interrupted. Instead the interrupted status is saved and + // restored. + // + boolean interrupted = false; + while(_batchStreamInUse) + { + try + { + wait(); + } + catch(InterruptedException ex) + { + interrupted = true; + } + } + // + // Restore the interrupted flag if we were interrupted. + // + if(interrupted) + { + Thread.currentThread().interrupt(); + } + } + private final Reference _reference; private final boolean _dispatcher; private final boolean _response; |