From 65d91832bd0f6bf55bfefd1244582cec2e5139dc Mon Sep 17 00:00:00 2001 From: Benoit Foucher Date: Fri, 5 Sep 2014 13:17:45 +0200 Subject: Added back to optmization to not call connection dispatch if not necessary --- cpp/src/Ice/ConnectionI.cpp | 109 ++++++++++++++++++++++++++++---------------- 1 file changed, 69 insertions(+), 40 deletions(-) (limited to 'cpp/src/Ice/ConnectionI.cpp') diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp index b42b8558a7f..fcf05ef9db0 100644 --- a/cpp/src/Ice/ConnectionI.cpp +++ b/cpp/src/Ice/ConnectionI.cpp @@ -1601,6 +1601,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) ObjectAdapterPtr adapter; OutgoingAsyncPtr outAsync; ConnectionCallbackPtr heartbeatCallback; + int dispatchCount = 0; ThreadPoolMessage msg(current, *this); @@ -1760,7 +1761,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) swap(_startCallback, startCB); if(startCB) { - ++_dispatchCount; + ++dispatchCount; } } } @@ -1781,7 +1782,8 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) servantManager, adapter, outAsync, - heartbeatCallback)); + heartbeatCallback, + dispatchCount)); } if(readyOp & SocketOperationWrite) @@ -1789,7 +1791,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) newOp = static_cast(newOp | sendNextMessage(sentCBs)); if(!sentCBs.empty()) { - ++_dispatchCount; + ++dispatchCount; } } @@ -1801,9 +1803,23 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) if(!readyOp) { + assert(dispatchCount == 0); return; } } + + if(_acmLastActivity != IceUtil::Time()) + { + _acmLastActivity = IceUtil::Time::now(IceUtil::Time::Monotonic); + } + + if(dispatchCount == 0) + { + return; // Nothing to dispatch we're done! + } + + _dispatchCount += dispatchCount; + io.completed(); } catch(const DatagramLimitException&) // Expected. { @@ -1841,11 +1857,6 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) } return; } - if(_acmLastActivity != IceUtil::Time()) - { - _acmLastActivity = IceUtil::Time::now(IceUtil::Time::Monotonic); - } - io.completed(); } if(!_dispatcher) // Optimization, call dispatch() directly if there's no dispatcher. @@ -1867,7 +1878,7 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vectorconnectionStartCompleted(this); - ++count; + ++dispatchedCount; } // @@ -1893,13 +1904,17 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vectorreceivedReply) { - OutgoingAsyncPtr::dynamicCast(p->outAsync)->__finished(); + OutgoingAsyncPtr outAsync = OutgoingAsyncPtr::dynamicCast(p->outAsync); + if(outAsync->__finished()) + { + outAsync->__invokeCompleted(); + } } #else p->outAsync->__invokeSent(); #endif } - ++count; + ++dispatchedCount; } // @@ -1908,8 +1923,8 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector__finished(); - ++count; + outAsync->__invokeCompleted(); + ++dispatchedCount; } if(heartbeatCallback) @@ -1928,7 +1943,7 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vectorinitializationData().logger); out << "connection callback exception:\nunknown c++ exception" << '\n' << _desc; } - ++count; + ++dispatchedCount; } // @@ -1949,10 +1964,10 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector 0) + if(dispatchedCount > 0) { IceUtil::Monitor::Lock sync(*this); - _dispatchCount -= count; + _dispatchCount -= dispatchedCount; if(_dispatchCount == 0) { // @@ -2046,7 +2061,11 @@ Ice::ConnectionI::finish() } if(message->receivedReply) { - OutgoingAsyncPtr::dynamicCast(message->outAsync)->__finished(); + OutgoingAsyncPtr outAsync = OutgoingAsyncPtr::dynamicCast(message->outAsync); + if(outAsync->__finished()) + { + outAsync->__invokeCompleted(); + } } _sendStreams.pop_front(); } @@ -2896,27 +2915,25 @@ Ice::ConnectionI::sendNextMessage(vector& callbacks) _observer.finishWrite(_writeStream); } } + + // + // If all the messages were sent and we are in the closing state, we schedule + // the close timeout to wait for the peer to close the connection. + // + if(_state == StateClosing && _shutdownInitiated) + { + setState(StateClosingPending); + SocketOperation op = _transceiver->closing(true, *_exception.get()); + if(op) + { + return op; + } + } } catch(const Ice::LocalException& ex) { setState(StateClosed, ex); - return SocketOperationNone; - } - - // - // If all the messages were sent and we are in the closing state, we schedule - // the close timeout to wait for the peer to close the connection. - // - if(_state == StateClosing && _dispatchCount == 0) - { - setState(StateClosingPending); - SocketOperation op = _transceiver->closing(true, *_exception.get()); - if(op) - { - return op; - } } - return SocketOperationNone; } @@ -3215,7 +3232,8 @@ Ice::ConnectionI::doUncompress(BasicStream& compressed, BasicStream& uncompresse SocketOperation Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& requestId, Byte& compress, ServantManagerPtr& servantManager, ObjectAdapterPtr& adapter, - OutgoingAsyncPtr& outAsync, ConnectionCallbackPtr& heartbeatCallback) + OutgoingAsyncPtr& outAsync, ConnectionCallbackPtr& heartbeatCallback, + int& dispatchCount) { assert(_state > StateNotValidated && _state < StateClosed); @@ -3301,7 +3319,7 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request invokeNum = 1; servantManager = _servantManager; adapter = _adapter; - ++_dispatchCount; + ++dispatchCount; } break; } @@ -3324,7 +3342,7 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request } servantManager = _servantManager; adapter = _adapter; - _dispatchCount += invokeNum; + dispatchCount += invokeNum; } break; } @@ -3410,12 +3428,23 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request message->receivedReply = true; outAsync = 0; } + else if(outAsync->__finished()) + { + ++dispatchCount; + } else { - ++_dispatchCount; + outAsync = 0; } #else - ++_dispatchCount; + if(outAsync->__finished()) + { + ++dispatchCount; + } + else + { + outAsync = 0; + } #endif notifyAll(); // Notify threads blocked in close(false) } @@ -3429,7 +3458,7 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request if(_callback) { heartbeatCallback = _callback; - ++_dispatchCount; + ++dispatchCount; } break; } -- cgit v1.2.3