diff options
author | Benoit Foucher <benoit@zeroc.com> | 2013-02-27 17:38:59 +0100 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2013-02-27 17:38:59 +0100 |
commit | 5693628a2dec254458a007d07f7ef9519cf0e554 (patch) | |
tree | ef973d6c8e336024c323cbcf5d96cda217b93051 /cpp/src | |
parent | Fixed (ICE-5248) - opening VS2010 demo solution takes a long time (diff) | |
download | ice-5693628a2dec254458a007d07f7ef9519cf0e554.tar.bz2 ice-5693628a2dec254458a007d07f7ef9519cf0e554.tar.xz ice-5693628a2dec254458a007d07f7ef9519cf0e554.zip |
Fixed ICE-5275 - AMI race condition
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/Ice/ConnectionI.cpp | 61 | ||||
-rw-r--r-- | cpp/src/Ice/ConnectionI.h | 31 | ||||
-rw-r--r-- | cpp/src/Ice/OutgoingAsync.cpp | 38 |
3 files changed, 94 insertions, 36 deletions
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp index ca0be9dc19d..c8ea16863f2 100644 --- a/cpp/src/Ice/ConnectionI.cpp +++ b/cpp/src/Ice/ConnectionI.cpp @@ -66,7 +66,7 @@ class DispatchDispatcherCall : public DispatcherCall public: DispatchDispatcherCall(const ConnectionIPtr& connection, const ConnectionI::StartCallbackPtr& startCB, - const vector<OutgoingAsyncMessageCallbackPtr>& sentCBs, Byte compress, Int requestId, + const vector<ConnectionI::SentCallback>& sentCBs, Byte compress, Int requestId, Int invokeNum, const ServantManagerPtr& servantManager, const ObjectAdapterPtr& adapter, const OutgoingAsyncPtr& outAsync, BasicStream& stream) : _connection(connection), @@ -94,7 +94,7 @@ private: ConnectionIPtr _connection; ConnectionI::StartCallbackPtr _startCB; - vector<OutgoingAsyncMessageCallbackPtr> _sentCBs; + vector<ConnectionI::SentCallback> _sentCBs; Byte _compress; Int _requestId; Int _invokeNum; @@ -1292,7 +1292,7 @@ void Ice::ConnectionI::message(ThreadPoolCurrent& current) { StartCallbackPtr startCB; - vector<OutgoingAsyncMessageCallbackPtr> sentCBs; + vector<SentCallback> sentCBs; Byte compress = 0; Int requestId = 0; Int invokeNum = 0; @@ -1558,7 +1558,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) } void -ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingAsyncMessageCallbackPtr>& sentCBs, +ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<SentCallback>& sentCBs, Byte compress, Int requestId, Int invokeNum, const ServantManagerPtr& servantManager, const ObjectAdapterPtr& adapter, const OutgoingAsyncPtr& outAsync, BasicStream& stream) { @@ -1574,9 +1574,20 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingAsyn // // Notify AMI calls that the message was sent. // - for(vector<OutgoingAsyncMessageCallbackPtr>::const_iterator p = sentCBs.begin(); p != sentCBs.end(); ++p) + for(vector<SentCallback>::const_iterator p = sentCBs.begin(); p != sentCBs.end(); ++p) { - (*p)->__sent(); +#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) + if(p->outAsync) + { + p->outAsync->__sent(); + } + if(p->replyOutAsync) + { + p->replyOutAsync->__finished(); + } +#else + p->outAsync->__sent(); +#endif } // @@ -1585,7 +1596,7 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingAsyn // if(outAsync) { - outAsync->__finished(stream); + outAsync->__finished(); } // @@ -2380,7 +2391,7 @@ Ice::ConnectionI::validate(SocketOperation operation) } void -Ice::ConnectionI::sendNextMessage(vector<OutgoingAsyncMessageCallbackPtr>& callbacks) +Ice::ConnectionI::sendNextMessage(vector<SentCallback>& callbacks) { assert(!_sendStreams.empty()); assert(!_writeStream.b.empty() && _writeStream.i == _writeStream.b.end()); @@ -2393,11 +2404,25 @@ Ice::ConnectionI::sendNextMessage(vector<OutgoingAsyncMessageCallbackPtr>& callb // OutgoingMessage* message = &_sendStreams.front(); _writeStream.swap(*message->stream); +#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) + bool sentCB = message->sent(this, true); + if(sentCB || message->replyOutAsync) + { + if(sentCB) + { + callbacks.push_back(SentCallback(message->outAsync, message->replyOutAsync)); + } + else + { + callbacks.push_back(SentCallback(0, message->replyOutAsync)); + } + } +#else if(message->sent(this, true)) { - assert(message->outAsync); - callbacks.push_back(message->outAsync); + callbacks.push_back(SentCallback(message->outAsync)); } +#endif _sendStreams.pop_front(); // @@ -2991,6 +3016,22 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request { _asyncRequests.erase(q); } + + stream.swap(*outAsync->__getIs()); + +#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) + // + // If we just received the reply of a request which isn't acknowledge as + // sent yet, we queue the reply instead of processing it right away. It + // will be processed once the write callback is invoked for the message. + // + OutgoingMessage* message = _sendStreams.empty() ? 0 : &_sendStreams.front(); + if(message && message->outAsync.get() == outAsync.get()) + { + swap(message->replyOutAsync, outAsync); + } +#endif + } notifyAll(); // Notify threads blocked in close(false) break; diff --git a/cpp/src/Ice/ConnectionI.h b/cpp/src/Ice/ConnectionI.h index bf60d039ef1..21d692d7536 100644 --- a/cpp/src/Ice/ConnectionI.h +++ b/cpp/src/Ice/ConnectionI.h @@ -94,6 +94,26 @@ public: }; typedef IceUtil::Handle<StartCallback> StartCallbackPtr; + struct SentCallback + { +#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) + SentCallback(const IceInternal::OutgoingAsyncMessageCallbackPtr& outAsync, + const IceInternal::OutgoingAsyncPtr& replyOutAsync) : + outAsync(outAsync), replyOutAsync(replyOutAsync) + { + } +#else + SentCallback(const IceInternal::OutgoingAsyncMessageCallbackPtr& outAsync) : outAsync(outAsync) + { + } +#endif + + IceInternal::OutgoingAsyncMessageCallbackPtr outAsync; +#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) + IceInternal::OutgoingAsyncPtr replyOutAsync; +#endif + }; + enum DestructionReason { ObjectAdapterDeactivated, @@ -178,9 +198,9 @@ public: void exception(const LocalException&); void invokeException(const LocalException&, int); - void dispatch(const StartCallbackPtr&, const std::vector<IceInternal::OutgoingAsyncMessageCallbackPtr>&, - Byte, Int, Int, const IceInternal::ServantManagerPtr&, const ObjectAdapterPtr&, - const IceInternal::OutgoingAsyncPtr&, IceInternal::BasicStream&); + void dispatch(const StartCallbackPtr&, const std::vector<SentCallback>&, Byte, Int, Int, + const IceInternal::ServantManagerPtr&, const ObjectAdapterPtr&, const IceInternal::OutgoingAsyncPtr&, + IceInternal::BasicStream&); void finish(); private: @@ -223,6 +243,9 @@ private: IceInternal::BasicStream* stream; IceInternal::OutgoingMessageCallback* out; IceInternal::OutgoingAsyncMessageCallbackPtr outAsync; +#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) + IceInternal::OutgoingAsyncPtr replyOutAsync; +#endif bool compress; int requestId; bool adopted; @@ -244,7 +267,7 @@ private: bool initialize(IceInternal::SocketOperation = IceInternal::SocketOperationNone); bool validate(IceInternal::SocketOperation = IceInternal::SocketOperationNone); - void sendNextMessage(std::vector<IceInternal::OutgoingAsyncMessageCallbackPtr>&); + void sendNextMessage(std::vector<SentCallback>&); IceInternal::AsyncStatus sendMessage(OutgoingMessage&); #ifndef ICE_OS_WINRT diff --git a/cpp/src/Ice/OutgoingAsync.cpp b/cpp/src/Ice/OutgoingAsync.cpp index aec19bb526b..b558aef973a 100644 --- a/cpp/src/Ice/OutgoingAsync.cpp +++ b/cpp/src/Ice/OutgoingAsync.cpp @@ -507,30 +507,23 @@ IceInternal::OutgoingAsync::__sent(Ice::ConnectionI* connection) bool alreadySent = _state & Sent; // Expected in case of a retry. _state |= Sent; - // - // It's possible for the request to be done already when using IOCP. This - // is the case for example if the send callback is dispatched after the - // read callback for the response/exception. - // - if(!(_state & Done)) + assert(!(_state & Done)); + if(!_proxy->ice_isTwoway()) { - if(!_proxy->ice_isTwoway()) + _remoteObserver.detach(); + if(!_callback || !_callback->__hasSentCallback()) { - _remoteObserver.detach(); - if(!_callback || !_callback->__hasSentCallback()) - { - _observer.detach(); - } - _state |= Done | OK; - _os.resize(0); // Clear buffer now, instead of waiting for AsyncResult deallocation + _observer.detach(); } - else if(connection->timeout() > 0) - { - assert(!_timerTaskConnection); - _timerTaskConnection = connection; + _state |= Done | OK; + _os.resize(0); // Clear buffer now, instead of waiting for AsyncResult deallocation + } + else if(connection->timeout() > 0) + { + assert(!_timerTaskConnection); + _timerTaskConnection = connection; IceUtil::Time timeout = IceUtil::Time::milliSeconds(connection->timeout()); _instance->timer()->schedule(this, timeout); - } } _monitor.notifyAll(); return !alreadySent && _callback && _callback->__hasSentCallback(); @@ -610,7 +603,7 @@ IceInternal::OutgoingAsync::__finished(const LocalExceptionWrapper& exc) } void -IceInternal::OutgoingAsync::__finished(BasicStream& is) +IceInternal::OutgoingAsync::__finished() { assert(_proxy->ice_isTwoway()); // Can only be called for twoways. @@ -619,9 +612,11 @@ IceInternal::OutgoingAsync::__finished(BasicStream& is) { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); assert(!_exception.get() && !(_state & Done)); + assert(!_is.b.empty()); + if(_remoteObserver) { - _remoteObserver->reply(static_cast<Int>(is.b.size() - headerSize - 4)); + _remoteObserver->reply(static_cast<Int>(_is.b.size() - headerSize - 4)); } _remoteObserver.detach(); @@ -631,7 +626,6 @@ IceInternal::OutgoingAsync::__finished(BasicStream& is) _timerTaskConnection = 0; } - _is.swap(is); _is.read(replyStatus); switch(replyStatus) |