diff options
author | Benoit Foucher <benoit@zeroc.com> | 2013-02-27 17:38:59 +0100 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2013-02-27 17:38:59 +0100 |
commit | 5693628a2dec254458a007d07f7ef9519cf0e554 (patch) | |
tree | ef973d6c8e336024c323cbcf5d96cda217b93051 /cpp/src/Ice/ConnectionI.cpp | |
parent | Fixed (ICE-5248) - opening VS2010 demo solution takes a long time (diff) | |
download | ice-5693628a2dec254458a007d07f7ef9519cf0e554.tar.bz2 ice-5693628a2dec254458a007d07f7ef9519cf0e554.tar.xz ice-5693628a2dec254458a007d07f7ef9519cf0e554.zip |
Fixed ICE-5275 - AMI race condition
Diffstat (limited to 'cpp/src/Ice/ConnectionI.cpp')
-rw-r--r-- | cpp/src/Ice/ConnectionI.cpp | 61 |
1 files changed, 51 insertions, 10 deletions
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp index ca0be9dc19d..c8ea16863f2 100644 --- a/cpp/src/Ice/ConnectionI.cpp +++ b/cpp/src/Ice/ConnectionI.cpp @@ -66,7 +66,7 @@ class DispatchDispatcherCall : public DispatcherCall public: DispatchDispatcherCall(const ConnectionIPtr& connection, const ConnectionI::StartCallbackPtr& startCB, - const vector<OutgoingAsyncMessageCallbackPtr>& sentCBs, Byte compress, Int requestId, + const vector<ConnectionI::SentCallback>& sentCBs, Byte compress, Int requestId, Int invokeNum, const ServantManagerPtr& servantManager, const ObjectAdapterPtr& adapter, const OutgoingAsyncPtr& outAsync, BasicStream& stream) : _connection(connection), @@ -94,7 +94,7 @@ private: ConnectionIPtr _connection; ConnectionI::StartCallbackPtr _startCB; - vector<OutgoingAsyncMessageCallbackPtr> _sentCBs; + vector<ConnectionI::SentCallback> _sentCBs; Byte _compress; Int _requestId; Int _invokeNum; @@ -1292,7 +1292,7 @@ void Ice::ConnectionI::message(ThreadPoolCurrent& current) { StartCallbackPtr startCB; - vector<OutgoingAsyncMessageCallbackPtr> sentCBs; + vector<SentCallback> sentCBs; Byte compress = 0; Int requestId = 0; Int invokeNum = 0; @@ -1558,7 +1558,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) } void -ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingAsyncMessageCallbackPtr>& sentCBs, +ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<SentCallback>& sentCBs, Byte compress, Int requestId, Int invokeNum, const ServantManagerPtr& servantManager, const ObjectAdapterPtr& adapter, const OutgoingAsyncPtr& outAsync, BasicStream& stream) { @@ -1574,9 +1574,20 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingAsyn // // Notify AMI calls that the message was sent. // - for(vector<OutgoingAsyncMessageCallbackPtr>::const_iterator p = sentCBs.begin(); p != sentCBs.end(); ++p) + for(vector<SentCallback>::const_iterator p = sentCBs.begin(); p != sentCBs.end(); ++p) { - (*p)->__sent(); +#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) + if(p->outAsync) + { + p->outAsync->__sent(); + } + if(p->replyOutAsync) + { + p->replyOutAsync->__finished(); + } +#else + p->outAsync->__sent(); +#endif } // @@ -1585,7 +1596,7 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingAsyn // if(outAsync) { - outAsync->__finished(stream); + outAsync->__finished(); } // @@ -2380,7 +2391,7 @@ Ice::ConnectionI::validate(SocketOperation operation) } void -Ice::ConnectionI::sendNextMessage(vector<OutgoingAsyncMessageCallbackPtr>& callbacks) +Ice::ConnectionI::sendNextMessage(vector<SentCallback>& callbacks) { assert(!_sendStreams.empty()); assert(!_writeStream.b.empty() && _writeStream.i == _writeStream.b.end()); @@ -2393,11 +2404,25 @@ Ice::ConnectionI::sendNextMessage(vector<OutgoingAsyncMessageCallbackPtr>& callb // OutgoingMessage* message = &_sendStreams.front(); _writeStream.swap(*message->stream); +#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) + bool sentCB = message->sent(this, true); + if(sentCB || message->replyOutAsync) + { + if(sentCB) + { + callbacks.push_back(SentCallback(message->outAsync, message->replyOutAsync)); + } + else + { + callbacks.push_back(SentCallback(0, message->replyOutAsync)); + } + } +#else if(message->sent(this, true)) { - assert(message->outAsync); - callbacks.push_back(message->outAsync); + callbacks.push_back(SentCallback(message->outAsync)); } +#endif _sendStreams.pop_front(); // @@ -2991,6 +3016,22 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request { _asyncRequests.erase(q); } + + stream.swap(*outAsync->__getIs()); + +#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) + // + // If we just received the reply of a request which isn't acknowledge as + // sent yet, we queue the reply instead of processing it right away. It + // will be processed once the write callback is invoked for the message. + // + OutgoingMessage* message = _sendStreams.empty() ? 0 : &_sendStreams.front(); + if(message && message->outAsync.get() == outAsync.get()) + { + swap(message->replyOutAsync, outAsync); + } +#endif + } notifyAll(); // Notify threads blocked in close(false) break; |