diff options
Diffstat (limited to 'cpp/src/Ice/ConnectionI.cpp')
-rw-r--r-- | cpp/src/Ice/ConnectionI.cpp | 231 |
1 files changed, 87 insertions, 144 deletions
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp index 850f245f163..ffeb9d80369 100644 --- a/cpp/src/Ice/ConnectionI.cpp +++ b/cpp/src/Ice/ConnectionI.cpp @@ -62,15 +62,16 @@ private: Ice::ConnectionI* _connection; }; -class DispatchDispatcherCall : public DispatcherCall +class DispatchCall : public DispatchWorkItem { public: - DispatchDispatcherCall(const ConnectionIPtr& connection, const ConnectionI::StartCallbackPtr& startCB, - const vector<ConnectionI::OutgoingMessage>& sentCBs, Byte compress, Int requestId, - Int invokeNum, const ServantManagerPtr& servantManager, const ObjectAdapterPtr& adapter, - const OutgoingAsyncPtr& outAsync, const ConnectionCallbackPtr& heartbeatCallback, - BasicStream& stream) : + DispatchCall(const ConnectionIPtr& connection, const ConnectionI::StartCallbackPtr& startCB, + const vector<ConnectionI::OutgoingMessage>& sentCBs, Byte compress, Int requestId, + Int invokeNum, const ServantManagerPtr& servantManager, const ObjectAdapterPtr& adapter, + const OutgoingAsyncPtr& outAsync, const ConnectionCallbackPtr& heartbeatCallback, + BasicStream& stream) : + DispatchWorkItem(connection), _connection(connection), _startCB(startCB), _sentCBs(sentCBs), @@ -108,12 +109,11 @@ private: BasicStream _stream; }; -class FinishDispatcherCall : public DispatcherCall +class FinishCall : public DispatchWorkItem { public: - FinishDispatcherCall(const Ice::ConnectionIPtr& connection) : - _connection(connection) + FinishCall(const Ice::ConnectionIPtr& connection) : DispatchWorkItem(connection), _connection(connection) { } @@ -240,10 +240,10 @@ Ice::ConnectionI::OutgoingMessage::adopt(BasicStream* str) adopted = true; } -bool +void Ice::ConnectionI::OutgoingMessage::timedOut(bool adoptStream) { - assert((out || outAsync) && !isSent); // Only requests can timeout. + assert((out || outAsync)); // Only requests can timeout. out = 0; outAsync = 0; if(adoptStream) @@ -254,38 +254,31 @@ Ice::ConnectionI::OutgoingMessage::timedOut(bool adoptStream) { assert(!adopted && !stream); } - return isSent; } bool Ice::ConnectionI::OutgoingMessage::sent() { - isSent = true; // The message is sent. - if(adopted) { delete stream; } stream = 0; - + if(out) { out->sent(); - return false; } else if(outAsync) { #if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) - invokeSentCallback = outAsync->__sent(); - return invokeSentCallback || receivedReply; + invokeSent = outAsync->__sent(); + return invokeSent || receivedReply; #else return outAsync->__sent(); #endif } - else - { - return false; - } + return false; } void @@ -293,11 +286,11 @@ Ice::ConnectionI::OutgoingMessage::finished(const Ice::LocalException& ex) { if(out) { - out->finished(ex, isSent); + out->finished(ex); } else if(outAsync) { - outAsync->__finished(ex, isSent); + outAsync->__finished(ex); } if(adopted) @@ -1217,19 +1210,18 @@ Ice::ConnectionI::requestTimedOut(OutgoingMessageCallback* out) // If the request is being sent, don't remove it from the send streams, // it will be removed once the sending is finished. // - bool isSent; if(o == _sendStreams.begin()) { - isSent = o->timedOut(true); // true = adopt the stream. + o->timedOut(true); // true = adopt the stream. } else { - isSent = o->timedOut(false); + o->timedOut(false); _sendStreams.erase(o); } InvocationTimeoutException ex(__FILE__, __LINE__); - out->finished(ex, isSent); + out->finished(ex); return; } } @@ -1240,7 +1232,7 @@ Ice::ConnectionI::requestTimedOut(OutgoingMessageCallback* out) if(_requestsHint != _requests.end() && _requestsHint->second == o) { InvocationTimeoutException ex(__FILE__, __LINE__); - o->finished(ex, true); + o->finished(ex); _requests.erase(_requestsHint); _requestsHint = _requests.end(); } @@ -1251,7 +1243,7 @@ Ice::ConnectionI::requestTimedOut(OutgoingMessageCallback* out) if(p->second == o) { InvocationTimeoutException ex(__FILE__, __LINE__); - o->finished(ex, true); + o->finished(ex); assert(p != _requestsHint); _requests.erase(p); return; // We're done. @@ -1264,82 +1256,68 @@ Ice::ConnectionI::requestTimedOut(OutgoingMessageCallback* out) void Ice::ConnectionI::asyncRequestTimedOut(const OutgoingAsyncMessageCallbackPtr& outAsync) { - bool isSent; - bool finished = false; - { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - for(deque<OutgoingMessage>::iterator o = _sendStreams.begin(); o != _sendStreams.end(); ++o) + for(deque<OutgoingMessage>::iterator o = _sendStreams.begin(); o != _sendStreams.end(); ++o) + { + if(o->outAsync.get() == outAsync.get()) { - if(o->outAsync.get() == outAsync.get()) + if(o->requestId) { - if(o->requestId) + if(_asyncRequestsHint != _asyncRequests.end() && + _asyncRequestsHint->second == OutgoingAsyncPtr::dynamicCast(outAsync)) { - if(_asyncRequestsHint != _asyncRequests.end() && - _asyncRequestsHint->second == OutgoingAsyncPtr::dynamicCast(outAsync)) - { - _asyncRequests.erase(_asyncRequestsHint); - _asyncRequestsHint = _asyncRequests.end(); - } - else - { - _asyncRequests.erase(o->requestId); - } - } - - // - // If the request is being sent, don't remove it from the send streams, - // it will be removed once the sending is finished. - // - if(o == _sendStreams.begin()) - { - isSent = o->timedOut(true); // true = adopt the stream + _asyncRequests.erase(_asyncRequestsHint); + _asyncRequestsHint = _asyncRequests.end(); } else { - isSent = o->timedOut(false); - _sendStreams.erase(o); + _asyncRequests.erase(o->requestId); } - finished = true; - break; // We're done } - } - - if(!finished) - { - OutgoingAsyncPtr o = OutgoingAsyncPtr::dynamicCast(outAsync); - if(o) + + // + // If the request is being sent, don't remove it from the send streams, + // it will be removed once the sending is finished. + // + if(o == _sendStreams.begin()) { - if(_asyncRequestsHint != _asyncRequests.end()) - { - if(_asyncRequestsHint->second == o) - { - InvocationTimeoutException ex(__FILE__, __LINE__); - o->__finished(ex, true); - _asyncRequests.erase(_asyncRequestsHint); - _asyncRequestsHint = _asyncRequests.end(); - } - } - - for(map<Int, OutgoingAsyncPtr>::iterator p = _asyncRequests.begin(); p != _asyncRequests.end(); ++p) - { - if(p->second.get() == o.get()) - { - assert(p != _asyncRequestsHint); - _asyncRequests.erase(p); - finished = true; - isSent = true; - break; - } - } + o->timedOut(true); // true = adopt the stream + } + else + { + o->timedOut(false); + _sendStreams.erase(o); } + outAsync->__dispatchInvocationTimeout(_threadPool, this); + return; // We're done } } - if(finished) + OutgoingAsyncPtr o = OutgoingAsyncPtr::dynamicCast(outAsync); + if(o) { - InvocationTimeoutException ex(__FILE__, __LINE__); - outAsync->__finished(ex, isSent); + if(_asyncRequestsHint != _asyncRequests.end()) + { + if(_asyncRequestsHint->second == o) + { + _asyncRequests.erase(_asyncRequestsHint); + _asyncRequestsHint = _asyncRequests.end(); + outAsync->__dispatchInvocationTimeout(_threadPool, this); + return; // We're done + } + } + + for(map<Int, OutgoingAsyncPtr>::iterator p = _asyncRequests.begin(); p != _asyncRequests.end(); ++p) + { + if(p->second.get() == o.get()) + { + assert(p != _asyncRequestsHint); + _asyncRequests.erase(p); + outAsync->__dispatchInvocationTimeout(_threadPool, this); + return; // We're done + } + } } } @@ -1814,35 +1792,16 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) io.completed(); } - if(_dispatcher) + if(!_dispatcher) // Optimization, call dispatch() directly if there's no dispatcher. { - try - { - _dispatcher->dispatch(new DispatchDispatcherCall(this, startCB, sentCBs, compress, requestId, invokeNum, - servantManager, adapter, outAsync, heartbeatCallback, - current.stream), this); - } - catch(const std::exception& ex) - { - if(_instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.Dispatch", 1) > 1) - { - Warning out(_instance->initializationData().logger); - out << "dispatch exception:\n" << ex << '\n' << _desc; - } - } - catch(...) - { - if(_instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.Dispatch", 1) > 1) - { - Warning out(_instance->initializationData().logger); - out << "dispatch exception:\nunknown c++ exception" << '\n' << _desc; - } - } + dispatch(startCB, sentCBs, compress, requestId, invokeNum, servantManager, adapter, outAsync, heartbeatCallback, + current.stream); } else { - dispatch(startCB, sentCBs, compress, requestId, invokeNum, servantManager, adapter, outAsync, heartbeatCallback, - current.stream); + _threadPool->dispatchFromThisThread(new DispatchCall(this, startCB, sentCBs, compress, requestId, invokeNum, + servantManager, adapter, outAsync, heartbeatCallback, + current.stream)); } } @@ -1872,7 +1831,7 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMess for(vector<OutgoingMessage>::const_iterator p = sentCBs.begin(); p != sentCBs.end(); ++p) { #if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) - if(p->invokeSentCallback) + if(p->invokeSent) { p->outAsync->__invokeSent(); } @@ -1986,33 +1945,14 @@ Ice::ConnectionI::finished(ThreadPoolCurrent& current) return; } - if(!_dispatcher) + current.ioCompleted(); + if(!_dispatcher) // Optimization, call finish() directly if there's no dispatcher. { - current.ioCompleted(); finish(); } else { - try - { - _dispatcher->dispatch(new FinishDispatcherCall(this), this); - } - catch(const std::exception& ex) - { - if(_instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.Dispatch", 1) > 1) - { - Warning out(_instance->initializationData().logger); - out << "dispatch exception:\n" << ex << '\n' << _desc; - } - } - catch(...) - { - if(_instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.Dispatch", 1) > 1) - { - Warning out(_instance->initializationData().logger); - out << "dispatch exception:\nunknown c++ exception" << '\n' << _desc; - } - } + _threadPool->dispatchFromThisThread(new FinishCall(this)); } } @@ -2042,13 +1982,16 @@ Ice::ConnectionI::finish() // the response has been received in the meantime, we remove the message from // _sendStreams to not call finished on a message which is already done. // - if(message->receivedReply) + if(message->isSent || message->receivedReply) { - if(message->sent() && message->invokeSentCallback) + if(message->sent() && message->invokeSent) { message->outAsync->__invokeSent(); } - OutgoingAsyncPtr::dynamicCast(message->outAsync)->__finished(); + if(message->receivedReply) + { + OutgoingAsyncPtr::dynamicCast(message->outAsync)->__finished(); + } _sendStreams.pop_front(); } #endif @@ -2074,13 +2017,13 @@ Ice::ConnectionI::finish() for(map<Int, Outgoing*>::const_iterator p = _requests.begin(); p != _requests.end(); ++p) { - p->second->finished(*_exception.get(), true); + p->second->finished(*_exception.get()); } _requests.clear(); for(map<Int, OutgoingAsyncPtr>::const_iterator q = _asyncRequests.begin(); q != _asyncRequests.end(); ++q) { - q->second->__finished(*_exception.get(), true); + q->second->__finished(*_exception.get()); } _asyncRequests.clear(); |