summaryrefslogtreecommitdiff
path: root/java/src/IceInternal/CollocatedRequestHandler.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/src/IceInternal/CollocatedRequestHandler.java')
-rw-r--r--java/src/IceInternal/CollocatedRequestHandler.java65
1 files changed, 35 insertions, 30 deletions
diff --git a/java/src/IceInternal/CollocatedRequestHandler.java b/java/src/IceInternal/CollocatedRequestHandler.java
index 8de4a93ebc4..48d7bfa5b7d 100644
--- a/java/src/IceInternal/CollocatedRequestHandler.java
+++ b/java/src/IceInternal/CollocatedRequestHandler.java
@@ -13,8 +13,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
{
private class InvokeAllAsync extends DispatchWorkItem
{
- private InvokeAllAsync(OutgoingAsyncMessageCallback outAsync, BasicStream os, int requestId, int invokeNum,
- boolean batch)
+ private InvokeAllAsync(OutgoingAsyncBase outAsync, BasicStream os, int requestId, int invokeNum, boolean batch)
{
_outAsync = outAsync;
_os = os;
@@ -32,7 +31,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
}
}
- private final OutgoingAsyncMessageCallback _outAsync;
+ private final OutgoingAsyncBase _outAsync;
private BasicStream _os;
private final int _requestId;
private final int _invokeNum;
@@ -186,14 +185,14 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
@Override
public int
- sendAsyncRequest(OutgoingAsyncMessageCallback outAsync)
+ sendAsyncRequest(OutgoingAsyncBase outAsync)
{
return outAsync.invokeCollocated(this);
}
@Override
- synchronized public boolean
- asyncRequestCanceled(OutgoingAsyncMessageCallback outAsync, Ice.LocalException ex)
+ synchronized public void
+ asyncRequestCanceled(OutgoingAsyncBase outAsync, Ice.LocalException ex)
{
Integer requestId = _sendAsyncRequests.get(outAsync);
if(requestId != null)
@@ -203,8 +202,11 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
_asyncRequests.remove(requestId);
}
_sendAsyncRequests.remove(outAsync);
- outAsync.dispatchInvocationCancel(ex, _reference.getInstance().clientThreadPool(), null);
- return true; // We're done
+ if(outAsync.completed(ex))
+ {
+ outAsync.invokeCompletedAsync();
+ }
+ return;
}
if(outAsync instanceof OutgoingAsync)
@@ -216,12 +218,14 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
if(e.getValue() == o)
{
_asyncRequests.remove(e.getKey());
- outAsync.dispatchInvocationCancel(ex, _reference.getInstance().clientThreadPool(), null);
- return true; // We're done
+ if(outAsync.completed(ex))
+ {
+ outAsync.invokeCompletedAsync();
+ }
+ return;
}
}
}
- return false;
}
@Override
@@ -242,9 +246,13 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
}
outAsync = _asyncRequests.remove(requestId);
+ if(outAsync != null && !outAsync.completed(os))
+ {
+ outAsync = null;
+ }
}
- if(outAsync != null && outAsync.finished(os))
+ if(outAsync != null)
{
outAsync.invokeCompleted();
}
@@ -271,18 +279,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
public void
invokeException(int requestId, Ice.LocalException ex, int invokeNum)
{
- if(requestId > 0)
- {
- OutgoingAsync outAsync = null;
- synchronized(this)
- {
- outAsync = _asyncRequests.remove(requestId);
- }
- if(outAsync != null)
- {
- outAsync.finished(ex);
- }
- }
+ handleException(requestId, ex);
_adapter.decDirectCount();
}
@@ -307,7 +304,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
return null;
}
- void invokeAsyncRequest(OutgoingAsync outAsync, boolean synchronous)
+ int invokeAsyncRequest(OutgoingAsync outAsync, boolean synchronous)
{
int requestId = 0;
if((_reference.getInstance().queueRequests() || _reference.getInvocationTimeout() > 0) || _response)
@@ -323,6 +320,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
{
_sendAsyncRequests.put(outAsync, requestId);
}
+ outAsync.cancelable(this);
}
}
@@ -355,9 +353,10 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
{
_adapter.getThreadPool().dispatch(new InvokeAllAsync(outAsync, outAsync.getOs(), requestId, 1, false));
}
+ return AsyncStatus.Queued;
}
- int invokeAsyncBatchRequests(BatchOutgoingAsync outAsync)
+ int invokeAsyncBatchRequests(OutgoingAsyncBase outAsync)
{
int invokeNum;
synchronized(this)
@@ -370,6 +369,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
if(_reference.getInstance().queueRequests() || _reference.getInvocationTimeout() > 0)
{
_sendAsyncRequests.put(outAsync, 0);
+ outAsync.cancelable(this);
}
assert(!_batchStream.isEmpty());
@@ -404,7 +404,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
}
private boolean
- sentAsync(final OutgoingAsyncMessageCallback outAsync)
+ sentAsync(final OutgoingAsyncBase outAsync)
{
if(_reference.getInstance().queueRequests() || _reference.getInvocationTimeout() > 0)
{
@@ -511,10 +511,15 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
synchronized(this)
{
outAsync = _asyncRequests.remove(requestId);
+ if(outAsync != null && !outAsync.completed(ex))
+ {
+ outAsync = null;
+ }
}
+
if(outAsync != null)
{
- outAsync.finished(ex);
+ outAsync.invokeCompleted();
}
}
@@ -567,8 +572,8 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
// A map of outstanding requests that can be canceled. A request
// can be canceled if it has an invocation timeout, or we support
// interrupts.
- private java.util.Map<OutgoingAsyncMessageCallback, Integer> _sendAsyncRequests =
- new java.util.HashMap<OutgoingAsyncMessageCallback, Integer>();
+ private java.util.Map<OutgoingAsyncBase, Integer> _sendAsyncRequests =
+ new java.util.HashMap<OutgoingAsyncBase, Integer>();
private java.util.Map<Integer, OutgoingAsync> _asyncRequests = new java.util.HashMap<Integer, OutgoingAsync>();