diff options
Diffstat (limited to 'java/src/IceInternal/CollocatedRequestHandler.java')
-rw-r--r-- | java/src/IceInternal/CollocatedRequestHandler.java | 65 |
1 files changed, 35 insertions, 30 deletions
diff --git a/java/src/IceInternal/CollocatedRequestHandler.java b/java/src/IceInternal/CollocatedRequestHandler.java index 8de4a93ebc4..48d7bfa5b7d 100644 --- a/java/src/IceInternal/CollocatedRequestHandler.java +++ b/java/src/IceInternal/CollocatedRequestHandler.java @@ -13,8 +13,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler { private class InvokeAllAsync extends DispatchWorkItem { - private InvokeAllAsync(OutgoingAsyncMessageCallback outAsync, BasicStream os, int requestId, int invokeNum, - boolean batch) + private InvokeAllAsync(OutgoingAsyncBase outAsync, BasicStream os, int requestId, int invokeNum, boolean batch) { _outAsync = outAsync; _os = os; @@ -32,7 +31,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler } } - private final OutgoingAsyncMessageCallback _outAsync; + private final OutgoingAsyncBase _outAsync; private BasicStream _os; private final int _requestId; private final int _invokeNum; @@ -186,14 +185,14 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler @Override public int - sendAsyncRequest(OutgoingAsyncMessageCallback outAsync) + sendAsyncRequest(OutgoingAsyncBase outAsync) { return outAsync.invokeCollocated(this); } @Override - synchronized public boolean - asyncRequestCanceled(OutgoingAsyncMessageCallback outAsync, Ice.LocalException ex) + synchronized public void + asyncRequestCanceled(OutgoingAsyncBase outAsync, Ice.LocalException ex) { Integer requestId = _sendAsyncRequests.get(outAsync); if(requestId != null) @@ -203,8 +202,11 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler _asyncRequests.remove(requestId); } _sendAsyncRequests.remove(outAsync); - outAsync.dispatchInvocationCancel(ex, _reference.getInstance().clientThreadPool(), null); - return true; // We're done + if(outAsync.completed(ex)) + { + outAsync.invokeCompletedAsync(); + } + return; } if(outAsync instanceof OutgoingAsync) @@ -216,12 +218,14 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler if(e.getValue() == o) { _asyncRequests.remove(e.getKey()); - outAsync.dispatchInvocationCancel(ex, _reference.getInstance().clientThreadPool(), null); - return true; // We're done + if(outAsync.completed(ex)) + { + outAsync.invokeCompletedAsync(); + } + return; } } } - return false; } @Override @@ -242,9 +246,13 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler } outAsync = _asyncRequests.remove(requestId); + if(outAsync != null && !outAsync.completed(os)) + { + outAsync = null; + } } - if(outAsync != null && outAsync.finished(os)) + if(outAsync != null) { outAsync.invokeCompleted(); } @@ -271,18 +279,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler public void invokeException(int requestId, Ice.LocalException ex, int invokeNum) { - if(requestId > 0) - { - OutgoingAsync outAsync = null; - synchronized(this) - { - outAsync = _asyncRequests.remove(requestId); - } - if(outAsync != null) - { - outAsync.finished(ex); - } - } + handleException(requestId, ex); _adapter.decDirectCount(); } @@ -307,7 +304,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler return null; } - void invokeAsyncRequest(OutgoingAsync outAsync, boolean synchronous) + int invokeAsyncRequest(OutgoingAsync outAsync, boolean synchronous) { int requestId = 0; if((_reference.getInstance().queueRequests() || _reference.getInvocationTimeout() > 0) || _response) @@ -323,6 +320,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler { _sendAsyncRequests.put(outAsync, requestId); } + outAsync.cancelable(this); } } @@ -355,9 +353,10 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler { _adapter.getThreadPool().dispatch(new InvokeAllAsync(outAsync, outAsync.getOs(), requestId, 1, false)); } + return AsyncStatus.Queued; } - int invokeAsyncBatchRequests(BatchOutgoingAsync outAsync) + int invokeAsyncBatchRequests(OutgoingAsyncBase outAsync) { int invokeNum; synchronized(this) @@ -370,6 +369,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler if(_reference.getInstance().queueRequests() || _reference.getInvocationTimeout() > 0) { _sendAsyncRequests.put(outAsync, 0); + outAsync.cancelable(this); } assert(!_batchStream.isEmpty()); @@ -404,7 +404,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler } private boolean - sentAsync(final OutgoingAsyncMessageCallback outAsync) + sentAsync(final OutgoingAsyncBase outAsync) { if(_reference.getInstance().queueRequests() || _reference.getInvocationTimeout() > 0) { @@ -511,10 +511,15 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler synchronized(this) { outAsync = _asyncRequests.remove(requestId); + if(outAsync != null && !outAsync.completed(ex)) + { + outAsync = null; + } } + if(outAsync != null) { - outAsync.finished(ex); + outAsync.invokeCompleted(); } } @@ -567,8 +572,8 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler // A map of outstanding requests that can be canceled. A request // can be canceled if it has an invocation timeout, or we support // interrupts. - private java.util.Map<OutgoingAsyncMessageCallback, Integer> _sendAsyncRequests = - new java.util.HashMap<OutgoingAsyncMessageCallback, Integer>(); + private java.util.Map<OutgoingAsyncBase, Integer> _sendAsyncRequests = + new java.util.HashMap<OutgoingAsyncBase, Integer>(); private java.util.Map<Integer, OutgoingAsync> _asyncRequests = new java.util.HashMap<Integer, OutgoingAsync>(); |