summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/ConnectionI.cpp
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2014-09-05 13:17:45 +0200
committerBenoit Foucher <benoit@zeroc.com>2014-09-05 13:17:45 +0200
commit65d91832bd0f6bf55bfefd1244582cec2e5139dc (patch)
treee8aa359587a3605a5c6fa79f0842321554449c0b /cpp/src/Ice/ConnectionI.cpp
parentJS minor fix, remove unused variables (diff)
downloadice-65d91832bd0f6bf55bfefd1244582cec2e5139dc.tar.bz2
ice-65d91832bd0f6bf55bfefd1244582cec2e5139dc.tar.xz
ice-65d91832bd0f6bf55bfefd1244582cec2e5139dc.zip
Added back to optmization to not call connection dispatch if not necessary
Diffstat (limited to 'cpp/src/Ice/ConnectionI.cpp')
-rw-r--r--cpp/src/Ice/ConnectionI.cpp109
1 files changed, 69 insertions, 40 deletions
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp
index b42b8558a7f..fcf05ef9db0 100644
--- a/cpp/src/Ice/ConnectionI.cpp
+++ b/cpp/src/Ice/ConnectionI.cpp
@@ -1601,6 +1601,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
ObjectAdapterPtr adapter;
OutgoingAsyncPtr outAsync;
ConnectionCallbackPtr heartbeatCallback;
+ int dispatchCount = 0;
ThreadPoolMessage<ConnectionI> msg(current, *this);
@@ -1760,7 +1761,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
swap(_startCallback, startCB);
if(startCB)
{
- ++_dispatchCount;
+ ++dispatchCount;
}
}
}
@@ -1781,7 +1782,8 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
servantManager,
adapter,
outAsync,
- heartbeatCallback));
+ heartbeatCallback,
+ dispatchCount));
}
if(readyOp & SocketOperationWrite)
@@ -1789,7 +1791,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
newOp = static_cast<SocketOperation>(newOp | sendNextMessage(sentCBs));
if(!sentCBs.empty())
{
- ++_dispatchCount;
+ ++dispatchCount;
}
}
@@ -1801,9 +1803,23 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
if(!readyOp)
{
+ assert(dispatchCount == 0);
return;
}
}
+
+ if(_acmLastActivity != IceUtil::Time())
+ {
+ _acmLastActivity = IceUtil::Time::now(IceUtil::Time::Monotonic);
+ }
+
+ if(dispatchCount == 0)
+ {
+ return; // Nothing to dispatch we're done!
+ }
+
+ _dispatchCount += dispatchCount;
+ io.completed();
}
catch(const DatagramLimitException&) // Expected.
{
@@ -1841,11 +1857,6 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
}
return;
}
- if(_acmLastActivity != IceUtil::Time())
- {
- _acmLastActivity = IceUtil::Time::now(IceUtil::Time::Monotonic);
- }
- io.completed();
}
if(!_dispatcher) // Optimization, call dispatch() directly if there's no dispatcher.
@@ -1867,7 +1878,7 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMess
const ObjectAdapterPtr& adapter, const OutgoingAsyncPtr& outAsync,
const ConnectionCallbackPtr& heartbeatCallback, BasicStream& stream)
{
- int count = 0;
+ int dispatchedCount = 0;
//
// Notify the factory that the connection establishment and
@@ -1876,7 +1887,7 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMess
if(startCB)
{
startCB->connectionStartCompleted(this);
- ++count;
+ ++dispatchedCount;
}
//
@@ -1893,13 +1904,17 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMess
}
if(p->receivedReply)
{
- OutgoingAsyncPtr::dynamicCast(p->outAsync)->__finished();
+ OutgoingAsyncPtr outAsync = OutgoingAsyncPtr::dynamicCast(p->outAsync);
+ if(outAsync->__finished())
+ {
+ outAsync->__invokeCompleted();
+ }
}
#else
p->outAsync->__invokeSent();
#endif
}
- ++count;
+ ++dispatchedCount;
}
//
@@ -1908,8 +1923,8 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMess
//
if(outAsync)
{
- outAsync->__finished();
- ++count;
+ outAsync->__invokeCompleted();
+ ++dispatchedCount;
}
if(heartbeatCallback)
@@ -1928,7 +1943,7 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMess
Error out(_instance->initializationData().logger);
out << "connection callback exception:\nunknown c++ exception" << '\n' << _desc;
}
- ++count;
+ ++dispatchedCount;
}
//
@@ -1949,10 +1964,10 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMess
//
// Decrease dispatch count.
//
- if(count > 0)
+ if(dispatchedCount > 0)
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- _dispatchCount -= count;
+ _dispatchCount -= dispatchedCount;
if(_dispatchCount == 0)
{
//
@@ -2046,7 +2061,11 @@ Ice::ConnectionI::finish()
}
if(message->receivedReply)
{
- OutgoingAsyncPtr::dynamicCast(message->outAsync)->__finished();
+ OutgoingAsyncPtr outAsync = OutgoingAsyncPtr::dynamicCast(message->outAsync);
+ if(outAsync->__finished())
+ {
+ outAsync->__invokeCompleted();
+ }
}
_sendStreams.pop_front();
}
@@ -2896,27 +2915,25 @@ Ice::ConnectionI::sendNextMessage(vector<OutgoingMessage>& callbacks)
_observer.finishWrite(_writeStream);
}
}
+
+ //
+ // If all the messages were sent and we are in the closing state, we schedule
+ // the close timeout to wait for the peer to close the connection.
+ //
+ if(_state == StateClosing && _shutdownInitiated)
+ {
+ setState(StateClosingPending);
+ SocketOperation op = _transceiver->closing(true, *_exception.get());
+ if(op)
+ {
+ return op;
+ }
+ }
}
catch(const Ice::LocalException& ex)
{
setState(StateClosed, ex);
- return SocketOperationNone;
- }
-
- //
- // If all the messages were sent and we are in the closing state, we schedule
- // the close timeout to wait for the peer to close the connection.
- //
- if(_state == StateClosing && _dispatchCount == 0)
- {
- setState(StateClosingPending);
- SocketOperation op = _transceiver->closing(true, *_exception.get());
- if(op)
- {
- return op;
- }
}
-
return SocketOperationNone;
}
@@ -3215,7 +3232,8 @@ Ice::ConnectionI::doUncompress(BasicStream& compressed, BasicStream& uncompresse
SocketOperation
Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& requestId, Byte& compress,
ServantManagerPtr& servantManager, ObjectAdapterPtr& adapter,
- OutgoingAsyncPtr& outAsync, ConnectionCallbackPtr& heartbeatCallback)
+ OutgoingAsyncPtr& outAsync, ConnectionCallbackPtr& heartbeatCallback,
+ int& dispatchCount)
{
assert(_state > StateNotValidated && _state < StateClosed);
@@ -3301,7 +3319,7 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request
invokeNum = 1;
servantManager = _servantManager;
adapter = _adapter;
- ++_dispatchCount;
+ ++dispatchCount;
}
break;
}
@@ -3324,7 +3342,7 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request
}
servantManager = _servantManager;
adapter = _adapter;
- _dispatchCount += invokeNum;
+ dispatchCount += invokeNum;
}
break;
}
@@ -3410,12 +3428,23 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request
message->receivedReply = true;
outAsync = 0;
}
+ else if(outAsync->__finished())
+ {
+ ++dispatchCount;
+ }
else
{
- ++_dispatchCount;
+ outAsync = 0;
}
#else
- ++_dispatchCount;
+ if(outAsync->__finished())
+ {
+ ++dispatchCount;
+ }
+ else
+ {
+ outAsync = 0;
+ }
#endif
notifyAll(); // Notify threads blocked in close(false)
}
@@ -3429,7 +3458,7 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request
if(_callback)
{
heartbeatCallback = _callback;
- ++_dispatchCount;
+ ++dispatchCount;
}
break;
}