diff options
author | Benoit Foucher <benoit@zeroc.com> | 2014-09-05 13:17:45 +0200 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2014-09-05 13:17:45 +0200 |
commit | 65d91832bd0f6bf55bfefd1244582cec2e5139dc (patch) | |
tree | e8aa359587a3605a5c6fa79f0842321554449c0b /java | |
parent | JS minor fix, remove unused variables (diff) | |
download | ice-65d91832bd0f6bf55bfefd1244582cec2e5139dc.tar.bz2 ice-65d91832bd0f6bf55bfefd1244582cec2e5139dc.tar.xz ice-65d91832bd0f6bf55bfefd1244582cec2e5139dc.zip |
Added back to optmization to not call connection dispatch if not necessary
Diffstat (limited to 'java')
-rw-r--r-- | java/src/Ice/ConnectionI.java | 93 | ||||
-rw-r--r-- | java/src/IceInternal/AsyncResultI.java | 2 | ||||
-rw-r--r-- | java/src/IceInternal/CollocatedRequestHandler.java | 17 | ||||
-rw-r--r-- | java/src/IceInternal/OutgoingAsync.java | 105 |
4 files changed, 125 insertions, 92 deletions
diff --git a/java/src/Ice/ConnectionI.java b/java/src/Ice/ConnectionI.java index 00645ce8dc9..5cc8188ef09 100644 --- a/java/src/Ice/ConnectionI.java +++ b/java/src/Ice/ConnectionI.java @@ -953,7 +953,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne StartCallback startCB = null; java.util.List<OutgoingMessage> sentCBs = null; MessageInfo info = null; - + int dispatchCount = 0; + synchronized(this) { if(_state >= StateClosed) @@ -1114,7 +1115,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne _startCallback = null; if(startCB != null) { - ++_dispatchCount; + ++dispatchCount; } } } @@ -1131,6 +1132,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne // Optimization: use the thread's stream. info = new MessageInfo(current.stream); newOp |= parseMessage(info); + dispatchCount += info.messageDispatchCount; } if((readyOp & IceInternal.SocketOperation.Write) != 0) @@ -1139,7 +1141,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne newOp |= sendNextMessage(sentCBs); if(!sentCBs.isEmpty()) { - ++_dispatchCount; + ++dispatchCount; } else { @@ -1155,9 +1157,23 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne if(readyOp == 0) { + assert(dispatchCount == 0); return; } } + + if(_acmLastActivity > 0) + { + _acmLastActivity = IceInternal.Time.currentMonotonicTimeMillis(); + } + + if(dispatchCount == 0) + { + return; // Nothing to dispatch we're done! + } + + _dispatchCount += dispatchCount; + current.ioCompleted(); } catch(DatagramLimitException ex) // Expected. { @@ -1194,13 +1210,6 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } return; } - - if(_acmLastActivity > 0) - { - _acmLastActivity = IceInternal.Time.currentMonotonicTimeMillis(); - } - - current.ioCompleted(); } if(!_dispatcher) // Optimization, call dispatch() directly if there's no @@ -1239,7 +1248,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne protected void dispatch(StartCallback startCB, java.util.List<OutgoingMessage> sentCBs, MessageInfo info) { - int count = 0; + int dispatchedCount = 0; // // Notify the factory that the connection establishment and @@ -1248,7 +1257,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne if(startCB != null) { startCB.connectionStartCompleted(this); - ++count; + ++dispatchedCount; } // @@ -1260,7 +1269,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { msg.outAsync.invokeSent(); } - ++count; + ++dispatchedCount; } if(info != null) @@ -1271,8 +1280,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne // if(info.outAsync != null) { - info.outAsync.finished(info.stream); - ++count; + info.outAsync.invokeCompleted(); + ++dispatchedCount; } if(info.heartbeatCallback != null) @@ -1285,7 +1294,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { _logger.error("connection callback exception:\n" + ex + '\n' + _desc); } - ++count; + ++dispatchedCount; } // @@ -1298,7 +1307,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne invokeAll(info.stream, info.invokeNum, info.requestId, info.compress, info.servantManager, info.adapter); // - // Don't increase count, the dispatch count is + // Don't increase dispatchedCount, the dispatch count is // decreased when the incoming reply is sent. // } @@ -1307,11 +1316,11 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne // // Decrease dispatch count. // - if(count > 0) + if(dispatchedCount > 0) { synchronized(this) { - _dispatchCount -= count; + _dispatchCount -= dispatchedCount; if(_dispatchCount == 0) { // @@ -2194,28 +2203,26 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne observerFinishWrite(_writeStream.getBuffer()); } } + + // + // If all the messages were sent and we are in the closing state, we + // schedule the close timeout to wait for the peer to close the + // connection. + // + if(_state == StateClosing && _shutdownInitiated) + { + setState(StateClosingPending); + int op = _transceiver.closing(true, _exception); + if(op != 0) + { + return op; + } + } } catch(Ice.LocalException ex) { setState(StateClosed, ex); - return IceInternal.SocketOperation.None; } - - // - // If all the messages were sent and we are in the closing state, we - // schedule the close timeout to wait for the peer to close the - // connection. - // - if(_state == StateClosing && _dispatchCount == 0) - { - setState(StateClosingPending); - int op = _transceiver.closing(true, _exception); - if(op != 0) - { - return op; - } - } - return IceInternal.SocketOperation.None; } @@ -2362,6 +2369,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne ObjectAdapter adapter; IceInternal.OutgoingAsync outAsync; ConnectionCallback heartbeatCallback; + int messageDispatchCount; } private int parseMessage(MessageInfo info) @@ -2453,7 +2461,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne info.invokeNum = 1; info.servantManager = _servantManager; info.adapter = _adapter; - ++_dispatchCount; + ++info.messageDispatchCount; } break; } @@ -2477,7 +2485,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } info.servantManager = _servantManager; info.adapter = _adapter; - _dispatchCount += info.invokeNum; + info.messageDispatchCount += info.invokeNum; } break; } @@ -2486,10 +2494,11 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { IceInternal.TraceUtil.traceRecv(info.stream, _logger, _traceLevels); info.requestId = info.stream.readInt(); - info.outAsync = _asyncRequests.remove(info.requestId); - if(info.outAsync != null) + IceInternal.OutgoingAsync outAsync = _asyncRequests.remove(info.requestId); + if(outAsync != null && outAsync.finished(info.stream)) { - ++_dispatchCount; + info.outAsync = outAsync; + ++info.messageDispatchCount; } notifyAll(); // Notify threads blocked in close(false) break; @@ -2501,7 +2510,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne if(_callback != null) { info.heartbeatCallback = _callback; - ++_dispatchCount; + ++info.messageDispatchCount; } break; } diff --git a/java/src/IceInternal/AsyncResultI.java b/java/src/IceInternal/AsyncResultI.java index 9b211f0bb72..e2ca0b7c466 100644 --- a/java/src/IceInternal/AsyncResultI.java +++ b/java/src/IceInternal/AsyncResultI.java @@ -456,7 +456,7 @@ public class AsyncResultI implements Ice.AsyncResult { } - protected final void invokeCompleted() + public final void invokeCompleted() { // // Note: no need to change the _state here, specializations are responsible for diff --git a/java/src/IceInternal/CollocatedRequestHandler.java b/java/src/IceInternal/CollocatedRequestHandler.java index 1d97dc9a8d6..3c94e5d93a8 100644 --- a/java/src/IceInternal/CollocatedRequestHandler.java +++ b/java/src/IceInternal/CollocatedRequestHandler.java @@ -227,16 +227,12 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler TraceUtil.traceRecv(os, _logger, _traceLevels); } - outAsync = _asyncRequests.get(requestId); - if(outAsync != null) - { - _asyncRequests.remove(requestId); - } + outAsync = _asyncRequests.remove(requestId); } - if(outAsync != null) + if(outAsync != null && outAsync.finished(os)) { - outAsync.finished(os); + outAsync.invokeCompleted(); } _adapter.decDirectCount(); } @@ -500,13 +496,8 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler OutgoingAsync outAsync = null; synchronized(this) { - outAsync = _asyncRequests.get(requestId); - if(outAsync != null) - { - _asyncRequests.remove(requestId); - } + outAsync = _asyncRequests.remove(requestId); } - if(outAsync != null) { outAsync.finished(ex); diff --git a/java/src/IceInternal/OutgoingAsync.java b/java/src/IceInternal/OutgoingAsync.java index 0618ffb93e9..84bfc477768 100644 --- a/java/src/IceInternal/OutgoingAsync.java +++ b/java/src/IceInternal/OutgoingAsync.java @@ -236,12 +236,7 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC // try { - if(!handleException(exc)) - { - return; // Can't be retried immediately. - } - - invoke(false); // Retry the invocation + handleException(exc); } catch(Ice.Exception ex) { @@ -262,8 +257,14 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC }); } - public final void finished(BasicStream is) + public final boolean finished(BasicStream is) { + // + // NOTE: this method is called from ConnectionI.parseMessage + // with the connection locked. Therefore, it must not invoke + // any user callbacks. + // + assert (_proxy.ice_isTwoway()); // Can only be called for twoways. byte replyStatus; @@ -278,14 +279,14 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC _childObserver.detach(); _childObserver = null; } - + if(_timeoutRequestHandler != null) { _future.cancel(false); _future = null; _timeoutRequestHandler = null; } - + // _is can already be initialized if the invocation is retried if(_is == null) { @@ -293,7 +294,7 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC } _is.swap(is); replyStatus = _is.readByte(); - + switch(replyStatus) { case ReplyStatus.replyOK: @@ -415,25 +416,59 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC } } - _state |= StateDone; - // Clear buffer now, instead of waiting for AsyncResult - // deallocation - // _os.resize(0, false); if(replyStatus == ReplyStatus.replyOK) { _state |= StateOK; } + _state |= StateDone; _monitor.notifyAll(); + + if(_callback == null) + { + if(_observer != null) + { + _observer.detach(); + _observer = null; + } + return false; + } + return true; } } - catch(Ice.LocalException ex) + catch(Ice.Exception exc) { - finished(ex); - return; - } + // + // We don't call finished(exc) here because we don't want + // to invoke the completion callback. The completion + // callback is invoked by the connection is this method + // returns true. + // + try + { + handleException(exc); + return false; + } + catch(Ice.LocalException ex) + { + synchronized(_monitor) + { + _state |= StateDone; + _exception = ex; + _monitor.notifyAll(); - assert (replyStatus == ReplyStatus.replyOK || replyStatus == ReplyStatus.replyUserException); - invokeCompleted(); + if(_callback == null) + { + if(_observer != null) + { + _observer.detach(); + _observer = null; + } + return false; + } + return true; + } + } + } } public final boolean invoke(boolean synchronous) @@ -500,21 +535,19 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC } } } - break; } catch(RetryException ex) { // Clear request handler and retry. _proxy.__setRequestHandler(_handler, null); + continue; } catch(Ice.Exception ex) { // This will throw if the invocation can't be retried. - if(!handleException(ex)) - { - break; // Can't be retried immediately. - } + handleException(ex); } + break; } return _sentSynchronously; } @@ -589,7 +622,7 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC super.invokeExceptionAsync(ex); } - private boolean handleException(Ice.Exception exc) + private void handleException(Ice.Exception exc) { try { @@ -599,16 +632,16 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC { _observer.retried(); // Invocation is being retried. } - if(interval.value > 0) - { - _instance.retryQueue().add(this, interval.value); - return false; // Don't retry immediately, the retry queue will - // take care of the retry. - } - else - { - return true; // Retry immediately. - } + + // + // Schedule the retry. Note that we always schedule the retry + // on the retry queue even if the invocation can be retried + // immediately. This is required because it might not be safe + // to retry from this thread (this is for instance called by + // finished(BasicStream) which is called with the connection + // locked. + // + _instance.retryQueue().add(this, interval.value); } catch(Ice.Exception ex) { |