summaryrefslogtreecommitdiff
path: root/java/src/IceInternal/CollocatedRequestHandler.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/src/IceInternal/CollocatedRequestHandler.java')
-rw-r--r--java/src/IceInternal/CollocatedRequestHandler.java421
1 files changed, 118 insertions, 303 deletions
diff --git a/java/src/IceInternal/CollocatedRequestHandler.java b/java/src/IceInternal/CollocatedRequestHandler.java
index 4c79b2e3c58..1d97dc9a8d6 100644
--- a/java/src/IceInternal/CollocatedRequestHandler.java
+++ b/java/src/IceInternal/CollocatedRequestHandler.java
@@ -11,39 +11,10 @@ package IceInternal;
public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
{
- class InvokeAll extends DispatchWorkItem
+ private class InvokeAllAsync extends DispatchWorkItem
{
- public
- InvokeAll(OutgoingMessageCallback out, BasicStream os, int requestId, int invokeNum, boolean batch)
- {
- _out = out;
- _os = os;
- _requestId = requestId;
- _invokeNum = invokeNum;
- _batch = batch;
- }
-
- @Override
- public void
- run()
- {
- if(sent(_out))
- {
- invokeAll(_os, _requestId, _invokeNum, _batch);
- }
- }
-
- private final OutgoingMessageCallback _out;
- private final BasicStream _os;
- private final int _requestId;
- private final int _invokeNum;
- private final boolean _batch;
- };
-
- class InvokeAllAsync extends DispatchWorkItem
- {
- public InvokeAllAsync(OutgoingAsyncMessageCallback outAsync, BasicStream os, int requestId, int invokeNum,
- boolean batch)
+ private InvokeAllAsync(OutgoingAsyncMessageCallback outAsync, BasicStream os, int requestId, int invokeNum,
+ boolean batch)
{
_outAsync = outAsync;
_os = os;
@@ -53,8 +24,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
}
@Override
- public void
- run()
+ public void run()
{
if(sentAsync(_outAsync))
{
@@ -63,18 +33,12 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
}
private final OutgoingAsyncMessageCallback _outAsync;
- private final BasicStream _os;
+ private BasicStream _os;
private final int _requestId;
private final int _invokeNum;
private final boolean _batch;
};
- private void
- fillInValue(BasicStream os, int pos, int value)
- {
- os.rewriteInt(pos, value);
- }
-
public
CollocatedRequestHandler(Reference ref, Ice.ObjectAdapter adapter)
{
@@ -207,50 +171,10 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
}
@Override
- public boolean
- sendRequest(OutgoingMessageCallback out)
- {
- out.invokeCollocated(this);
- return !_response && _reference.getInvocationTimeout() == 0;
- }
-
- @Override
public int
sendAsyncRequest(OutgoingAsyncMessageCallback outAsync)
{
- return outAsync.__invokeCollocated(this);
- }
-
- @Override
- synchronized public boolean
- requestCanceled(OutgoingMessageCallback out, Ice.LocalException ex)
- {
- Integer requestId = _sendRequests.get(out);
- if(requestId != null)
- {
- if(requestId > 0)
- {
- _requests.remove(requestId);
- }
- out.finished(ex);
- _sendRequests.remove(out);
- return true;
- }
- else if(out instanceof Outgoing)
- {
- Outgoing o = (Outgoing)out;
- assert(o != null);
- for(java.util.Map.Entry<Integer, Outgoing> e : _requests.entrySet())
- {
- if(e.getValue() == o)
- {
- out.finished(ex);
- _requests.remove(e.getKey());
- return true; // We're done.
- }
- }
- }
- return false;
+ return outAsync.invokeCollocated(this);
}
@Override
@@ -265,7 +189,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
_asyncRequests.remove(requestId);
}
_sendAsyncRequests.remove(outAsync);
- outAsync.__dispatchInvocationCancel(ex, _reference.getInstance().clientThreadPool(), null);
+ outAsync.dispatchInvocationCancel(ex, _reference.getInstance().clientThreadPool(), null);
return true; // We're done
}
@@ -278,7 +202,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
if(e.getValue() == o)
{
_asyncRequests.remove(e.getKey());
- outAsync.__dispatchInvocationCancel(ex, _reference.getInstance().clientThreadPool(), null);
+ outAsync.dispatchInvocationCancel(ex, _reference.getInstance().clientThreadPool(), null);
return true; // We're done
}
}
@@ -286,175 +210,9 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
return false;
}
- public void
- invokeRequest(Outgoing out)
- {
- int requestId = 0;
- if(_reference.getInvocationTimeout() > 0 || _response)
- {
- synchronized(this)
- {
- if(_response)
- {
- requestId = ++_requestId;
- _requests.put(requestId, out);
- }
- if(_reference.getInvocationTimeout() > 0)
- {
- _sendRequests.put(out, requestId);
- }
- }
- }
-
- out.attachCollocatedObserver(_adapter, requestId);
-
- if(_reference.getInvocationTimeout() > 0)
- {
- // Don't invoke from the user thread, invocation timeouts wouldn't work otherwise.
- _adapter.getThreadPool().dispatch(new InvokeAll(out, out.os(), requestId, 1, false));
- }
- else if(_dispatcher)
- {
- _adapter.getThreadPool().dispatchFromThisThread(new InvokeAll(out, out.os(), requestId, 1, false));
- }
- else // Optimization: directly call invokeAll if there's no dispatcher.
- {
- out.sent();
- invokeAll(out.os(), requestId, 1, false);
- }
- }
-
- public int
- invokeAsyncRequest(OutgoingAsync outAsync)
- {
- int requestId = 0;
- if(_reference.getInvocationTimeout() > 0 || _response)
- {
- synchronized(this)
- {
- if(_response)
- {
- requestId = ++_requestId;
- _asyncRequests.put(requestId, outAsync);
- }
- if(_reference.getInvocationTimeout() > 0)
- {
- _sendAsyncRequests.put(outAsync, requestId);
- }
- }
- }
-
- outAsync.__attachCollocatedObserver(_adapter, requestId);
-
- _adapter.getThreadPool().dispatch(new InvokeAllAsync(outAsync, outAsync.__getOs(), requestId, 1, false));
-
- return AsyncStatus.Queued;
- }
-
- public void
- invokeBatchRequests(BatchOutgoing out)
- {
- int invokeNum;
- synchronized(this)
- {
- waitStreamInUse();
- invokeNum = _batchRequestNum;
-
- if(_batchRequestNum > 0)
- {
- if(_reference.getInvocationTimeout() > 0)
- {
- _sendRequests.put(out, 0);
- }
-
- assert(!_batchStream.isEmpty());
- _batchStream.swap(out.os());
-
- //
- // Reset the batch stream.
- //
- BasicStream dummy = new BasicStream(_reference.getInstance(), Protocol.currentProtocolEncoding,
- _batchAutoFlush);
- _batchStream.swap(dummy);
- _batchRequestNum = 0;
- _batchMarker = 0;
- }
- }
-
- out.attachCollocatedObserver(_adapter, 0);
-
- if(invokeNum > 0)
- {
- if(_reference.getInvocationTimeout() > 0)
- {
- _adapter.getThreadPool().dispatch(new InvokeAll(out, out.os(), 0, invokeNum, true));
- }
- else if(_dispatcher)
- {
- _adapter.getThreadPool().dispatchFromThisThread(new InvokeAll(out, out.os(), 0, invokeNum, true));
- }
- else // Optimization: directly call invokeAll if there's no dispatcher.
- {
- out.sent();
- invokeAll(out.os(), 0, invokeNum, true);
- }
- }
- else
- {
- out.sent();
- }
- }
-
- public int
- invokeAsyncBatchRequests(BatchOutgoingAsync outAsync)
- {
- int invokeNum;
- synchronized(this)
- {
- waitStreamInUse();
-
- invokeNum = _batchRequestNum;
- if(_batchRequestNum > 0)
- {
- if(_reference.getInvocationTimeout() > 0)
- {
- _sendAsyncRequests.put(outAsync, 0);
- }
-
- assert(!_batchStream.isEmpty());
- _batchStream.swap(outAsync.__getOs());
-
- //
- // Reset the batch stream.
- //
- BasicStream dummy = new BasicStream(_reference.getInstance(), Protocol.currentProtocolEncoding,
- _batchAutoFlush);
- _batchStream.swap(dummy);
- _batchRequestNum = 0;
- _batchMarker = 0;
- }
- }
-
- outAsync.__attachCollocatedObserver(_adapter, 0);
-
- if(invokeNum > 0)
- {
- _adapter.getThreadPool().dispatch(new InvokeAllAsync(outAsync, outAsync.__getOs(), 0, invokeNum, true));
- return AsyncStatus.Queued;
- }
- else if(outAsync.__sent())
- {
- return AsyncStatus.Sent | AsyncStatus.InvokeSentCallback;
- }
- else
- {
- return AsyncStatus.Sent;
- }
- }
-
@Override
public void
- sendResponse(int requestId, BasicStream os, byte status)
+ sendResponse(int requestId, final BasicStream os, byte status)
{
OutgoingAsync outAsync = null;
synchronized(this)
@@ -469,25 +227,16 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
TraceUtil.traceRecv(os, _logger, _traceLevels);
}
- Outgoing out = _requests.get(requestId);
- if(out != null)
- {
- out.finished(os);
- _requests.remove(requestId);
- }
- else
+ outAsync = _asyncRequests.get(requestId);
+ if(outAsync != null)
{
- outAsync = _asyncRequests.get(requestId);
- if(outAsync != null)
- {
- _asyncRequests.remove(requestId);
- }
+ _asyncRequests.remove(requestId);
}
}
if(outAsync != null)
{
- outAsync.__finished(os);
+ outAsync.finished(os);
}
_adapter.decDirectCount();
}
@@ -517,19 +266,11 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
OutgoingAsync outAsync = null;
synchronized(this)
{
- Outgoing out = _requests.remove(requestId);
- if(out != null)
- {
- out.finished(ex);
- }
- else
- {
- outAsync = _asyncRequests.remove(requestId);
- }
+ outAsync = _asyncRequests.remove(requestId);
}
if(outAsync != null)
{
- outAsync.__finished(ex);
+ outAsync.finished(ex);
}
}
_adapter.decDirectCount();
@@ -556,25 +297,104 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
return null;
}
- boolean
- sent(OutgoingMessageCallback out)
+ void invokeAsyncRequest(OutgoingAsync outAsync, boolean synchronous)
{
- if(_reference.getInvocationTimeout() > 0)
+ int requestId = 0;
+ if(_reference.getInvocationTimeout() > 0 || _response)
{
synchronized(this)
{
- if(_sendRequests.remove(out) == null)
+ if(_response)
{
- return false; // The request timed-out.
+ requestId = ++_requestId;
+ _asyncRequests.put(requestId, outAsync);
+ }
+ if(_reference.getInvocationTimeout() > 0)
+ {
+ _sendAsyncRequests.put(outAsync, requestId);
}
}
}
- out.sent();
- return true;
+
+ outAsync.attachCollocatedObserver(_adapter, requestId);
+
+ if(synchronous)
+ {
+ //
+ // Treat this collocated call as if it is a synchronous invocation.
+ //
+ if(_reference.getInstance().queueRequests() || _reference.getInvocationTimeout() > 0 || !_response)
+ {
+ // Don't invoke from the user thread, invocation timeouts wouldn't work otherwise.
+ _adapter.getThreadPool().dispatch(new InvokeAllAsync(outAsync, outAsync.getOs(), requestId, 1, false));
+ }
+ else if(_dispatcher)
+ {
+ _adapter.getThreadPool().dispatchFromThisThread(
+ new InvokeAllAsync(outAsync, outAsync.getOs(), requestId, 1, false));
+ }
+ else // Optimization: directly call invokeAll if there's no dispatcher.
+ {
+ if(sentAsync(outAsync))
+ {
+ invokeAll(outAsync.getOs(), requestId, 1, false);
+ }
+ }
+ }
+ else
+ {
+ _adapter.getThreadPool().dispatch(new InvokeAllAsync(outAsync, outAsync.getOs(), requestId, 1, false));
+ }
+ }
+
+ int invokeAsyncBatchRequests(BatchOutgoingAsync outAsync)
+ {
+ int invokeNum;
+ synchronized(this)
+ {
+ waitStreamInUse();
+
+ invokeNum = _batchRequestNum;
+ if(_batchRequestNum > 0)
+ {
+ if(_reference.getInvocationTimeout() > 0)
+ {
+ _sendAsyncRequests.put(outAsync, 0);
+ }
+
+ assert(!_batchStream.isEmpty());
+ _batchStream.swap(outAsync.getOs());
+
+ //
+ // Reset the batch stream.
+ //
+ BasicStream dummy = new BasicStream(_reference.getInstance(), Protocol.currentProtocolEncoding,
+ _batchAutoFlush);
+ _batchStream.swap(dummy);
+ _batchRequestNum = 0;
+ _batchMarker = 0;
+ }
+ }
+
+ outAsync.attachCollocatedObserver(_adapter, 0);
+
+ if(invokeNum > 0)
+ {
+ _adapter.getThreadPool().dispatch(new InvokeAllAsync(outAsync, outAsync.getOs(), 0, invokeNum, true));
+ return AsyncStatus.Queued;
+ }
+ else if(outAsync.sent())
+ {
+ return AsyncStatus.Sent | AsyncStatus.InvokeSentCallback;
+ }
+ else
+ {
+ return AsyncStatus.Sent;
+ }
}
- boolean
- sentAsync(OutgoingAsyncMessageCallback outAsync)
+ private boolean
+ sentAsync(final OutgoingAsyncMessageCallback outAsync)
{
if(_reference.getInvocationTimeout() > 0)
{
@@ -586,14 +406,15 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
}
}
}
- if(outAsync.__sent())
+
+ if(outAsync.sent())
{
- outAsync.__invokeSent();
+ outAsync.invokeSent();
}
return true;
}
- void
+ private void
invokeAll(BasicStream os, int requestId, int invokeNum, boolean batch)
{
if(batch)
@@ -668,7 +489,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
}
}
- void
+ private void
handleException(int requestId, Ice.Exception ex)
{
if(requestId == 0)
@@ -679,25 +500,16 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
OutgoingAsync outAsync = null;
synchronized(this)
{
- Outgoing out = _requests.get(requestId);
- if(out != null)
- {
- out.finished(ex);
- _requests.remove(requestId);
- }
- else
+ outAsync = _asyncRequests.get(requestId);
+ if(outAsync != null)
{
- outAsync = _asyncRequests.get(requestId);
- if(outAsync != null)
- {
- _asyncRequests.remove(requestId);
- }
+ _asyncRequests.remove(requestId);
}
}
if(outAsync != null)
{
- outAsync.__finished(ex);
+ outAsync.finished(ex);
}
}
@@ -731,6 +543,12 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
}
}
+ private void
+ fillInValue(BasicStream os, int pos, int value)
+ {
+ os.rewriteInt(pos, value);
+ }
+
private final Reference _reference;
private final boolean _dispatcher;
private final boolean _response;
@@ -741,12 +559,9 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
private int _requestId;
- private java.util.Map<OutgoingMessageCallback, Integer> _sendRequests =
- new java.util.HashMap<OutgoingMessageCallback, Integer>();
private java.util.Map<OutgoingAsyncMessageCallback, Integer> _sendAsyncRequests =
new java.util.HashMap<OutgoingAsyncMessageCallback, Integer>();
- private java.util.Map<Integer, Outgoing> _requests = new java.util.HashMap<Integer, Outgoing>();
private java.util.Map<Integer, OutgoingAsync> _asyncRequests = new java.util.HashMap<Integer, OutgoingAsync>();
private BasicStream _batchStream;