summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/ConnectionI.cpp
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2013-02-27 17:38:59 +0100
committerBenoit Foucher <benoit@zeroc.com>2013-02-27 17:38:59 +0100
commit5693628a2dec254458a007d07f7ef9519cf0e554 (patch)
treeef973d6c8e336024c323cbcf5d96cda217b93051 /cpp/src/Ice/ConnectionI.cpp
parentFixed (ICE-5248) - opening VS2010 demo solution takes a long time (diff)
downloadice-5693628a2dec254458a007d07f7ef9519cf0e554.tar.bz2
ice-5693628a2dec254458a007d07f7ef9519cf0e554.tar.xz
ice-5693628a2dec254458a007d07f7ef9519cf0e554.zip
Fixed ICE-5275 - AMI race condition
Diffstat (limited to 'cpp/src/Ice/ConnectionI.cpp')
-rw-r--r--cpp/src/Ice/ConnectionI.cpp61
1 files changed, 51 insertions, 10 deletions
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp
index ca0be9dc19d..c8ea16863f2 100644
--- a/cpp/src/Ice/ConnectionI.cpp
+++ b/cpp/src/Ice/ConnectionI.cpp
@@ -66,7 +66,7 @@ class DispatchDispatcherCall : public DispatcherCall
public:
DispatchDispatcherCall(const ConnectionIPtr& connection, const ConnectionI::StartCallbackPtr& startCB,
- const vector<OutgoingAsyncMessageCallbackPtr>& sentCBs, Byte compress, Int requestId,
+ const vector<ConnectionI::SentCallback>& sentCBs, Byte compress, Int requestId,
Int invokeNum, const ServantManagerPtr& servantManager, const ObjectAdapterPtr& adapter,
const OutgoingAsyncPtr& outAsync, BasicStream& stream) :
_connection(connection),
@@ -94,7 +94,7 @@ private:
ConnectionIPtr _connection;
ConnectionI::StartCallbackPtr _startCB;
- vector<OutgoingAsyncMessageCallbackPtr> _sentCBs;
+ vector<ConnectionI::SentCallback> _sentCBs;
Byte _compress;
Int _requestId;
Int _invokeNum;
@@ -1292,7 +1292,7 @@ void
Ice::ConnectionI::message(ThreadPoolCurrent& current)
{
StartCallbackPtr startCB;
- vector<OutgoingAsyncMessageCallbackPtr> sentCBs;
+ vector<SentCallback> sentCBs;
Byte compress = 0;
Int requestId = 0;
Int invokeNum = 0;
@@ -1558,7 +1558,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
}
void
-ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingAsyncMessageCallbackPtr>& sentCBs,
+ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<SentCallback>& sentCBs,
Byte compress, Int requestId, Int invokeNum, const ServantManagerPtr& servantManager,
const ObjectAdapterPtr& adapter, const OutgoingAsyncPtr& outAsync, BasicStream& stream)
{
@@ -1574,9 +1574,20 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingAsyn
//
// Notify AMI calls that the message was sent.
//
- for(vector<OutgoingAsyncMessageCallbackPtr>::const_iterator p = sentCBs.begin(); p != sentCBs.end(); ++p)
+ for(vector<SentCallback>::const_iterator p = sentCBs.begin(); p != sentCBs.end(); ++p)
{
- (*p)->__sent();
+#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT)
+ if(p->outAsync)
+ {
+ p->outAsync->__sent();
+ }
+ if(p->replyOutAsync)
+ {
+ p->replyOutAsync->__finished();
+ }
+#else
+ p->outAsync->__sent();
+#endif
}
//
@@ -1585,7 +1596,7 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingAsyn
//
if(outAsync)
{
- outAsync->__finished(stream);
+ outAsync->__finished();
}
//
@@ -2380,7 +2391,7 @@ Ice::ConnectionI::validate(SocketOperation operation)
}
void
-Ice::ConnectionI::sendNextMessage(vector<OutgoingAsyncMessageCallbackPtr>& callbacks)
+Ice::ConnectionI::sendNextMessage(vector<SentCallback>& callbacks)
{
assert(!_sendStreams.empty());
assert(!_writeStream.b.empty() && _writeStream.i == _writeStream.b.end());
@@ -2393,11 +2404,25 @@ Ice::ConnectionI::sendNextMessage(vector<OutgoingAsyncMessageCallbackPtr>& callb
//
OutgoingMessage* message = &_sendStreams.front();
_writeStream.swap(*message->stream);
+#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT)
+ bool sentCB = message->sent(this, true);
+ if(sentCB || message->replyOutAsync)
+ {
+ if(sentCB)
+ {
+ callbacks.push_back(SentCallback(message->outAsync, message->replyOutAsync));
+ }
+ else
+ {
+ callbacks.push_back(SentCallback(0, message->replyOutAsync));
+ }
+ }
+#else
if(message->sent(this, true))
{
- assert(message->outAsync);
- callbacks.push_back(message->outAsync);
+ callbacks.push_back(SentCallback(message->outAsync));
}
+#endif
_sendStreams.pop_front();
//
@@ -2991,6 +3016,22 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request
{
_asyncRequests.erase(q);
}
+
+ stream.swap(*outAsync->__getIs());
+
+#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT)
+ //
+ // If we just received the reply of a request which isn't acknowledge as
+ // sent yet, we queue the reply instead of processing it right away. It
+ // will be processed once the write callback is invoked for the message.
+ //
+ OutgoingMessage* message = _sendStreams.empty() ? 0 : &_sendStreams.front();
+ if(message && message->outAsync.get() == outAsync.get())
+ {
+ swap(message->replyOutAsync, outAsync);
+ }
+#endif
+
}
notifyAll(); // Notify threads blocked in close(false)
break;