diff options
author | Benoit Foucher <benoit@zeroc.com> | 2014-12-17 18:06:54 +0100 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2014-12-17 18:06:54 +0100 |
commit | 5a57686d2ccbb085b8ac67908ad9525a6bafaf4b (patch) | |
tree | 58c2219412e8af66fbfd66d5269af6b7a48b28d2 /cpp | |
parent | Avoid check_output isn't supported with python 2.6 (diff) | |
download | ice-5a57686d2ccbb085b8ac67908ad9525a6bafaf4b.tar.bz2 ice-5a57686d2ccbb085b8ac67908ad9525a6bafaf4b.tar.xz ice-5a57686d2ccbb085b8ac67908ad9525a6bafaf4b.zip |
Fixed ICE-6199 - changed collocation optimization to call AMI cb asynchronously if AMD dispatch
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/include/Ice/Incoming.h | 6 | ||||
-rw-r--r-- | cpp/src/Ice/CollocatedRequestHandler.cpp | 44 | ||||
-rw-r--r-- | cpp/src/Ice/CollocatedRequestHandler.h | 8 | ||||
-rw-r--r-- | cpp/src/Ice/ConnectionI.cpp | 58 | ||||
-rw-r--r-- | cpp/src/Ice/ConnectionI.h | 6 | ||||
-rw-r--r-- | cpp/src/Ice/Incoming.cpp | 40 | ||||
-rw-r--r-- | cpp/src/Ice/IncomingAsync.cpp | 18 | ||||
-rw-r--r-- | cpp/src/Ice/ResponseHandler.h | 6 |
8 files changed, 105 insertions, 81 deletions
diff --git a/cpp/include/Ice/Incoming.h b/cpp/include/Ice/Incoming.h index f33f1a41640..7f1a1cce947 100644 --- a/cpp/include/Ice/Incoming.h +++ b/cpp/include/Ice/Incoming.h @@ -47,10 +47,10 @@ protected: void __warning(const Ice::Exception&) const; void __warning(const std::string&) const; - bool __servantLocatorFinished(); + bool __servantLocatorFinished(bool); - void __handleException(const std::exception&); - void __handleException(); + void __handleException(const std::exception&, bool); + void __handleException(bool); Ice::Current _current; Ice::ObjectPtr _servant; diff --git a/cpp/src/Ice/CollocatedRequestHandler.cpp b/cpp/src/Ice/CollocatedRequestHandler.cpp index 8ec91cdd620..deb9fefb74d 100644 --- a/cpp/src/Ice/CollocatedRequestHandler.cpp +++ b/cpp/src/Ice/CollocatedRequestHandler.cpp @@ -495,7 +495,7 @@ CollocatedRequestHandler::invokeAsyncBatchRequests(OutgoingAsyncBase* outAsync) } void -CollocatedRequestHandler::sendResponse(Int requestId, BasicStream* os, Byte) +CollocatedRequestHandler::sendResponse(Int requestId, BasicStream* os, Byte, bool amd) { OutgoingAsyncPtr outAsync; { @@ -533,7 +533,19 @@ CollocatedRequestHandler::sendResponse(Int requestId, BasicStream* os, Byte) if(outAsync) { - outAsync->invokeCompleted(); + // + // If called from an AMD dispatch, invoke asynchronously + // the completion callback since this might be called from + // the user code. + // + if(amd) + { + outAsync->invokeCompletedAsync(); + } + else + { + outAsync->invokeCompleted(); + } } _adapter->decDirectCount(); @@ -546,17 +558,17 @@ CollocatedRequestHandler::sendNoResponse() } bool -CollocatedRequestHandler::systemException(Int requestId, const SystemException& ex) +CollocatedRequestHandler::systemException(Int requestId, const SystemException& ex, bool amd) { - handleException(requestId, ex); + handleException(requestId, ex, amd); _adapter->decDirectCount(); return true; } void -CollocatedRequestHandler::invokeException(Int requestId, const LocalException& ex, int invokeNum) +CollocatedRequestHandler::invokeException(Int requestId, const LocalException& ex, int invokeNum, bool amd) { - handleException(requestId, ex); + handleException(requestId, ex, amd); _adapter->decDirectCount(); } @@ -640,7 +652,7 @@ CollocatedRequestHandler::invokeAll(BasicStream* os, Int requestId, Int invokeNu } catch(const ObjectAdapterDeactivatedException& ex) { - handleException(requestId, ex); + handleException(requestId, ex, false); return; } @@ -651,12 +663,12 @@ CollocatedRequestHandler::invokeAll(BasicStream* os, Int requestId, Int invokeNu } catch(const LocalException& ex) { - invokeException(requestId, ex, invokeNum); // Fatal invocation exception + invokeException(requestId, ex, invokeNum, false); // Fatal invocation exception } } void -CollocatedRequestHandler::handleException(int requestId, const Exception& ex) +CollocatedRequestHandler::handleException(int requestId, const Exception& ex, bool amd) { if(requestId == 0) { @@ -689,6 +701,18 @@ CollocatedRequestHandler::handleException(int requestId, const Exception& ex) if(outAsync) { - outAsync->invokeCompleted(); + // + // If called from an AMD dispatch, invoke asynchronously + // the completion callback since this might be called from + // the user code. + // + if(amd) + { + outAsync->invokeCompletedAsync(); + } + else + { + outAsync->invokeCompleted(); + } } } diff --git a/cpp/src/Ice/CollocatedRequestHandler.h b/cpp/src/Ice/CollocatedRequestHandler.h index d2d8c5bab01..1075af2f119 100644 --- a/cpp/src/Ice/CollocatedRequestHandler.h +++ b/cpp/src/Ice/CollocatedRequestHandler.h @@ -56,10 +56,10 @@ public: virtual void requestCanceled(OutgoingBase*, const Ice::LocalException&); virtual void asyncRequestCanceled(const OutgoingAsyncBasePtr&, const Ice::LocalException&); - virtual void sendResponse(Ice::Int, BasicStream*, Ice::Byte); + virtual void sendResponse(Ice::Int, BasicStream*, Ice::Byte, bool); virtual void sendNoResponse(); - virtual bool systemException(Ice::Int, const Ice::SystemException&); - virtual void invokeException(Ice::Int, const Ice::LocalException&, int); + virtual bool systemException(Ice::Int, const Ice::SystemException&, bool); + virtual void invokeException(Ice::Int, const Ice::LocalException&, int, bool); const ReferencePtr& getReference() const { return _reference; } // Inlined for performances. @@ -78,7 +78,7 @@ public: private: - void handleException(Ice::Int, const Ice::Exception&); + void handleException(Ice::Int, const Ice::Exception&, bool); const Ice::ObjectAdapterIPtr _adapter; const bool _dispatcher; diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp index ddcbea13f11..e583913e5a0 100644 --- a/cpp/src/Ice/ConnectionI.cpp +++ b/cpp/src/Ice/ConnectionI.cpp @@ -1477,7 +1477,7 @@ Ice::ConnectionI::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAsync, con } void -Ice::ConnectionI::sendResponse(Int, BasicStream* os, Byte compressFlag) +Ice::ConnectionI::sendResponse(Int, BasicStream* os, Byte compressFlag, bool /*amd*/) { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); assert(_state > StateNotValidated); @@ -1550,11 +1550,37 @@ Ice::ConnectionI::sendNoResponse() } bool -Ice::ConnectionI::systemException(Int, const SystemException&) +Ice::ConnectionI::systemException(Int, const SystemException&, bool /*amd*/) { return false; // System exceptions aren't marshalled. } +void +Ice::ConnectionI::invokeException(Ice::Int, const LocalException& ex, int invokeNum, bool /*amd*/) +{ + // + // Fatal exception while invoking a request. Since sendResponse/sendNoResponse isn't + // called in case of a fatal exception we decrement _dispatchCount here. + // + + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + setState(StateClosed, ex); + + if(invokeNum > 0) + { + assert(_dispatchCount >= invokeNum); + _dispatchCount -= invokeNum; + if(_dispatchCount == 0) + { + if(_state == StateFinished) + { + reap(); + } + notifyAll(); + } + } +} + EndpointIPtr Ice::ConnectionI::endpoint() const { @@ -2359,32 +2385,6 @@ Ice::ConnectionI::exception(const LocalException& ex) setState(StateClosed, ex); } -void -Ice::ConnectionI::invokeException(Ice::Int, const LocalException& ex, int invokeNum) -{ - // - // Fatal exception while invoking a request. Since sendResponse/sendNoResponse isn't - // called in case of a fatal exception we decrement _dispatchCount here. - // - - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - setState(StateClosed, ex); - - if(invokeNum > 0) - { - assert(_dispatchCount >= invokeNum); - _dispatchCount -= invokeNum; - if(_dispatchCount == 0) - { - if(_state == StateFinished) - { - reap(); - } - notifyAll(); - } - } -} - Ice::ConnectionI::ConnectionI(const CommunicatorPtr& communicator, const InstancePtr& instance, const ACMMonitorPtr& monitor, @@ -3708,7 +3708,7 @@ Ice::ConnectionI::invokeAll(BasicStream& stream, Int invokeNum, Int requestId, B } catch(const LocalException& ex) { - invokeException(requestId, ex, invokeNum); // Fatal invocation exception + invokeException(requestId, ex, invokeNum, false); // Fatal invocation exception } } diff --git a/cpp/src/Ice/ConnectionI.h b/cpp/src/Ice/ConnectionI.h index 003ef41c519..4096ce5a2b4 100644 --- a/cpp/src/Ice/ConnectionI.h +++ b/cpp/src/Ice/ConnectionI.h @@ -192,9 +192,10 @@ public: virtual void requestCanceled(IceInternal::OutgoingBase*, const LocalException&); virtual void asyncRequestCanceled(const IceInternal::OutgoingAsyncBasePtr&, const LocalException&); - virtual void sendResponse(Int, IceInternal::BasicStream*, Byte); + virtual void sendResponse(Int, IceInternal::BasicStream*, Byte, bool); virtual void sendNoResponse(); - virtual bool systemException(Int, const SystemException&); + virtual bool systemException(Int, const SystemException&, bool); + virtual void invokeException(Ice::Int, const LocalException&, int, bool); IceInternal::EndpointIPtr endpoint() const; IceInternal::ConnectorPtr connector() const; @@ -224,7 +225,6 @@ public: virtual ConnectionInfoPtr getInfo() const; // From Connection void exception(const LocalException&); - virtual void invokeException(Ice::Int, const LocalException&, int); void dispatch(const StartCallbackPtr&, const std::vector<OutgoingMessage>&, Byte, Int, Int, const IceInternal::ServantManagerPtr&, const ObjectAdapterPtr&, const IceInternal::OutgoingAsyncPtr&, diff --git a/cpp/src/Ice/Incoming.cpp b/cpp/src/Ice/Incoming.cpp index b2cc6cbaef4..160549421ee 100644 --- a/cpp/src/Ice/Incoming.cpp +++ b/cpp/src/Ice/Incoming.cpp @@ -207,7 +207,7 @@ IceInternal::IncomingBase::__warning(const string& msg) const } bool -IceInternal::IncomingBase::__servantLocatorFinished() +IceInternal::IncomingBase::__servantLocatorFinished(bool amd) { assert(_locator && _servant); try @@ -232,7 +232,7 @@ IceInternal::IncomingBase::__servantLocatorFinished() _os.write(ex); _os.endWriteEncaps(); _observer.reply(static_cast<Int>(_os.b.size() - headerSize - 4)); - _responseHandler->sendResponse(_current.requestId, &_os, _compress); + _responseHandler->sendResponse(_current.requestId, &_os, _compress, amd); } else { @@ -244,23 +244,23 @@ IceInternal::IncomingBase::__servantLocatorFinished() } catch(const std::exception& ex) { - __handleException(ex); + __handleException(ex, amd); } catch(...) { - __handleException(); + __handleException(amd); } return false; } void -IceInternal::IncomingBase::__handleException(const std::exception& exc) +IceInternal::IncomingBase::__handleException(const std::exception& exc, bool amd) { assert(_responseHandler); if(const SystemException* ex = dynamic_cast<const SystemException*>(&exc)) { - if(_responseHandler->systemException(_current.requestId, *ex)) + if(_responseHandler->systemException(_current.requestId, *ex, amd)) { return; } @@ -333,7 +333,7 @@ IceInternal::IncomingBase::__handleException(const std::exception& exc) _os.write(rfe->operation, false); _observer.reply(static_cast<Int>(_os.b.size() - headerSize - 4)); - _responseHandler->sendResponse(_current.requestId, &_os, _compress); + _responseHandler->sendResponse(_current.requestId, &_os, _compress, amd); } else { @@ -405,7 +405,7 @@ IceInternal::IncomingBase::__handleException(const std::exception& exc) } _observer.reply(static_cast<Int>(_os.b.size() - headerSize - 4)); - _responseHandler->sendResponse(_current.requestId, &_os, _compress); + _responseHandler->sendResponse(_current.requestId, &_os, _compress, amd); } else { @@ -433,7 +433,7 @@ IceInternal::IncomingBase::__handleException(const std::exception& exc) _os.write(str.str(), false); _observer.reply(static_cast<Int>(_os.b.size() - headerSize - 4)); - _responseHandler->sendResponse(_current.requestId, &_os, _compress); + _responseHandler->sendResponse(_current.requestId, &_os, _compress, amd); } else { @@ -446,7 +446,7 @@ IceInternal::IncomingBase::__handleException(const std::exception& exc) } void -IceInternal::IncomingBase::__handleException() +IceInternal::IncomingBase::__handleException(bool amd) { if(_os.instance()->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.Dispatch", 1) > 0) { @@ -467,7 +467,7 @@ IceInternal::IncomingBase::__handleException() string reason = "unknown c++ exception"; _os.write(reason, false); _observer.reply(static_cast<Int>(_os.b.size() - headerSize - 4)); - _responseHandler->sendResponse(_current.requestId, &_os, _compress); + _responseHandler->sendResponse(_current.requestId, &_os, _compress, amd); } else { @@ -648,7 +648,7 @@ IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager, BasicStre _os.write(ex); _os.endWriteEncaps(); _observer.reply(static_cast<Int>(_os.b.size() - headerSize - 4)); - _responseHandler->sendResponse(_current.requestId, &_os, _compress); + _responseHandler->sendResponse(_current.requestId, &_os, _compress, false); } else { @@ -662,13 +662,13 @@ IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager, BasicStre catch(const std::exception& ex) { _is->skipEncaps(); // Required for batch requests. - __handleException(ex); + __handleException(ex, false); return; } catch(...) { _is->skipEncaps(); // Required for batch requests. - __handleException(); + __handleException(false); return; } } @@ -688,7 +688,7 @@ IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager, BasicStre return; } - if(_locator && !__servantLocatorFinished()) + if(_locator && !__servantLocatorFinished(false)) { return; } @@ -713,20 +713,20 @@ IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager, BasicStre } catch(const std::exception& ex) { - if(_servant && _locator && !__servantLocatorFinished()) + if(_servant && _locator && !__servantLocatorFinished(false)) { return; } - __handleException(ex); + __handleException(ex, false); return; } catch(...) { - if(_servant && _locator && !__servantLocatorFinished()) + if(_servant && _locator && !__servantLocatorFinished(false)) { return; } - __handleException(); + __handleException(false); return; } @@ -739,7 +739,7 @@ IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager, BasicStre if(_response) { _observer.reply(static_cast<Int>(_os.b.size() - headerSize - 4)); - _responseHandler->sendResponse(_current.requestId, &_os, _compress); + _responseHandler->sendResponse(_current.requestId, &_os, _compress, false); } else { diff --git a/cpp/src/Ice/IncomingAsync.cpp b/cpp/src/Ice/IncomingAsync.cpp index 10c0ec6ac4e..7b56ab244fc 100644 --- a/cpp/src/Ice/IncomingAsync.cpp +++ b/cpp/src/Ice/IncomingAsync.cpp @@ -191,7 +191,7 @@ IceInternal::IncomingAsync::__response() { try { - if(_locator && !__servantLocatorFinished()) + if(_locator && !__servantLocatorFinished(true)) { return; } @@ -201,7 +201,7 @@ IceInternal::IncomingAsync::__response() if(_response) { _observer.reply(static_cast<Int>(_os.b.size() - headerSize - 4)); - _responseHandler->sendResponse(_current.requestId, &_os, _compress); + _responseHandler->sendResponse(_current.requestId, &_os, _compress, true); } else { @@ -213,7 +213,7 @@ IceInternal::IncomingAsync::__response() } catch(const LocalException& ex) { - _responseHandler->invokeException(_current.requestId, ex, 1); // Fatal invocation exception + _responseHandler->invokeException(_current.requestId, ex, 1, true); // Fatal invocation exception } } @@ -222,16 +222,16 @@ IceInternal::IncomingAsync::__exception(const std::exception& exc) { try { - if(_locator && !__servantLocatorFinished()) + if(_locator && !__servantLocatorFinished(true)) { return; } - __handleException(exc); + __handleException(exc, true); } catch(const LocalException& ex) { - _responseHandler->invokeException(_current.requestId, ex, 1); // Fatal invocation exception + _responseHandler->invokeException(_current.requestId, ex, 1, true); // Fatal invocation exception } } @@ -240,16 +240,16 @@ IceInternal::IncomingAsync::__exception() { try { - if(_locator && !__servantLocatorFinished()) + if(_locator && !__servantLocatorFinished(true)) { return; } - __handleException(); + __handleException(true); } catch(const LocalException& ex) { - _responseHandler->invokeException(_current.requestId, ex, 1); // Fatal invocation exception + _responseHandler->invokeException(_current.requestId, ex, 1, true); // Fatal invocation exception } } diff --git a/cpp/src/Ice/ResponseHandler.h b/cpp/src/Ice/ResponseHandler.h index 9d9c212bec8..73e206a152b 100644 --- a/cpp/src/Ice/ResponseHandler.h +++ b/cpp/src/Ice/ResponseHandler.h @@ -28,10 +28,10 @@ public: virtual ~ResponseHandler(); - virtual void sendResponse(Ice::Int, BasicStream*, Ice::Byte) = 0; + virtual void sendResponse(Ice::Int, BasicStream*, Ice::Byte, bool) = 0; virtual void sendNoResponse() = 0; - virtual bool systemException(Ice::Int, const Ice::SystemException&) = 0; - virtual void invokeException(Ice::Int, const Ice::LocalException&, int) = 0; + virtual bool systemException(Ice::Int, const Ice::SystemException&, bool) = 0; + virtual void invokeException(Ice::Int, const Ice::LocalException&, int, bool) = 0; }; } |