diff options
Diffstat (limited to 'java/src/IceInternal/CollocatedRequestHandler.java')
-rw-r--r-- | java/src/IceInternal/CollocatedRequestHandler.java | 421 |
1 files changed, 118 insertions, 303 deletions
diff --git a/java/src/IceInternal/CollocatedRequestHandler.java b/java/src/IceInternal/CollocatedRequestHandler.java index 4c79b2e3c58..1d97dc9a8d6 100644 --- a/java/src/IceInternal/CollocatedRequestHandler.java +++ b/java/src/IceInternal/CollocatedRequestHandler.java @@ -11,39 +11,10 @@ package IceInternal; public class CollocatedRequestHandler implements RequestHandler, ResponseHandler { - class InvokeAll extends DispatchWorkItem + private class InvokeAllAsync extends DispatchWorkItem { - public - InvokeAll(OutgoingMessageCallback out, BasicStream os, int requestId, int invokeNum, boolean batch) - { - _out = out; - _os = os; - _requestId = requestId; - _invokeNum = invokeNum; - _batch = batch; - } - - @Override - public void - run() - { - if(sent(_out)) - { - invokeAll(_os, _requestId, _invokeNum, _batch); - } - } - - private final OutgoingMessageCallback _out; - private final BasicStream _os; - private final int _requestId; - private final int _invokeNum; - private final boolean _batch; - }; - - class InvokeAllAsync extends DispatchWorkItem - { - public InvokeAllAsync(OutgoingAsyncMessageCallback outAsync, BasicStream os, int requestId, int invokeNum, - boolean batch) + private InvokeAllAsync(OutgoingAsyncMessageCallback outAsync, BasicStream os, int requestId, int invokeNum, + boolean batch) { _outAsync = outAsync; _os = os; @@ -53,8 +24,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler } @Override - public void - run() + public void run() { if(sentAsync(_outAsync)) { @@ -63,18 +33,12 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler } private final OutgoingAsyncMessageCallback _outAsync; - private final BasicStream _os; + private BasicStream _os; private final int _requestId; private final int _invokeNum; private final boolean _batch; }; - private void - fillInValue(BasicStream os, int pos, int value) - { - os.rewriteInt(pos, value); - } - public CollocatedRequestHandler(Reference ref, Ice.ObjectAdapter adapter) { @@ -207,50 +171,10 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler } @Override - public boolean - sendRequest(OutgoingMessageCallback out) - { - out.invokeCollocated(this); - return !_response && _reference.getInvocationTimeout() == 0; - } - - @Override public int sendAsyncRequest(OutgoingAsyncMessageCallback outAsync) { - return outAsync.__invokeCollocated(this); - } - - @Override - synchronized public boolean - requestCanceled(OutgoingMessageCallback out, Ice.LocalException ex) - { - Integer requestId = _sendRequests.get(out); - if(requestId != null) - { - if(requestId > 0) - { - _requests.remove(requestId); - } - out.finished(ex); - _sendRequests.remove(out); - return true; - } - else if(out instanceof Outgoing) - { - Outgoing o = (Outgoing)out; - assert(o != null); - for(java.util.Map.Entry<Integer, Outgoing> e : _requests.entrySet()) - { - if(e.getValue() == o) - { - out.finished(ex); - _requests.remove(e.getKey()); - return true; // We're done. - } - } - } - return false; + return outAsync.invokeCollocated(this); } @Override @@ -265,7 +189,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler _asyncRequests.remove(requestId); } _sendAsyncRequests.remove(outAsync); - outAsync.__dispatchInvocationCancel(ex, _reference.getInstance().clientThreadPool(), null); + outAsync.dispatchInvocationCancel(ex, _reference.getInstance().clientThreadPool(), null); return true; // We're done } @@ -278,7 +202,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler if(e.getValue() == o) { _asyncRequests.remove(e.getKey()); - outAsync.__dispatchInvocationCancel(ex, _reference.getInstance().clientThreadPool(), null); + outAsync.dispatchInvocationCancel(ex, _reference.getInstance().clientThreadPool(), null); return true; // We're done } } @@ -286,175 +210,9 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler return false; } - public void - invokeRequest(Outgoing out) - { - int requestId = 0; - if(_reference.getInvocationTimeout() > 0 || _response) - { - synchronized(this) - { - if(_response) - { - requestId = ++_requestId; - _requests.put(requestId, out); - } - if(_reference.getInvocationTimeout() > 0) - { - _sendRequests.put(out, requestId); - } - } - } - - out.attachCollocatedObserver(_adapter, requestId); - - if(_reference.getInvocationTimeout() > 0) - { - // Don't invoke from the user thread, invocation timeouts wouldn't work otherwise. - _adapter.getThreadPool().dispatch(new InvokeAll(out, out.os(), requestId, 1, false)); - } - else if(_dispatcher) - { - _adapter.getThreadPool().dispatchFromThisThread(new InvokeAll(out, out.os(), requestId, 1, false)); - } - else // Optimization: directly call invokeAll if there's no dispatcher. - { - out.sent(); - invokeAll(out.os(), requestId, 1, false); - } - } - - public int - invokeAsyncRequest(OutgoingAsync outAsync) - { - int requestId = 0; - if(_reference.getInvocationTimeout() > 0 || _response) - { - synchronized(this) - { - if(_response) - { - requestId = ++_requestId; - _asyncRequests.put(requestId, outAsync); - } - if(_reference.getInvocationTimeout() > 0) - { - _sendAsyncRequests.put(outAsync, requestId); - } - } - } - - outAsync.__attachCollocatedObserver(_adapter, requestId); - - _adapter.getThreadPool().dispatch(new InvokeAllAsync(outAsync, outAsync.__getOs(), requestId, 1, false)); - - return AsyncStatus.Queued; - } - - public void - invokeBatchRequests(BatchOutgoing out) - { - int invokeNum; - synchronized(this) - { - waitStreamInUse(); - invokeNum = _batchRequestNum; - - if(_batchRequestNum > 0) - { - if(_reference.getInvocationTimeout() > 0) - { - _sendRequests.put(out, 0); - } - - assert(!_batchStream.isEmpty()); - _batchStream.swap(out.os()); - - // - // Reset the batch stream. - // - BasicStream dummy = new BasicStream(_reference.getInstance(), Protocol.currentProtocolEncoding, - _batchAutoFlush); - _batchStream.swap(dummy); - _batchRequestNum = 0; - _batchMarker = 0; - } - } - - out.attachCollocatedObserver(_adapter, 0); - - if(invokeNum > 0) - { - if(_reference.getInvocationTimeout() > 0) - { - _adapter.getThreadPool().dispatch(new InvokeAll(out, out.os(), 0, invokeNum, true)); - } - else if(_dispatcher) - { - _adapter.getThreadPool().dispatchFromThisThread(new InvokeAll(out, out.os(), 0, invokeNum, true)); - } - else // Optimization: directly call invokeAll if there's no dispatcher. - { - out.sent(); - invokeAll(out.os(), 0, invokeNum, true); - } - } - else - { - out.sent(); - } - } - - public int - invokeAsyncBatchRequests(BatchOutgoingAsync outAsync) - { - int invokeNum; - synchronized(this) - { - waitStreamInUse(); - - invokeNum = _batchRequestNum; - if(_batchRequestNum > 0) - { - if(_reference.getInvocationTimeout() > 0) - { - _sendAsyncRequests.put(outAsync, 0); - } - - assert(!_batchStream.isEmpty()); - _batchStream.swap(outAsync.__getOs()); - - // - // Reset the batch stream. - // - BasicStream dummy = new BasicStream(_reference.getInstance(), Protocol.currentProtocolEncoding, - _batchAutoFlush); - _batchStream.swap(dummy); - _batchRequestNum = 0; - _batchMarker = 0; - } - } - - outAsync.__attachCollocatedObserver(_adapter, 0); - - if(invokeNum > 0) - { - _adapter.getThreadPool().dispatch(new InvokeAllAsync(outAsync, outAsync.__getOs(), 0, invokeNum, true)); - return AsyncStatus.Queued; - } - else if(outAsync.__sent()) - { - return AsyncStatus.Sent | AsyncStatus.InvokeSentCallback; - } - else - { - return AsyncStatus.Sent; - } - } - @Override public void - sendResponse(int requestId, BasicStream os, byte status) + sendResponse(int requestId, final BasicStream os, byte status) { OutgoingAsync outAsync = null; synchronized(this) @@ -469,25 +227,16 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler TraceUtil.traceRecv(os, _logger, _traceLevels); } - Outgoing out = _requests.get(requestId); - if(out != null) - { - out.finished(os); - _requests.remove(requestId); - } - else + outAsync = _asyncRequests.get(requestId); + if(outAsync != null) { - outAsync = _asyncRequests.get(requestId); - if(outAsync != null) - { - _asyncRequests.remove(requestId); - } + _asyncRequests.remove(requestId); } } if(outAsync != null) { - outAsync.__finished(os); + outAsync.finished(os); } _adapter.decDirectCount(); } @@ -517,19 +266,11 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler OutgoingAsync outAsync = null; synchronized(this) { - Outgoing out = _requests.remove(requestId); - if(out != null) - { - out.finished(ex); - } - else - { - outAsync = _asyncRequests.remove(requestId); - } + outAsync = _asyncRequests.remove(requestId); } if(outAsync != null) { - outAsync.__finished(ex); + outAsync.finished(ex); } } _adapter.decDirectCount(); @@ -556,25 +297,104 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler return null; } - boolean - sent(OutgoingMessageCallback out) + void invokeAsyncRequest(OutgoingAsync outAsync, boolean synchronous) { - if(_reference.getInvocationTimeout() > 0) + int requestId = 0; + if(_reference.getInvocationTimeout() > 0 || _response) { synchronized(this) { - if(_sendRequests.remove(out) == null) + if(_response) { - return false; // The request timed-out. + requestId = ++_requestId; + _asyncRequests.put(requestId, outAsync); + } + if(_reference.getInvocationTimeout() > 0) + { + _sendAsyncRequests.put(outAsync, requestId); } } } - out.sent(); - return true; + + outAsync.attachCollocatedObserver(_adapter, requestId); + + if(synchronous) + { + // + // Treat this collocated call as if it is a synchronous invocation. + // + if(_reference.getInstance().queueRequests() || _reference.getInvocationTimeout() > 0 || !_response) + { + // Don't invoke from the user thread, invocation timeouts wouldn't work otherwise. + _adapter.getThreadPool().dispatch(new InvokeAllAsync(outAsync, outAsync.getOs(), requestId, 1, false)); + } + else if(_dispatcher) + { + _adapter.getThreadPool().dispatchFromThisThread( + new InvokeAllAsync(outAsync, outAsync.getOs(), requestId, 1, false)); + } + else // Optimization: directly call invokeAll if there's no dispatcher. + { + if(sentAsync(outAsync)) + { + invokeAll(outAsync.getOs(), requestId, 1, false); + } + } + } + else + { + _adapter.getThreadPool().dispatch(new InvokeAllAsync(outAsync, outAsync.getOs(), requestId, 1, false)); + } + } + + int invokeAsyncBatchRequests(BatchOutgoingAsync outAsync) + { + int invokeNum; + synchronized(this) + { + waitStreamInUse(); + + invokeNum = _batchRequestNum; + if(_batchRequestNum > 0) + { + if(_reference.getInvocationTimeout() > 0) + { + _sendAsyncRequests.put(outAsync, 0); + } + + assert(!_batchStream.isEmpty()); + _batchStream.swap(outAsync.getOs()); + + // + // Reset the batch stream. + // + BasicStream dummy = new BasicStream(_reference.getInstance(), Protocol.currentProtocolEncoding, + _batchAutoFlush); + _batchStream.swap(dummy); + _batchRequestNum = 0; + _batchMarker = 0; + } + } + + outAsync.attachCollocatedObserver(_adapter, 0); + + if(invokeNum > 0) + { + _adapter.getThreadPool().dispatch(new InvokeAllAsync(outAsync, outAsync.getOs(), 0, invokeNum, true)); + return AsyncStatus.Queued; + } + else if(outAsync.sent()) + { + return AsyncStatus.Sent | AsyncStatus.InvokeSentCallback; + } + else + { + return AsyncStatus.Sent; + } } - boolean - sentAsync(OutgoingAsyncMessageCallback outAsync) + private boolean + sentAsync(final OutgoingAsyncMessageCallback outAsync) { if(_reference.getInvocationTimeout() > 0) { @@ -586,14 +406,15 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler } } } - if(outAsync.__sent()) + + if(outAsync.sent()) { - outAsync.__invokeSent(); + outAsync.invokeSent(); } return true; } - void + private void invokeAll(BasicStream os, int requestId, int invokeNum, boolean batch) { if(batch) @@ -668,7 +489,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler } } - void + private void handleException(int requestId, Ice.Exception ex) { if(requestId == 0) @@ -679,25 +500,16 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler OutgoingAsync outAsync = null; synchronized(this) { - Outgoing out = _requests.get(requestId); - if(out != null) - { - out.finished(ex); - _requests.remove(requestId); - } - else + outAsync = _asyncRequests.get(requestId); + if(outAsync != null) { - outAsync = _asyncRequests.get(requestId); - if(outAsync != null) - { - _asyncRequests.remove(requestId); - } + _asyncRequests.remove(requestId); } } if(outAsync != null) { - outAsync.__finished(ex); + outAsync.finished(ex); } } @@ -731,6 +543,12 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler } } + private void + fillInValue(BasicStream os, int pos, int value) + { + os.rewriteInt(pos, value); + } + private final Reference _reference; private final boolean _dispatcher; private final boolean _response; @@ -741,12 +559,9 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler private int _requestId; - private java.util.Map<OutgoingMessageCallback, Integer> _sendRequests = - new java.util.HashMap<OutgoingMessageCallback, Integer>(); private java.util.Map<OutgoingAsyncMessageCallback, Integer> _sendAsyncRequests = new java.util.HashMap<OutgoingAsyncMessageCallback, Integer>(); - private java.util.Map<Integer, Outgoing> _requests = new java.util.HashMap<Integer, Outgoing>(); private java.util.Map<Integer, OutgoingAsync> _asyncRequests = new java.util.HashMap<Integer, OutgoingAsync>(); private BasicStream _batchStream; |