summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2013-02-27 17:38:59 +0100
committerBenoit Foucher <benoit@zeroc.com>2013-02-27 17:38:59 +0100
commit5693628a2dec254458a007d07f7ef9519cf0e554 (patch)
treeef973d6c8e336024c323cbcf5d96cda217b93051 /cpp/src
parentFixed (ICE-5248) - opening VS2010 demo solution takes a long time (diff)
downloadice-5693628a2dec254458a007d07f7ef9519cf0e554.tar.bz2
ice-5693628a2dec254458a007d07f7ef9519cf0e554.tar.xz
ice-5693628a2dec254458a007d07f7ef9519cf0e554.zip
Fixed ICE-5275 - AMI race condition
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/Ice/ConnectionI.cpp61
-rw-r--r--cpp/src/Ice/ConnectionI.h31
-rw-r--r--cpp/src/Ice/OutgoingAsync.cpp38
3 files changed, 94 insertions, 36 deletions
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp
index ca0be9dc19d..c8ea16863f2 100644
--- a/cpp/src/Ice/ConnectionI.cpp
+++ b/cpp/src/Ice/ConnectionI.cpp
@@ -66,7 +66,7 @@ class DispatchDispatcherCall : public DispatcherCall
public:
DispatchDispatcherCall(const ConnectionIPtr& connection, const ConnectionI::StartCallbackPtr& startCB,
- const vector<OutgoingAsyncMessageCallbackPtr>& sentCBs, Byte compress, Int requestId,
+ const vector<ConnectionI::SentCallback>& sentCBs, Byte compress, Int requestId,
Int invokeNum, const ServantManagerPtr& servantManager, const ObjectAdapterPtr& adapter,
const OutgoingAsyncPtr& outAsync, BasicStream& stream) :
_connection(connection),
@@ -94,7 +94,7 @@ private:
ConnectionIPtr _connection;
ConnectionI::StartCallbackPtr _startCB;
- vector<OutgoingAsyncMessageCallbackPtr> _sentCBs;
+ vector<ConnectionI::SentCallback> _sentCBs;
Byte _compress;
Int _requestId;
Int _invokeNum;
@@ -1292,7 +1292,7 @@ void
Ice::ConnectionI::message(ThreadPoolCurrent& current)
{
StartCallbackPtr startCB;
- vector<OutgoingAsyncMessageCallbackPtr> sentCBs;
+ vector<SentCallback> sentCBs;
Byte compress = 0;
Int requestId = 0;
Int invokeNum = 0;
@@ -1558,7 +1558,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
}
void
-ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingAsyncMessageCallbackPtr>& sentCBs,
+ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<SentCallback>& sentCBs,
Byte compress, Int requestId, Int invokeNum, const ServantManagerPtr& servantManager,
const ObjectAdapterPtr& adapter, const OutgoingAsyncPtr& outAsync, BasicStream& stream)
{
@@ -1574,9 +1574,20 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingAsyn
//
// Notify AMI calls that the message was sent.
//
- for(vector<OutgoingAsyncMessageCallbackPtr>::const_iterator p = sentCBs.begin(); p != sentCBs.end(); ++p)
+ for(vector<SentCallback>::const_iterator p = sentCBs.begin(); p != sentCBs.end(); ++p)
{
- (*p)->__sent();
+#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT)
+ if(p->outAsync)
+ {
+ p->outAsync->__sent();
+ }
+ if(p->replyOutAsync)
+ {
+ p->replyOutAsync->__finished();
+ }
+#else
+ p->outAsync->__sent();
+#endif
}
//
@@ -1585,7 +1596,7 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingAsyn
//
if(outAsync)
{
- outAsync->__finished(stream);
+ outAsync->__finished();
}
//
@@ -2380,7 +2391,7 @@ Ice::ConnectionI::validate(SocketOperation operation)
}
void
-Ice::ConnectionI::sendNextMessage(vector<OutgoingAsyncMessageCallbackPtr>& callbacks)
+Ice::ConnectionI::sendNextMessage(vector<SentCallback>& callbacks)
{
assert(!_sendStreams.empty());
assert(!_writeStream.b.empty() && _writeStream.i == _writeStream.b.end());
@@ -2393,11 +2404,25 @@ Ice::ConnectionI::sendNextMessage(vector<OutgoingAsyncMessageCallbackPtr>& callb
//
OutgoingMessage* message = &_sendStreams.front();
_writeStream.swap(*message->stream);
+#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT)
+ bool sentCB = message->sent(this, true);
+ if(sentCB || message->replyOutAsync)
+ {
+ if(sentCB)
+ {
+ callbacks.push_back(SentCallback(message->outAsync, message->replyOutAsync));
+ }
+ else
+ {
+ callbacks.push_back(SentCallback(0, message->replyOutAsync));
+ }
+ }
+#else
if(message->sent(this, true))
{
- assert(message->outAsync);
- callbacks.push_back(message->outAsync);
+ callbacks.push_back(SentCallback(message->outAsync));
}
+#endif
_sendStreams.pop_front();
//
@@ -2991,6 +3016,22 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request
{
_asyncRequests.erase(q);
}
+
+ stream.swap(*outAsync->__getIs());
+
+#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT)
+ //
+ // If we just received the reply of a request which isn't acknowledge as
+ // sent yet, we queue the reply instead of processing it right away. It
+ // will be processed once the write callback is invoked for the message.
+ //
+ OutgoingMessage* message = _sendStreams.empty() ? 0 : &_sendStreams.front();
+ if(message && message->outAsync.get() == outAsync.get())
+ {
+ swap(message->replyOutAsync, outAsync);
+ }
+#endif
+
}
notifyAll(); // Notify threads blocked in close(false)
break;
diff --git a/cpp/src/Ice/ConnectionI.h b/cpp/src/Ice/ConnectionI.h
index bf60d039ef1..21d692d7536 100644
--- a/cpp/src/Ice/ConnectionI.h
+++ b/cpp/src/Ice/ConnectionI.h
@@ -94,6 +94,26 @@ public:
};
typedef IceUtil::Handle<StartCallback> StartCallbackPtr;
+ struct SentCallback
+ {
+#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT)
+ SentCallback(const IceInternal::OutgoingAsyncMessageCallbackPtr& outAsync,
+ const IceInternal::OutgoingAsyncPtr& replyOutAsync) :
+ outAsync(outAsync), replyOutAsync(replyOutAsync)
+ {
+ }
+#else
+ SentCallback(const IceInternal::OutgoingAsyncMessageCallbackPtr& outAsync) : outAsync(outAsync)
+ {
+ }
+#endif
+
+ IceInternal::OutgoingAsyncMessageCallbackPtr outAsync;
+#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT)
+ IceInternal::OutgoingAsyncPtr replyOutAsync;
+#endif
+ };
+
enum DestructionReason
{
ObjectAdapterDeactivated,
@@ -178,9 +198,9 @@ public:
void exception(const LocalException&);
void invokeException(const LocalException&, int);
- void dispatch(const StartCallbackPtr&, const std::vector<IceInternal::OutgoingAsyncMessageCallbackPtr>&,
- Byte, Int, Int, const IceInternal::ServantManagerPtr&, const ObjectAdapterPtr&,
- const IceInternal::OutgoingAsyncPtr&, IceInternal::BasicStream&);
+ void dispatch(const StartCallbackPtr&, const std::vector<SentCallback>&, Byte, Int, Int,
+ const IceInternal::ServantManagerPtr&, const ObjectAdapterPtr&, const IceInternal::OutgoingAsyncPtr&,
+ IceInternal::BasicStream&);
void finish();
private:
@@ -223,6 +243,9 @@ private:
IceInternal::BasicStream* stream;
IceInternal::OutgoingMessageCallback* out;
IceInternal::OutgoingAsyncMessageCallbackPtr outAsync;
+#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT)
+ IceInternal::OutgoingAsyncPtr replyOutAsync;
+#endif
bool compress;
int requestId;
bool adopted;
@@ -244,7 +267,7 @@ private:
bool initialize(IceInternal::SocketOperation = IceInternal::SocketOperationNone);
bool validate(IceInternal::SocketOperation = IceInternal::SocketOperationNone);
- void sendNextMessage(std::vector<IceInternal::OutgoingAsyncMessageCallbackPtr>&);
+ void sendNextMessage(std::vector<SentCallback>&);
IceInternal::AsyncStatus sendMessage(OutgoingMessage&);
#ifndef ICE_OS_WINRT
diff --git a/cpp/src/Ice/OutgoingAsync.cpp b/cpp/src/Ice/OutgoingAsync.cpp
index aec19bb526b..b558aef973a 100644
--- a/cpp/src/Ice/OutgoingAsync.cpp
+++ b/cpp/src/Ice/OutgoingAsync.cpp
@@ -507,30 +507,23 @@ IceInternal::OutgoingAsync::__sent(Ice::ConnectionI* connection)
bool alreadySent = _state & Sent; // Expected in case of a retry.
_state |= Sent;
- //
- // It's possible for the request to be done already when using IOCP. This
- // is the case for example if the send callback is dispatched after the
- // read callback for the response/exception.
- //
- if(!(_state & Done))
+ assert(!(_state & Done));
+ if(!_proxy->ice_isTwoway())
{
- if(!_proxy->ice_isTwoway())
+ _remoteObserver.detach();
+ if(!_callback || !_callback->__hasSentCallback())
{
- _remoteObserver.detach();
- if(!_callback || !_callback->__hasSentCallback())
- {
- _observer.detach();
- }
- _state |= Done | OK;
- _os.resize(0); // Clear buffer now, instead of waiting for AsyncResult deallocation
+ _observer.detach();
}
- else if(connection->timeout() > 0)
- {
- assert(!_timerTaskConnection);
- _timerTaskConnection = connection;
+ _state |= Done | OK;
+ _os.resize(0); // Clear buffer now, instead of waiting for AsyncResult deallocation
+ }
+ else if(connection->timeout() > 0)
+ {
+ assert(!_timerTaskConnection);
+ _timerTaskConnection = connection;
IceUtil::Time timeout = IceUtil::Time::milliSeconds(connection->timeout());
_instance->timer()->schedule(this, timeout);
- }
}
_monitor.notifyAll();
return !alreadySent && _callback && _callback->__hasSentCallback();
@@ -610,7 +603,7 @@ IceInternal::OutgoingAsync::__finished(const LocalExceptionWrapper& exc)
}
void
-IceInternal::OutgoingAsync::__finished(BasicStream& is)
+IceInternal::OutgoingAsync::__finished()
{
assert(_proxy->ice_isTwoway()); // Can only be called for twoways.
@@ -619,9 +612,11 @@ IceInternal::OutgoingAsync::__finished(BasicStream& is)
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
assert(!_exception.get() && !(_state & Done));
+ assert(!_is.b.empty());
+
if(_remoteObserver)
{
- _remoteObserver->reply(static_cast<Int>(is.b.size() - headerSize - 4));
+ _remoteObserver->reply(static_cast<Int>(_is.b.size() - headerSize - 4));
}
_remoteObserver.detach();
@@ -631,7 +626,6 @@ IceInternal::OutgoingAsync::__finished(BasicStream& is)
_timerTaskConnection = 0;
}
- _is.swap(is);
_is.read(replyStatus);
switch(replyStatus)