diff options
Diffstat (limited to 'cpp/src/Ice/ConnectionI.cpp')
-rw-r--r-- | cpp/src/Ice/ConnectionI.cpp | 88 |
1 files changed, 51 insertions, 37 deletions
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp index 50d922be521..2d5157639e7 100644 --- a/cpp/src/Ice/ConnectionI.cpp +++ b/cpp/src/Ice/ConnectionI.cpp @@ -106,16 +106,23 @@ Ice::ConnectionI::OutgoingMessage::adopt(BasicStream* str) adopted = true; } -void +bool Ice::ConnectionI::OutgoingMessage::sent(ConnectionI* connection, bool notify) { + isSent = true; // The message is sent. + if(out) { out->sent(notify); // true = notify the waiting thread that the request was sent. + return false; } else if(outAsync) { - outAsync->__sent(connection); + return outAsync->__sent(connection); + } + else + { + return false; } if(adopted) @@ -128,21 +135,13 @@ Ice::ConnectionI::OutgoingMessage::sent(ConnectionI* connection, bool notify) void Ice::ConnectionI::OutgoingMessage::finished(const Ice::LocalException& ex) { - if(!response) + if(out) { - // - // Only notify oneway requests. The connection keeps track of twoway - // requests in the _requests/_asyncRequests maps and will notify them - // of the connection exceptions. - // - if(out) - { - out->finished(ex); - } - else if(outAsync) - { - outAsync->__finished(ex); - } + out->finished(ex, isSent); + } + else if(outAsync) + { + outAsync->__finished(ex, isSent); } if(adopted) @@ -429,7 +428,7 @@ Ice::ConnectionI::sendRequest(Outgoing* out, bool compress, bool response) // _transceiver->checkSendSize(*os, _instance->messageSizeMax()); - Int requestId; + Int requestId = 0; if(response) { // @@ -460,7 +459,7 @@ Ice::ConnectionI::sendRequest(Outgoing* out, bool compress, bool response) bool sent = false; try { - OutgoingMessage message(out, os, compress, response); + OutgoingMessage message(out, os, compress, requestId); sent = sendMessage(message); } catch(const LocalException& ex) @@ -506,7 +505,7 @@ Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncPtr& out, bool compress, b // _transceiver->checkSendSize(*os, _instance->messageSizeMax()); - Int requestId; + Int requestId = 0; if(response) { // @@ -533,7 +532,7 @@ Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncPtr& out, bool compress, b bool sent = false; try { - OutgoingMessage message(out, os, compress, response); + OutgoingMessage message(out, os, compress, requestId); sent = sendMessage(message); } catch(const LocalException& ex) @@ -798,7 +797,7 @@ Ice::ConnectionI::flushBatchRequests(BatchOutgoing* out) bool sent = false; try { - OutgoingMessage message(out, out->os(), _batchRequestCompress, false); + OutgoingMessage message(out, out->os(), _batchRequestCompress, 0); sent = sendMessage(message); } catch(const Ice::LocalException& ex) @@ -856,7 +855,7 @@ Ice::ConnectionI::flushAsyncBatchRequests(const BatchOutgoingAsyncPtr& outAsync) bool sent = false; try { - OutgoingMessage message(outAsync, outAsync->__getOs(), _batchRequestCompress, false); + OutgoingMessage message(outAsync, outAsync->__getOs(), _batchRequestCompress, 0); sent = sendMessage(message); } catch(const Ice::LocalException& ex) @@ -1030,7 +1029,11 @@ Ice::ConnectionI::startAsync(SocketOperation operation) { if(operation & SocketOperationWrite) { - _transceiver->startWrite(_writeStream); + if(_transceiver->startWrite(_writeStream) && !_sendStreams.empty()) + { + // The whole message is written, assume it's sent now for at-most-once semantics. + _sendStreams.front().isSent = true; + } } else if(operation & SocketOperationRead) { @@ -1058,7 +1061,6 @@ Ice::ConnectionI::finishAsync(SocketOperation operation) { _transceiver->finishRead(_readStream); } - } catch(const Ice::LocalException& ex) { @@ -1300,7 +1302,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) // for(vector<OutgoingAsyncMessageCallbackPtr>::const_iterator p = sentCBs.begin(); p != sentCBs.end(); ++p) { - (*p)->__sentCallback(_instance); + (*p)->__sent(); } // @@ -1351,21 +1353,22 @@ Ice::ConnectionI::finished(ThreadPoolCurrent& current) if(!_sendStreams.empty()) { assert(!_writeStream.b.empty()); - #ifdef ICE_USE_IOCP // - // The current message might be sent but not yet removed from _sendStreams if the - // connection was closed shortly after. We check if that's the case here and mark - // the message as sent if necessary. + // The current message might be sent but not yet removed from _sendStreams. If + // the response has been received in the meantime, we remove the message from + // _sendStreams to not call finished on a message which is already done. // OutgoingMessage* message = &_sendStreams.front(); _writeStream.swap(*message->stream); - if(message->stream->i == message->stream->b.end()) + if(message->requestId > 0 && + (message->out && _requests.find(message->requestId) == _requests.end() || + message->outAsync && _asyncRequests.find(message->requestId) == _asyncRequests.end())) { - message->sent(this, true); - if(dynamic_cast<Ice::AMISentCallback*>(message->outAsync.get())) + if(message->sent(this, true)) { - message->outAsync->__sentCallback(_instance); + assert(message->outAsync); + message->outAsync->__sent(); } _sendStreams.pop_front(); } @@ -1374,19 +1377,30 @@ Ice::ConnectionI::finished(ThreadPoolCurrent& current) for(deque<OutgoingMessage>::iterator o = _sendStreams.begin(); o != _sendStreams.end(); ++o) { o->finished(*_exception.get()); + if(o->requestId) // Make sure finished isn't called twice. + { + if(o->out) + { + _requests.erase(o->requestId); + } + else + { + _asyncRequests.erase(o->requestId); + } + } } _sendStreams.clear(); // Must be cleared before _requests because of Outgoing* references in OutgoingMessage } for(map<Int, Outgoing*>::const_iterator p = _requests.begin(); p != _requests.end(); ++p) { - p->second->finished(*_exception.get()); + p->second->finished(*_exception.get(), true); } _requests.clear(); for(map<Int, OutgoingAsyncPtr>::const_iterator q = _asyncRequests.begin(); q != _asyncRequests.end(); ++q) { - q->second->__finished(*_exception.get()); + q->second->__finished(*_exception.get(), true); } _asyncRequests.clear(); @@ -2007,9 +2021,9 @@ Ice::ConnectionI::sendNextMessage(vector<OutgoingAsyncMessageCallbackPtr>& callb // OutgoingMessage* message = &_sendStreams.front(); _writeStream.swap(*message->stream); - message->sent(this, true); - if(dynamic_cast<Ice::AMISentCallback*>(message->outAsync.get())) + if(message->sent(this, true)) { + assert(message->outAsync); callbacks.push_back(message->outAsync); } _sendStreams.pop_front(); |