diff options
author | Benoit Foucher <benoit@zeroc.com> | 2014-06-27 10:31:41 +0200 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2014-06-27 10:31:41 +0200 |
commit | a4f93259dc3494d98addf38e69b87eb557d432b3 (patch) | |
tree | d2b78bb5cea24e33dc1b46be22dba6167e96c9ed /cpp/src/Ice/ConnectionI.cpp | |
parent | Fix for ICE-5515 (ice_staticId on proxies) in Java, C#, Python, Ruby and PHP ... (diff) | |
download | ice-a4f93259dc3494d98addf38e69b87eb557d432b3.tar.bz2 ice-a4f93259dc3494d98addf38e69b87eb557d432b3.tar.xz ice-a4f93259dc3494d98addf38e69b87eb557d432b3.zip |
Better collocation optimization, fix for ICE-5489, ICE-5484
Diffstat (limited to 'cpp/src/Ice/ConnectionI.cpp')
-rw-r--r-- | cpp/src/Ice/ConnectionI.cpp | 148 |
1 files changed, 85 insertions, 63 deletions
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp index f36c83f62fa..850f245f163 100644 --- a/cpp/src/Ice/ConnectionI.cpp +++ b/cpp/src/Ice/ConnectionI.cpp @@ -23,6 +23,7 @@ #include <Ice/OutgoingAsync.h> #include <Ice/Incoming.h> #include <Ice/LocalException.h> +#include <Ice/RequestHandler.h> // For RetryException #include <Ice/ReferenceFactory.h> // For createProxy(). #include <Ice/ProxyFactory.h> // For createProxy(). #ifndef ICE_OS_WINRT @@ -239,13 +240,21 @@ Ice::ConnectionI::OutgoingMessage::adopt(BasicStream* str) adopted = true; } -void -Ice::ConnectionI::OutgoingMessage::timedOut() +bool +Ice::ConnectionI::OutgoingMessage::timedOut(bool adoptStream) { assert((out || outAsync) && !isSent); // Only requests can timeout. out = 0; outAsync = 0; - adopt(0); // Adopt the request stream + if(adoptStream) + { + adopt(0); // Adopt the request stream + } + else + { + assert(!adopted && !stream); + } + return isSent; } bool @@ -612,7 +621,7 @@ Ice::ConnectionI::sendRequest(Outgoing* out, bool compress, bool response) // to send our request, we always try to send the request // again. // - throw LocalExceptionWrapper(*_exception.get(), true); + throw RetryException(*_exception.get()); } assert(_state > StateNotValidated); @@ -692,7 +701,7 @@ Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncPtr& out, bool compress, b // to send our request, we always try to send the request // again. // - throw LocalExceptionWrapper(*_exception.get(), true); + throw RetryException(*_exception.get()); } assert(_state > StateNotValidated); @@ -777,7 +786,7 @@ Ice::ConnectionI::prepareBatchRequest(BasicStream* os) // if(_batchStream.b.empty()) { - throw LocalExceptionWrapper(*_exception.get(), true); + throw RetryException(*_exception.get()); } else { @@ -825,7 +834,7 @@ Ice::ConnectionI::finishBatchRequest(BasicStream* os, bool compress) if(_exception.get()) { - _exception->ice_throw(); + return; } bool flush = false; @@ -958,8 +967,7 @@ Ice::ConnectionI::abortBatchRequest() void Ice::ConnectionI::flushBatchRequests() { - IceInternal::InvocationObserver observer(_instance.get(), __flushBatchRequests_name); - BatchOutgoing out(this, _instance.get(), observer); + BatchOutgoing out(this, _instance.get(), __flushBatchRequests_name); out.invoke(); } @@ -1209,17 +1217,19 @@ Ice::ConnectionI::requestTimedOut(OutgoingMessageCallback* out) // If the request is being sent, don't remove it from the send streams, // it will be removed once the sending is finished. // + bool isSent; if(o == _sendStreams.begin()) { - o->timedOut(); + isSent = o->timedOut(true); // true = adopt the stream. } else { + isSent = o->timedOut(false); _sendStreams.erase(o); } InvocationTimeoutException ex(__FILE__, __LINE__); - o->finished(ex); + out->finished(ex, isSent); return; } } @@ -1254,75 +1264,87 @@ Ice::ConnectionI::requestTimedOut(OutgoingMessageCallback* out) void Ice::ConnectionI::asyncRequestTimedOut(const OutgoingAsyncMessageCallbackPtr& outAsync) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - - for(deque<OutgoingMessage>::iterator o = _sendStreams.begin(); o != _sendStreams.end(); ++o) + bool isSent; + bool finished = false; { - if(o->outAsync.get() == outAsync.get()) + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + + for(deque<OutgoingMessage>::iterator o = _sendStreams.begin(); o != _sendStreams.end(); ++o) { - if(o->requestId) + if(o->outAsync.get() == outAsync.get()) { - if(_asyncRequestsHint != _asyncRequests.end() && - _asyncRequestsHint->second == OutgoingAsyncPtr::dynamicCast(outAsync)) + if(o->requestId) { - _asyncRequests.erase(_asyncRequestsHint); - _asyncRequestsHint = _asyncRequests.end(); + if(_asyncRequestsHint != _asyncRequests.end() && + _asyncRequestsHint->second == OutgoingAsyncPtr::dynamicCast(outAsync)) + { + _asyncRequests.erase(_asyncRequestsHint); + _asyncRequestsHint = _asyncRequests.end(); + } + else + { + _asyncRequests.erase(o->requestId); + } + } + + // + // If the request is being sent, don't remove it from the send streams, + // it will be removed once the sending is finished. + // + if(o == _sendStreams.begin()) + { + isSent = o->timedOut(true); // true = adopt the stream } else { - _asyncRequests.erase(o->requestId); + isSent = o->timedOut(false); + _sendStreams.erase(o); } + finished = true; + break; // We're done } - - // - // If the request is being sent, don't remove it from the send streams, - // it will be removed once the sending is finished. - // - if(o == _sendStreams.begin()) - { - o->timedOut(); - } - else - { - _sendStreams.erase(o); - } - - InvocationTimeoutException ex(__FILE__, __LINE__); - o->finished(ex); - return; // We're done. } - } - OutgoingAsyncPtr o = OutgoingAsyncPtr::dynamicCast(outAsync); - if(o) - { - if(_asyncRequestsHint != _asyncRequests.end()) + if(!finished) { - if(_asyncRequestsHint->second == o) + OutgoingAsyncPtr o = OutgoingAsyncPtr::dynamicCast(outAsync); + if(o) { - InvocationTimeoutException ex(__FILE__, __LINE__); - o->__finished(ex, true); - _asyncRequests.erase(_asyncRequestsHint); - _asyncRequestsHint = _asyncRequests.end(); - } - } + if(_asyncRequestsHint != _asyncRequests.end()) + { + if(_asyncRequestsHint->second == o) + { + InvocationTimeoutException ex(__FILE__, __LINE__); + o->__finished(ex, true); + _asyncRequests.erase(_asyncRequestsHint); + _asyncRequestsHint = _asyncRequests.end(); + } + } - for(map<Int, OutgoingAsyncPtr>::iterator p = _asyncRequests.begin(); p != _asyncRequests.end(); ++p) - { - if(p->second.get() == o.get()) - { - InvocationTimeoutException ex(__FILE__, __LINE__); - o->__finished(ex, true); - assert(p != _asyncRequestsHint); - _asyncRequests.erase(p); - return; // We're done. + for(map<Int, OutgoingAsyncPtr>::iterator p = _asyncRequests.begin(); p != _asyncRequests.end(); ++p) + { + if(p->second.get() == o.get()) + { + assert(p != _asyncRequestsHint); + _asyncRequests.erase(p); + finished = true; + isSent = true; + break; + } + } } } } + + if(finished) + { + InvocationTimeoutException ex(__FILE__, __LINE__); + outAsync->__finished(ex, isSent); + } } void -Ice::ConnectionI::sendResponse(BasicStream* os, Byte compressFlag) +Ice::ConnectionI::sendResponse(Int, BasicStream* os, Byte compressFlag) { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); assert(_state > StateNotValidated); @@ -2156,7 +2178,7 @@ Ice::ConnectionI::exception(const LocalException& ex) } void -Ice::ConnectionI::invokeException(const LocalException& ex, int invokeNum) +Ice::ConnectionI::invokeException(Ice::Int, const LocalException& ex, int invokeNum) { // // Fatal exception while invoking a request. Since sendResponse/sendNoResponse isn't @@ -3458,7 +3480,7 @@ Ice::ConnectionI::invokeAll(BasicStream& stream, Int invokeNum, Int requestId, B bool response = !_endpoint->datagram() && requestId != 0; assert(!response || invokeNum == 1); - Incoming in(_instance.get(), this, adapter, response, compress, requestId); + Incoming in(_instance.get(), this, this, adapter, response, compress, requestId); // // Dispatch the invocation. @@ -3472,7 +3494,7 @@ Ice::ConnectionI::invokeAll(BasicStream& stream, Int invokeNum, Int requestId, B } catch(const LocalException& ex) { - invokeException(ex, invokeNum); // Fatal invocation exception + invokeException(requestId, ex, invokeNum); // Fatal invocation exception } } |