summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/ConnectionI.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/Ice/ConnectionI.cpp')
-rw-r--r--cpp/src/Ice/ConnectionI.cpp88
1 files changed, 51 insertions, 37 deletions
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp
index 50d922be521..2d5157639e7 100644
--- a/cpp/src/Ice/ConnectionI.cpp
+++ b/cpp/src/Ice/ConnectionI.cpp
@@ -106,16 +106,23 @@ Ice::ConnectionI::OutgoingMessage::adopt(BasicStream* str)
adopted = true;
}
-void
+bool
Ice::ConnectionI::OutgoingMessage::sent(ConnectionI* connection, bool notify)
{
+ isSent = true; // The message is sent.
+
if(out)
{
out->sent(notify); // true = notify the waiting thread that the request was sent.
+ return false;
}
else if(outAsync)
{
- outAsync->__sent(connection);
+ return outAsync->__sent(connection);
+ }
+ else
+ {
+ return false;
}
if(adopted)
@@ -128,21 +135,13 @@ Ice::ConnectionI::OutgoingMessage::sent(ConnectionI* connection, bool notify)
void
Ice::ConnectionI::OutgoingMessage::finished(const Ice::LocalException& ex)
{
- if(!response)
+ if(out)
{
- //
- // Only notify oneway requests. The connection keeps track of twoway
- // requests in the _requests/_asyncRequests maps and will notify them
- // of the connection exceptions.
- //
- if(out)
- {
- out->finished(ex);
- }
- else if(outAsync)
- {
- outAsync->__finished(ex);
- }
+ out->finished(ex, isSent);
+ }
+ else if(outAsync)
+ {
+ outAsync->__finished(ex, isSent);
}
if(adopted)
@@ -429,7 +428,7 @@ Ice::ConnectionI::sendRequest(Outgoing* out, bool compress, bool response)
//
_transceiver->checkSendSize(*os, _instance->messageSizeMax());
- Int requestId;
+ Int requestId = 0;
if(response)
{
//
@@ -460,7 +459,7 @@ Ice::ConnectionI::sendRequest(Outgoing* out, bool compress, bool response)
bool sent = false;
try
{
- OutgoingMessage message(out, os, compress, response);
+ OutgoingMessage message(out, os, compress, requestId);
sent = sendMessage(message);
}
catch(const LocalException& ex)
@@ -506,7 +505,7 @@ Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncPtr& out, bool compress, b
//
_transceiver->checkSendSize(*os, _instance->messageSizeMax());
- Int requestId;
+ Int requestId = 0;
if(response)
{
//
@@ -533,7 +532,7 @@ Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncPtr& out, bool compress, b
bool sent = false;
try
{
- OutgoingMessage message(out, os, compress, response);
+ OutgoingMessage message(out, os, compress, requestId);
sent = sendMessage(message);
}
catch(const LocalException& ex)
@@ -798,7 +797,7 @@ Ice::ConnectionI::flushBatchRequests(BatchOutgoing* out)
bool sent = false;
try
{
- OutgoingMessage message(out, out->os(), _batchRequestCompress, false);
+ OutgoingMessage message(out, out->os(), _batchRequestCompress, 0);
sent = sendMessage(message);
}
catch(const Ice::LocalException& ex)
@@ -856,7 +855,7 @@ Ice::ConnectionI::flushAsyncBatchRequests(const BatchOutgoingAsyncPtr& outAsync)
bool sent = false;
try
{
- OutgoingMessage message(outAsync, outAsync->__getOs(), _batchRequestCompress, false);
+ OutgoingMessage message(outAsync, outAsync->__getOs(), _batchRequestCompress, 0);
sent = sendMessage(message);
}
catch(const Ice::LocalException& ex)
@@ -1030,7 +1029,11 @@ Ice::ConnectionI::startAsync(SocketOperation operation)
{
if(operation & SocketOperationWrite)
{
- _transceiver->startWrite(_writeStream);
+ if(_transceiver->startWrite(_writeStream) && !_sendStreams.empty())
+ {
+ // The whole message is written, assume it's sent now for at-most-once semantics.
+ _sendStreams.front().isSent = true;
+ }
}
else if(operation & SocketOperationRead)
{
@@ -1058,7 +1061,6 @@ Ice::ConnectionI::finishAsync(SocketOperation operation)
{
_transceiver->finishRead(_readStream);
}
-
}
catch(const Ice::LocalException& ex)
{
@@ -1300,7 +1302,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
//
for(vector<OutgoingAsyncMessageCallbackPtr>::const_iterator p = sentCBs.begin(); p != sentCBs.end(); ++p)
{
- (*p)->__sentCallback(_instance);
+ (*p)->__sent();
}
//
@@ -1351,21 +1353,22 @@ Ice::ConnectionI::finished(ThreadPoolCurrent& current)
if(!_sendStreams.empty())
{
assert(!_writeStream.b.empty());
-
#ifdef ICE_USE_IOCP
//
- // The current message might be sent but not yet removed from _sendStreams if the
- // connection was closed shortly after. We check if that's the case here and mark
- // the message as sent if necessary.
+ // The current message might be sent but not yet removed from _sendStreams. If
+ // the response has been received in the meantime, we remove the message from
+ // _sendStreams to not call finished on a message which is already done.
//
OutgoingMessage* message = &_sendStreams.front();
_writeStream.swap(*message->stream);
- if(message->stream->i == message->stream->b.end())
+ if(message->requestId > 0 &&
+ (message->out && _requests.find(message->requestId) == _requests.end() ||
+ message->outAsync && _asyncRequests.find(message->requestId) == _asyncRequests.end()))
{
- message->sent(this, true);
- if(dynamic_cast<Ice::AMISentCallback*>(message->outAsync.get()))
+ if(message->sent(this, true))
{
- message->outAsync->__sentCallback(_instance);
+ assert(message->outAsync);
+ message->outAsync->__sent();
}
_sendStreams.pop_front();
}
@@ -1374,19 +1377,30 @@ Ice::ConnectionI::finished(ThreadPoolCurrent& current)
for(deque<OutgoingMessage>::iterator o = _sendStreams.begin(); o != _sendStreams.end(); ++o)
{
o->finished(*_exception.get());
+ if(o->requestId) // Make sure finished isn't called twice.
+ {
+ if(o->out)
+ {
+ _requests.erase(o->requestId);
+ }
+ else
+ {
+ _asyncRequests.erase(o->requestId);
+ }
+ }
}
_sendStreams.clear(); // Must be cleared before _requests because of Outgoing* references in OutgoingMessage
}
for(map<Int, Outgoing*>::const_iterator p = _requests.begin(); p != _requests.end(); ++p)
{
- p->second->finished(*_exception.get());
+ p->second->finished(*_exception.get(), true);
}
_requests.clear();
for(map<Int, OutgoingAsyncPtr>::const_iterator q = _asyncRequests.begin(); q != _asyncRequests.end(); ++q)
{
- q->second->__finished(*_exception.get());
+ q->second->__finished(*_exception.get(), true);
}
_asyncRequests.clear();
@@ -2007,9 +2021,9 @@ Ice::ConnectionI::sendNextMessage(vector<OutgoingAsyncMessageCallbackPtr>& callb
//
OutgoingMessage* message = &_sendStreams.front();
_writeStream.swap(*message->stream);
- message->sent(this, true);
- if(dynamic_cast<Ice::AMISentCallback*>(message->outAsync.get()))
+ if(message->sent(this, true))
{
+ assert(message->outAsync);
callbacks.push_back(message->outAsync);
}
_sendStreams.pop_front();