summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/ConnectionI.cpp
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2014-07-15 10:22:40 +0200
committerBenoit Foucher <benoit@zeroc.com>2014-07-15 10:22:40 +0200
commit710a9221852d6c92b1727a429a33b38f1f949352 (patch)
tree6bc9ac9ed04a6b1858d8fc30282d4f18ef04abbb /cpp/src/Ice/ConnectionI.cpp
parent- Fix for ICE-5578 - Python build failure (diff)
downloadice-710a9221852d6c92b1727a429a33b38f1f949352.tar.bz2
ice-710a9221852d6c92b1727a429a33b38f1f949352.tar.xz
ice-710a9221852d6c92b1727a429a33b38f1f949352.zip
Fixed collocation optimization to use the dispatcher, minor test fixes
Diffstat (limited to 'cpp/src/Ice/ConnectionI.cpp')
-rw-r--r--cpp/src/Ice/ConnectionI.cpp231
1 files changed, 87 insertions, 144 deletions
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp
index 850f245f163..ffeb9d80369 100644
--- a/cpp/src/Ice/ConnectionI.cpp
+++ b/cpp/src/Ice/ConnectionI.cpp
@@ -62,15 +62,16 @@ private:
Ice::ConnectionI* _connection;
};
-class DispatchDispatcherCall : public DispatcherCall
+class DispatchCall : public DispatchWorkItem
{
public:
- DispatchDispatcherCall(const ConnectionIPtr& connection, const ConnectionI::StartCallbackPtr& startCB,
- const vector<ConnectionI::OutgoingMessage>& sentCBs, Byte compress, Int requestId,
- Int invokeNum, const ServantManagerPtr& servantManager, const ObjectAdapterPtr& adapter,
- const OutgoingAsyncPtr& outAsync, const ConnectionCallbackPtr& heartbeatCallback,
- BasicStream& stream) :
+ DispatchCall(const ConnectionIPtr& connection, const ConnectionI::StartCallbackPtr& startCB,
+ const vector<ConnectionI::OutgoingMessage>& sentCBs, Byte compress, Int requestId,
+ Int invokeNum, const ServantManagerPtr& servantManager, const ObjectAdapterPtr& adapter,
+ const OutgoingAsyncPtr& outAsync, const ConnectionCallbackPtr& heartbeatCallback,
+ BasicStream& stream) :
+ DispatchWorkItem(connection),
_connection(connection),
_startCB(startCB),
_sentCBs(sentCBs),
@@ -108,12 +109,11 @@ private:
BasicStream _stream;
};
-class FinishDispatcherCall : public DispatcherCall
+class FinishCall : public DispatchWorkItem
{
public:
- FinishDispatcherCall(const Ice::ConnectionIPtr& connection) :
- _connection(connection)
+ FinishCall(const Ice::ConnectionIPtr& connection) : DispatchWorkItem(connection), _connection(connection)
{
}
@@ -240,10 +240,10 @@ Ice::ConnectionI::OutgoingMessage::adopt(BasicStream* str)
adopted = true;
}
-bool
+void
Ice::ConnectionI::OutgoingMessage::timedOut(bool adoptStream)
{
- assert((out || outAsync) && !isSent); // Only requests can timeout.
+ assert((out || outAsync)); // Only requests can timeout.
out = 0;
outAsync = 0;
if(adoptStream)
@@ -254,38 +254,31 @@ Ice::ConnectionI::OutgoingMessage::timedOut(bool adoptStream)
{
assert(!adopted && !stream);
}
- return isSent;
}
bool
Ice::ConnectionI::OutgoingMessage::sent()
{
- isSent = true; // The message is sent.
-
if(adopted)
{
delete stream;
}
stream = 0;
-
+
if(out)
{
out->sent();
- return false;
}
else if(outAsync)
{
#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT)
- invokeSentCallback = outAsync->__sent();
- return invokeSentCallback || receivedReply;
+ invokeSent = outAsync->__sent();
+ return invokeSent || receivedReply;
#else
return outAsync->__sent();
#endif
}
- else
- {
- return false;
- }
+ return false;
}
void
@@ -293,11 +286,11 @@ Ice::ConnectionI::OutgoingMessage::finished(const Ice::LocalException& ex)
{
if(out)
{
- out->finished(ex, isSent);
+ out->finished(ex);
}
else if(outAsync)
{
- outAsync->__finished(ex, isSent);
+ outAsync->__finished(ex);
}
if(adopted)
@@ -1217,19 +1210,18 @@ Ice::ConnectionI::requestTimedOut(OutgoingMessageCallback* out)
// If the request is being sent, don't remove it from the send streams,
// it will be removed once the sending is finished.
//
- bool isSent;
if(o == _sendStreams.begin())
{
- isSent = o->timedOut(true); // true = adopt the stream.
+ o->timedOut(true); // true = adopt the stream.
}
else
{
- isSent = o->timedOut(false);
+ o->timedOut(false);
_sendStreams.erase(o);
}
InvocationTimeoutException ex(__FILE__, __LINE__);
- out->finished(ex, isSent);
+ out->finished(ex);
return;
}
}
@@ -1240,7 +1232,7 @@ Ice::ConnectionI::requestTimedOut(OutgoingMessageCallback* out)
if(_requestsHint != _requests.end() && _requestsHint->second == o)
{
InvocationTimeoutException ex(__FILE__, __LINE__);
- o->finished(ex, true);
+ o->finished(ex);
_requests.erase(_requestsHint);
_requestsHint = _requests.end();
}
@@ -1251,7 +1243,7 @@ Ice::ConnectionI::requestTimedOut(OutgoingMessageCallback* out)
if(p->second == o)
{
InvocationTimeoutException ex(__FILE__, __LINE__);
- o->finished(ex, true);
+ o->finished(ex);
assert(p != _requestsHint);
_requests.erase(p);
return; // We're done.
@@ -1264,82 +1256,68 @@ Ice::ConnectionI::requestTimedOut(OutgoingMessageCallback* out)
void
Ice::ConnectionI::asyncRequestTimedOut(const OutgoingAsyncMessageCallbackPtr& outAsync)
{
- bool isSent;
- bool finished = false;
- {
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- for(deque<OutgoingMessage>::iterator o = _sendStreams.begin(); o != _sendStreams.end(); ++o)
+ for(deque<OutgoingMessage>::iterator o = _sendStreams.begin(); o != _sendStreams.end(); ++o)
+ {
+ if(o->outAsync.get() == outAsync.get())
{
- if(o->outAsync.get() == outAsync.get())
+ if(o->requestId)
{
- if(o->requestId)
+ if(_asyncRequestsHint != _asyncRequests.end() &&
+ _asyncRequestsHint->second == OutgoingAsyncPtr::dynamicCast(outAsync))
{
- if(_asyncRequestsHint != _asyncRequests.end() &&
- _asyncRequestsHint->second == OutgoingAsyncPtr::dynamicCast(outAsync))
- {
- _asyncRequests.erase(_asyncRequestsHint);
- _asyncRequestsHint = _asyncRequests.end();
- }
- else
- {
- _asyncRequests.erase(o->requestId);
- }
- }
-
- //
- // If the request is being sent, don't remove it from the send streams,
- // it will be removed once the sending is finished.
- //
- if(o == _sendStreams.begin())
- {
- isSent = o->timedOut(true); // true = adopt the stream
+ _asyncRequests.erase(_asyncRequestsHint);
+ _asyncRequestsHint = _asyncRequests.end();
}
else
{
- isSent = o->timedOut(false);
- _sendStreams.erase(o);
+ _asyncRequests.erase(o->requestId);
}
- finished = true;
- break; // We're done
}
- }
-
- if(!finished)
- {
- OutgoingAsyncPtr o = OutgoingAsyncPtr::dynamicCast(outAsync);
- if(o)
+
+ //
+ // If the request is being sent, don't remove it from the send streams,
+ // it will be removed once the sending is finished.
+ //
+ if(o == _sendStreams.begin())
{
- if(_asyncRequestsHint != _asyncRequests.end())
- {
- if(_asyncRequestsHint->second == o)
- {
- InvocationTimeoutException ex(__FILE__, __LINE__);
- o->__finished(ex, true);
- _asyncRequests.erase(_asyncRequestsHint);
- _asyncRequestsHint = _asyncRequests.end();
- }
- }
-
- for(map<Int, OutgoingAsyncPtr>::iterator p = _asyncRequests.begin(); p != _asyncRequests.end(); ++p)
- {
- if(p->second.get() == o.get())
- {
- assert(p != _asyncRequestsHint);
- _asyncRequests.erase(p);
- finished = true;
- isSent = true;
- break;
- }
- }
+ o->timedOut(true); // true = adopt the stream
+ }
+ else
+ {
+ o->timedOut(false);
+ _sendStreams.erase(o);
}
+ outAsync->__dispatchInvocationTimeout(_threadPool, this);
+ return; // We're done
}
}
- if(finished)
+ OutgoingAsyncPtr o = OutgoingAsyncPtr::dynamicCast(outAsync);
+ if(o)
{
- InvocationTimeoutException ex(__FILE__, __LINE__);
- outAsync->__finished(ex, isSent);
+ if(_asyncRequestsHint != _asyncRequests.end())
+ {
+ if(_asyncRequestsHint->second == o)
+ {
+ _asyncRequests.erase(_asyncRequestsHint);
+ _asyncRequestsHint = _asyncRequests.end();
+ outAsync->__dispatchInvocationTimeout(_threadPool, this);
+ return; // We're done
+ }
+ }
+
+ for(map<Int, OutgoingAsyncPtr>::iterator p = _asyncRequests.begin(); p != _asyncRequests.end(); ++p)
+ {
+ if(p->second.get() == o.get())
+ {
+ assert(p != _asyncRequestsHint);
+ _asyncRequests.erase(p);
+ outAsync->__dispatchInvocationTimeout(_threadPool, this);
+ return; // We're done
+ }
+ }
}
}
@@ -1814,35 +1792,16 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
io.completed();
}
- if(_dispatcher)
+ if(!_dispatcher) // Optimization, call dispatch() directly if there's no dispatcher.
{
- try
- {
- _dispatcher->dispatch(new DispatchDispatcherCall(this, startCB, sentCBs, compress, requestId, invokeNum,
- servantManager, adapter, outAsync, heartbeatCallback,
- current.stream), this);
- }
- catch(const std::exception& ex)
- {
- if(_instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.Dispatch", 1) > 1)
- {
- Warning out(_instance->initializationData().logger);
- out << "dispatch exception:\n" << ex << '\n' << _desc;
- }
- }
- catch(...)
- {
- if(_instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.Dispatch", 1) > 1)
- {
- Warning out(_instance->initializationData().logger);
- out << "dispatch exception:\nunknown c++ exception" << '\n' << _desc;
- }
- }
+ dispatch(startCB, sentCBs, compress, requestId, invokeNum, servantManager, adapter, outAsync, heartbeatCallback,
+ current.stream);
}
else
{
- dispatch(startCB, sentCBs, compress, requestId, invokeNum, servantManager, adapter, outAsync, heartbeatCallback,
- current.stream);
+ _threadPool->dispatchFromThisThread(new DispatchCall(this, startCB, sentCBs, compress, requestId, invokeNum,
+ servantManager, adapter, outAsync, heartbeatCallback,
+ current.stream));
}
}
@@ -1872,7 +1831,7 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMess
for(vector<OutgoingMessage>::const_iterator p = sentCBs.begin(); p != sentCBs.end(); ++p)
{
#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT)
- if(p->invokeSentCallback)
+ if(p->invokeSent)
{
p->outAsync->__invokeSent();
}
@@ -1986,33 +1945,14 @@ Ice::ConnectionI::finished(ThreadPoolCurrent& current)
return;
}
- if(!_dispatcher)
+ current.ioCompleted();
+ if(!_dispatcher) // Optimization, call finish() directly if there's no dispatcher.
{
- current.ioCompleted();
finish();
}
else
{
- try
- {
- _dispatcher->dispatch(new FinishDispatcherCall(this), this);
- }
- catch(const std::exception& ex)
- {
- if(_instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.Dispatch", 1) > 1)
- {
- Warning out(_instance->initializationData().logger);
- out << "dispatch exception:\n" << ex << '\n' << _desc;
- }
- }
- catch(...)
- {
- if(_instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.Dispatch", 1) > 1)
- {
- Warning out(_instance->initializationData().logger);
- out << "dispatch exception:\nunknown c++ exception" << '\n' << _desc;
- }
- }
+ _threadPool->dispatchFromThisThread(new FinishCall(this));
}
}
@@ -2042,13 +1982,16 @@ Ice::ConnectionI::finish()
// the response has been received in the meantime, we remove the message from
// _sendStreams to not call finished on a message which is already done.
//
- if(message->receivedReply)
+ if(message->isSent || message->receivedReply)
{
- if(message->sent() && message->invokeSentCallback)
+ if(message->sent() && message->invokeSent)
{
message->outAsync->__invokeSent();
}
- OutgoingAsyncPtr::dynamicCast(message->outAsync)->__finished();
+ if(message->receivedReply)
+ {
+ OutgoingAsyncPtr::dynamicCast(message->outAsync)->__finished();
+ }
_sendStreams.pop_front();
}
#endif
@@ -2074,13 +2017,13 @@ Ice::ConnectionI::finish()
for(map<Int, Outgoing*>::const_iterator p = _requests.begin(); p != _requests.end(); ++p)
{
- p->second->finished(*_exception.get(), true);
+ p->second->finished(*_exception.get());
}
_requests.clear();
for(map<Int, OutgoingAsyncPtr>::const_iterator q = _asyncRequests.begin(); q != _asyncRequests.end(); ++q)
{
- q->second->__finished(*_exception.get(), true);
+ q->second->__finished(*_exception.get());
}
_asyncRequests.clear();