diff options
Diffstat (limited to 'cpp/src/Ice/ConnectionI.cpp')
-rw-r--r-- | cpp/src/Ice/ConnectionI.cpp | 271 |
1 files changed, 16 insertions, 255 deletions
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp index 7af8fad5996..170c7b95d1e 100644 --- a/cpp/src/Ice/ConnectionI.cpp +++ b/cpp/src/Ice/ConnectionI.cpp @@ -20,7 +20,6 @@ #include <Ice/ACM.h> #include <Ice/ObjectAdapterI.h> // For getThreadPool() and getServantManager(). #include <Ice/EndpointI.h> -#include <Ice/Outgoing.h> #include <Ice/OutgoingAsync.h> #include <Ice/Incoming.h> #include <Ice/LocalException.h> @@ -229,7 +228,7 @@ Ice::ConnectionI::OutgoingMessage::adopt(OutputStream* str) } else if(!str) { - if(out || outAsync) + if(outAsync) { return; // Adopting request stream is not necessary. } @@ -249,8 +248,7 @@ Ice::ConnectionI::OutgoingMessage::adopt(OutputStream* str) void Ice::ConnectionI::OutgoingMessage::canceled(bool adoptStream) { - assert((out || outAsync)); // Only requests can timeout. - out = 0; + assert(outAsync); // Only requests can timeout. outAsync = 0; if(adoptStream) { @@ -271,11 +269,7 @@ Ice::ConnectionI::OutgoingMessage::sent() } stream = 0; - if(out) - { - out->sent(); - } - else if(outAsync) + if(outAsync) { #if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) invokeSent = outAsync->sent(); @@ -290,11 +284,7 @@ Ice::ConnectionI::OutgoingMessage::sent() void Ice::ConnectionI::OutgoingMessage::completed(const Ice::LocalException& ex) { - if(out) - { - out->completed(ex); - } - else if(outAsync) + if(outAsync) { if(outAsync->exception(ex)) { @@ -436,7 +426,7 @@ Ice::ConnectionI::close(bool force) // requests to be retried, regardless of whether the server // has processed them or not. // - while(!_requests.empty() || !_asyncRequests.empty()) + while(!_asyncRequests.empty()) { wait(); } @@ -591,8 +581,7 @@ Ice::ConnectionI::monitor(const IceUtil::Time& now, const ACMConfig& acm) if(acm.close != CloseOff && now >= (_acmLastActivity + acm.timeout)) { - if(acm.close == CloseOnIdleForceful || - (acm.close != CloseOnIdle && (!_requests.empty() || !_asyncRequests.empty()))) + if(acm.close == CloseOnIdleForceful || (acm.close != CloseOnIdle && !_asyncRequests.empty())) { // // Close the connection if we didn't receive a heartbeat in @@ -600,8 +589,8 @@ Ice::ConnectionI::monitor(const IceUtil::Time& now, const ACMConfig& acm) // setState(StateClosed, ConnectionTimeoutException(__FILE__, __LINE__)); } - else if(acm.close != CloseOnInvocation && - _dispatchCount == 0 && _batchRequestQueue->isEmpty() && _requests.empty() && _asyncRequests.empty()) + else if(acm.close != CloseOnInvocation && _dispatchCount == 0 && _batchRequestQueue->isEmpty() && + _asyncRequests.empty()) { // // The connection is idle, close it. @@ -611,94 +600,6 @@ Ice::ConnectionI::monitor(const IceUtil::Time& now, const ACMConfig& acm) } } -bool -Ice::ConnectionI::sendRequest(OutgoingBase* out, bool compress, bool response, int batchRequestNum) -{ - OutputStream* os = out->os(); - - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - // - // If the connection is closed before we even have a chance - // to send our request, we always try to send the request - // again. - // - if(_exception) - { - throw RetryException(*_exception); - } - - assert(_state > StateNotValidated); - assert(_state < StateClosing); - - // - // Ensure the message isn't bigger than what we can send with the - // transport. - // - _transceiver->checkSendSize(*os); - - Int requestId = 0; - if(response) - { - // - // Create a new unique request ID. - // - requestId = _nextRequestId++; - if(requestId <= 0) - { - _nextRequestId = 1; - requestId = _nextRequestId++; - } - - // - // Fill in the request ID. - // - const Byte* p = reinterpret_cast<const Byte*>(&requestId); -#ifdef ICE_BIG_ENDIAN - reverse_copy(p, p + sizeof(Int), os->b.begin() + headerSize); -#else - copy(p, p + sizeof(Int), os->b.begin() + headerSize); -#endif - } - else if(batchRequestNum > 0) - { - const Byte* p = reinterpret_cast<const Byte*>(&batchRequestNum); -#ifdef ICE_BIG_ENDIAN - reverse_copy(p, p + sizeof(Int), os->b.begin() + headerSize); -#else - copy(p, p + sizeof(Int), os->b.begin() + headerSize); -#endif - } - - out->attachRemoteObserver(initConnectionInfo(), _endpoint, requestId); - - // - // Send the message. If it can't be sent without blocking the message is added - // to _sendStreams and it will be sent by the selector thread. - // - bool sent = false; - try - { - OutgoingMessage message(out, os, compress, requestId); - sent = sendMessage(message) & AsyncStatusSent; - } - catch(const LocalException& ex) - { - setState(StateClosed, ex); - assert(_exception); - _exception->ice_throw(); - } - - if(response) - { - // - // Add to the requests map. - // - _requestsHint = _requests.insert(_requests.end(), pair<const Int, OutgoingBase*>(requestId, out)); - } - - return sent; -} - AsyncStatus Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncBasePtr& out, bool compress, bool response, int batchRequestNum) { @@ -824,8 +725,7 @@ Ice::ConnectionI::flushBatchRequestsAsync(::std::function<void(::std::exception_ void Ice::ConnectionI::flushBatchRequests() { - ConnectionFlushBatch out(this, _instance.get(), __flushBatchRequests_name); - out.invoke(); + end_flushBatchRequests(begin_flushBatchRequests()); } AsyncResultPtr @@ -1021,96 +921,6 @@ Ice::ConnectionI::getACM() } void -Ice::ConnectionI::requestCanceled(OutgoingBase* out, const Ice::LocalException& ex) -{ - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - if(_state >= StateClosed) - { - return; // The request has already been or will be shortly notified of the failure. - } - - for(deque<OutgoingMessage>::iterator o = _sendStreams.begin(); o != _sendStreams.end(); ++o) - { - if(o->out == out) - { - if(o->requestId) - { - if(_requestsHint != _requests.end() && _requestsHint->second == out) - { - _requests.erase(_requestsHint); - _requestsHint = _requests.end(); - } - else - { - _requests.erase(o->requestId); - } - } - - if(dynamic_cast<const Ice::ConnectionTimeoutException*>(&ex)) - { - setState(StateClosed, ex); - } - else - { - // - // If the request is being sent, don't remove it from the send streams, - // it will be removed once the sending is finished. - // - if(o == _sendStreams.begin()) - { - o->canceled(true); // true = adopt the stream. - } - else - { - o->canceled(false); - _sendStreams.erase(o); - } - out->completed(ex); - } - return; - } - } - - if(dynamic_cast<Outgoing*>(out)) - { - if(_requestsHint != _requests.end() && _requestsHint->second == out) - { - if(dynamic_cast<const Ice::ConnectionTimeoutException*>(&ex)) - { - setState(StateClosed, ex); - } - else - { - out->completed(ex); - _requests.erase(_requestsHint); - _requestsHint = _requests.end(); - } - return; - } - else - { - for(map<Int, OutgoingBase*>::iterator p = _requests.begin(); p != _requests.end(); ++p) - { - if(p->second == out) - { - if(dynamic_cast<const Ice::ConnectionTimeoutException*>(&ex)) - { - setState(StateClosed, ex); - } - else - { - p->second->completed(ex); - assert(p != _requestsHint); - _requests.erase(p); - } - return; // We're done. - } - } - } - } -} - -void Ice::ConnectionI::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAsync, const LocalException& ex) { // @@ -2018,28 +1828,13 @@ Ice::ConnectionI::finish(bool close) o->completed(*_exception); if(o->requestId) // Make sure finished isn't called twice. { - if(o->out) - { - _requests.erase(o->requestId); - } - else - { - _asyncRequests.erase(o->requestId); - } + _asyncRequests.erase(o->requestId); } } - _sendStreams.clear(); // Must be cleared before _requests because of Outgoing* references in OutgoingMessage + _sendStreams.clear(); } - - for(map<Int, OutgoingBase*>::const_iterator p = _requests.begin(); p != _requests.end(); ++p) - { - p->second->completed(*_exception); - } - - _requests.clear(); - for(map<Int, OutgoingAsyncBasePtr>::const_iterator q = _asyncRequests.begin(); q != _asyncRequests.end(); ++q) { if(q->second->exception(*_exception)) @@ -2181,7 +1976,6 @@ Ice::ConnectionI::ConnectionI(const CommunicatorPtr& communicator, _warnUdp(_instance->initializationData().properties->getPropertyAsInt("Ice.Warn.Datagrams") > 0), _compressionLevel(1), _nextRequestId(1), - _requestsHint(_requests.end()), _asyncRequestsHint(_asyncRequests.end()), _messageSizeMax(adapter ? adapter->messageSizeMax() : _instance->messageSizeMax()), _batchRequestQueue(new BatchRequestQueue(instance, endpoint->datagram())), @@ -2249,7 +2043,6 @@ Ice::ConnectionI::~ConnectionI() assert(_state == StateFinished); assert(_dispatchCount == 0); assert(_sendStreams.empty()); - assert(_requests.empty()); assert(_asyncRequests.empty()); } @@ -3310,54 +3103,22 @@ Ice::ConnectionI::parseMessage(InputStream& stream, Int& invokeNum, Int& request stream.read(requestId); - map<Int, OutgoingBase*>::iterator p = _requests.end(); map<Int, OutgoingAsyncBasePtr>::iterator q = _asyncRequests.end(); - if(_requestsHint != _requests.end()) + if(_asyncRequestsHint != _asyncRequests.end()) { - if(_requestsHint->first == requestId) + if(_asyncRequestsHint->first == requestId) { - p = _requestsHint; + q = _asyncRequestsHint; } } - if(p == _requests.end()) - { - if(_asyncRequestsHint != _asyncRequests.end()) - { - if(_asyncRequestsHint->first == requestId) - { - q = _asyncRequestsHint; - } - } - } - - if(p == _requests.end() && q == _asyncRequests.end()) - { - p = _requests.find(requestId); - } - - if(p == _requests.end() && q == _asyncRequests.end()) + if(q == _asyncRequests.end()) { q = _asyncRequests.find(requestId); } - if(p != _requests.end()) - { - p->second->completed(stream); - - if(p == _requestsHint) - { - _requests.erase(p++); - _requestsHint = p; - } - else - { - _requests.erase(p); - } - notifyAll(); // Notify threads blocked in close(false) - } - else if(q != _asyncRequests.end()) + if(q != _asyncRequests.end()) { outAsync = q->second; |