summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2014-12-17 18:06:54 +0100
committerBenoit Foucher <benoit@zeroc.com>2014-12-17 18:06:54 +0100
commit5a57686d2ccbb085b8ac67908ad9525a6bafaf4b (patch)
tree58c2219412e8af66fbfd66d5269af6b7a48b28d2 /cpp
parentAvoid check_output isn't supported with python 2.6 (diff)
downloadice-5a57686d2ccbb085b8ac67908ad9525a6bafaf4b.tar.bz2
ice-5a57686d2ccbb085b8ac67908ad9525a6bafaf4b.tar.xz
ice-5a57686d2ccbb085b8ac67908ad9525a6bafaf4b.zip
Fixed ICE-6199 - changed collocation optimization to call AMI cb asynchronously if AMD dispatch
Diffstat (limited to 'cpp')
-rw-r--r--cpp/include/Ice/Incoming.h6
-rw-r--r--cpp/src/Ice/CollocatedRequestHandler.cpp44
-rw-r--r--cpp/src/Ice/CollocatedRequestHandler.h8
-rw-r--r--cpp/src/Ice/ConnectionI.cpp58
-rw-r--r--cpp/src/Ice/ConnectionI.h6
-rw-r--r--cpp/src/Ice/Incoming.cpp40
-rw-r--r--cpp/src/Ice/IncomingAsync.cpp18
-rw-r--r--cpp/src/Ice/ResponseHandler.h6
8 files changed, 105 insertions, 81 deletions
diff --git a/cpp/include/Ice/Incoming.h b/cpp/include/Ice/Incoming.h
index f33f1a41640..7f1a1cce947 100644
--- a/cpp/include/Ice/Incoming.h
+++ b/cpp/include/Ice/Incoming.h
@@ -47,10 +47,10 @@ protected:
void __warning(const Ice::Exception&) const;
void __warning(const std::string&) const;
- bool __servantLocatorFinished();
+ bool __servantLocatorFinished(bool);
- void __handleException(const std::exception&);
- void __handleException();
+ void __handleException(const std::exception&, bool);
+ void __handleException(bool);
Ice::Current _current;
Ice::ObjectPtr _servant;
diff --git a/cpp/src/Ice/CollocatedRequestHandler.cpp b/cpp/src/Ice/CollocatedRequestHandler.cpp
index 8ec91cdd620..deb9fefb74d 100644
--- a/cpp/src/Ice/CollocatedRequestHandler.cpp
+++ b/cpp/src/Ice/CollocatedRequestHandler.cpp
@@ -495,7 +495,7 @@ CollocatedRequestHandler::invokeAsyncBatchRequests(OutgoingAsyncBase* outAsync)
}
void
-CollocatedRequestHandler::sendResponse(Int requestId, BasicStream* os, Byte)
+CollocatedRequestHandler::sendResponse(Int requestId, BasicStream* os, Byte, bool amd)
{
OutgoingAsyncPtr outAsync;
{
@@ -533,7 +533,19 @@ CollocatedRequestHandler::sendResponse(Int requestId, BasicStream* os, Byte)
if(outAsync)
{
- outAsync->invokeCompleted();
+ //
+ // If called from an AMD dispatch, invoke asynchronously
+ // the completion callback since this might be called from
+ // the user code.
+ //
+ if(amd)
+ {
+ outAsync->invokeCompletedAsync();
+ }
+ else
+ {
+ outAsync->invokeCompleted();
+ }
}
_adapter->decDirectCount();
@@ -546,17 +558,17 @@ CollocatedRequestHandler::sendNoResponse()
}
bool
-CollocatedRequestHandler::systemException(Int requestId, const SystemException& ex)
+CollocatedRequestHandler::systemException(Int requestId, const SystemException& ex, bool amd)
{
- handleException(requestId, ex);
+ handleException(requestId, ex, amd);
_adapter->decDirectCount();
return true;
}
void
-CollocatedRequestHandler::invokeException(Int requestId, const LocalException& ex, int invokeNum)
+CollocatedRequestHandler::invokeException(Int requestId, const LocalException& ex, int invokeNum, bool amd)
{
- handleException(requestId, ex);
+ handleException(requestId, ex, amd);
_adapter->decDirectCount();
}
@@ -640,7 +652,7 @@ CollocatedRequestHandler::invokeAll(BasicStream* os, Int requestId, Int invokeNu
}
catch(const ObjectAdapterDeactivatedException& ex)
{
- handleException(requestId, ex);
+ handleException(requestId, ex, false);
return;
}
@@ -651,12 +663,12 @@ CollocatedRequestHandler::invokeAll(BasicStream* os, Int requestId, Int invokeNu
}
catch(const LocalException& ex)
{
- invokeException(requestId, ex, invokeNum); // Fatal invocation exception
+ invokeException(requestId, ex, invokeNum, false); // Fatal invocation exception
}
}
void
-CollocatedRequestHandler::handleException(int requestId, const Exception& ex)
+CollocatedRequestHandler::handleException(int requestId, const Exception& ex, bool amd)
{
if(requestId == 0)
{
@@ -689,6 +701,18 @@ CollocatedRequestHandler::handleException(int requestId, const Exception& ex)
if(outAsync)
{
- outAsync->invokeCompleted();
+ //
+ // If called from an AMD dispatch, invoke asynchronously
+ // the completion callback since this might be called from
+ // the user code.
+ //
+ if(amd)
+ {
+ outAsync->invokeCompletedAsync();
+ }
+ else
+ {
+ outAsync->invokeCompleted();
+ }
}
}
diff --git a/cpp/src/Ice/CollocatedRequestHandler.h b/cpp/src/Ice/CollocatedRequestHandler.h
index d2d8c5bab01..1075af2f119 100644
--- a/cpp/src/Ice/CollocatedRequestHandler.h
+++ b/cpp/src/Ice/CollocatedRequestHandler.h
@@ -56,10 +56,10 @@ public:
virtual void requestCanceled(OutgoingBase*, const Ice::LocalException&);
virtual void asyncRequestCanceled(const OutgoingAsyncBasePtr&, const Ice::LocalException&);
- virtual void sendResponse(Ice::Int, BasicStream*, Ice::Byte);
+ virtual void sendResponse(Ice::Int, BasicStream*, Ice::Byte, bool);
virtual void sendNoResponse();
- virtual bool systemException(Ice::Int, const Ice::SystemException&);
- virtual void invokeException(Ice::Int, const Ice::LocalException&, int);
+ virtual bool systemException(Ice::Int, const Ice::SystemException&, bool);
+ virtual void invokeException(Ice::Int, const Ice::LocalException&, int, bool);
const ReferencePtr& getReference() const { return _reference; } // Inlined for performances.
@@ -78,7 +78,7 @@ public:
private:
- void handleException(Ice::Int, const Ice::Exception&);
+ void handleException(Ice::Int, const Ice::Exception&, bool);
const Ice::ObjectAdapterIPtr _adapter;
const bool _dispatcher;
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp
index ddcbea13f11..e583913e5a0 100644
--- a/cpp/src/Ice/ConnectionI.cpp
+++ b/cpp/src/Ice/ConnectionI.cpp
@@ -1477,7 +1477,7 @@ Ice::ConnectionI::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAsync, con
}
void
-Ice::ConnectionI::sendResponse(Int, BasicStream* os, Byte compressFlag)
+Ice::ConnectionI::sendResponse(Int, BasicStream* os, Byte compressFlag, bool /*amd*/)
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
assert(_state > StateNotValidated);
@@ -1550,11 +1550,37 @@ Ice::ConnectionI::sendNoResponse()
}
bool
-Ice::ConnectionI::systemException(Int, const SystemException&)
+Ice::ConnectionI::systemException(Int, const SystemException&, bool /*amd*/)
{
return false; // System exceptions aren't marshalled.
}
+void
+Ice::ConnectionI::invokeException(Ice::Int, const LocalException& ex, int invokeNum, bool /*amd*/)
+{
+ //
+ // Fatal exception while invoking a request. Since sendResponse/sendNoResponse isn't
+ // called in case of a fatal exception we decrement _dispatchCount here.
+ //
+
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ setState(StateClosed, ex);
+
+ if(invokeNum > 0)
+ {
+ assert(_dispatchCount >= invokeNum);
+ _dispatchCount -= invokeNum;
+ if(_dispatchCount == 0)
+ {
+ if(_state == StateFinished)
+ {
+ reap();
+ }
+ notifyAll();
+ }
+ }
+}
+
EndpointIPtr
Ice::ConnectionI::endpoint() const
{
@@ -2359,32 +2385,6 @@ Ice::ConnectionI::exception(const LocalException& ex)
setState(StateClosed, ex);
}
-void
-Ice::ConnectionI::invokeException(Ice::Int, const LocalException& ex, int invokeNum)
-{
- //
- // Fatal exception while invoking a request. Since sendResponse/sendNoResponse isn't
- // called in case of a fatal exception we decrement _dispatchCount here.
- //
-
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- setState(StateClosed, ex);
-
- if(invokeNum > 0)
- {
- assert(_dispatchCount >= invokeNum);
- _dispatchCount -= invokeNum;
- if(_dispatchCount == 0)
- {
- if(_state == StateFinished)
- {
- reap();
- }
- notifyAll();
- }
- }
-}
-
Ice::ConnectionI::ConnectionI(const CommunicatorPtr& communicator,
const InstancePtr& instance,
const ACMMonitorPtr& monitor,
@@ -3708,7 +3708,7 @@ Ice::ConnectionI::invokeAll(BasicStream& stream, Int invokeNum, Int requestId, B
}
catch(const LocalException& ex)
{
- invokeException(requestId, ex, invokeNum); // Fatal invocation exception
+ invokeException(requestId, ex, invokeNum, false); // Fatal invocation exception
}
}
diff --git a/cpp/src/Ice/ConnectionI.h b/cpp/src/Ice/ConnectionI.h
index 003ef41c519..4096ce5a2b4 100644
--- a/cpp/src/Ice/ConnectionI.h
+++ b/cpp/src/Ice/ConnectionI.h
@@ -192,9 +192,10 @@ public:
virtual void requestCanceled(IceInternal::OutgoingBase*, const LocalException&);
virtual void asyncRequestCanceled(const IceInternal::OutgoingAsyncBasePtr&, const LocalException&);
- virtual void sendResponse(Int, IceInternal::BasicStream*, Byte);
+ virtual void sendResponse(Int, IceInternal::BasicStream*, Byte, bool);
virtual void sendNoResponse();
- virtual bool systemException(Int, const SystemException&);
+ virtual bool systemException(Int, const SystemException&, bool);
+ virtual void invokeException(Ice::Int, const LocalException&, int, bool);
IceInternal::EndpointIPtr endpoint() const;
IceInternal::ConnectorPtr connector() const;
@@ -224,7 +225,6 @@ public:
virtual ConnectionInfoPtr getInfo() const; // From Connection
void exception(const LocalException&);
- virtual void invokeException(Ice::Int, const LocalException&, int);
void dispatch(const StartCallbackPtr&, const std::vector<OutgoingMessage>&, Byte, Int, Int,
const IceInternal::ServantManagerPtr&, const ObjectAdapterPtr&, const IceInternal::OutgoingAsyncPtr&,
diff --git a/cpp/src/Ice/Incoming.cpp b/cpp/src/Ice/Incoming.cpp
index b2cc6cbaef4..160549421ee 100644
--- a/cpp/src/Ice/Incoming.cpp
+++ b/cpp/src/Ice/Incoming.cpp
@@ -207,7 +207,7 @@ IceInternal::IncomingBase::__warning(const string& msg) const
}
bool
-IceInternal::IncomingBase::__servantLocatorFinished()
+IceInternal::IncomingBase::__servantLocatorFinished(bool amd)
{
assert(_locator && _servant);
try
@@ -232,7 +232,7 @@ IceInternal::IncomingBase::__servantLocatorFinished()
_os.write(ex);
_os.endWriteEncaps();
_observer.reply(static_cast<Int>(_os.b.size() - headerSize - 4));
- _responseHandler->sendResponse(_current.requestId, &_os, _compress);
+ _responseHandler->sendResponse(_current.requestId, &_os, _compress, amd);
}
else
{
@@ -244,23 +244,23 @@ IceInternal::IncomingBase::__servantLocatorFinished()
}
catch(const std::exception& ex)
{
- __handleException(ex);
+ __handleException(ex, amd);
}
catch(...)
{
- __handleException();
+ __handleException(amd);
}
return false;
}
void
-IceInternal::IncomingBase::__handleException(const std::exception& exc)
+IceInternal::IncomingBase::__handleException(const std::exception& exc, bool amd)
{
assert(_responseHandler);
if(const SystemException* ex = dynamic_cast<const SystemException*>(&exc))
{
- if(_responseHandler->systemException(_current.requestId, *ex))
+ if(_responseHandler->systemException(_current.requestId, *ex, amd))
{
return;
}
@@ -333,7 +333,7 @@ IceInternal::IncomingBase::__handleException(const std::exception& exc)
_os.write(rfe->operation, false);
_observer.reply(static_cast<Int>(_os.b.size() - headerSize - 4));
- _responseHandler->sendResponse(_current.requestId, &_os, _compress);
+ _responseHandler->sendResponse(_current.requestId, &_os, _compress, amd);
}
else
{
@@ -405,7 +405,7 @@ IceInternal::IncomingBase::__handleException(const std::exception& exc)
}
_observer.reply(static_cast<Int>(_os.b.size() - headerSize - 4));
- _responseHandler->sendResponse(_current.requestId, &_os, _compress);
+ _responseHandler->sendResponse(_current.requestId, &_os, _compress, amd);
}
else
{
@@ -433,7 +433,7 @@ IceInternal::IncomingBase::__handleException(const std::exception& exc)
_os.write(str.str(), false);
_observer.reply(static_cast<Int>(_os.b.size() - headerSize - 4));
- _responseHandler->sendResponse(_current.requestId, &_os, _compress);
+ _responseHandler->sendResponse(_current.requestId, &_os, _compress, amd);
}
else
{
@@ -446,7 +446,7 @@ IceInternal::IncomingBase::__handleException(const std::exception& exc)
}
void
-IceInternal::IncomingBase::__handleException()
+IceInternal::IncomingBase::__handleException(bool amd)
{
if(_os.instance()->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.Dispatch", 1) > 0)
{
@@ -467,7 +467,7 @@ IceInternal::IncomingBase::__handleException()
string reason = "unknown c++ exception";
_os.write(reason, false);
_observer.reply(static_cast<Int>(_os.b.size() - headerSize - 4));
- _responseHandler->sendResponse(_current.requestId, &_os, _compress);
+ _responseHandler->sendResponse(_current.requestId, &_os, _compress, amd);
}
else
{
@@ -648,7 +648,7 @@ IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager, BasicStre
_os.write(ex);
_os.endWriteEncaps();
_observer.reply(static_cast<Int>(_os.b.size() - headerSize - 4));
- _responseHandler->sendResponse(_current.requestId, &_os, _compress);
+ _responseHandler->sendResponse(_current.requestId, &_os, _compress, false);
}
else
{
@@ -662,13 +662,13 @@ IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager, BasicStre
catch(const std::exception& ex)
{
_is->skipEncaps(); // Required for batch requests.
- __handleException(ex);
+ __handleException(ex, false);
return;
}
catch(...)
{
_is->skipEncaps(); // Required for batch requests.
- __handleException();
+ __handleException(false);
return;
}
}
@@ -688,7 +688,7 @@ IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager, BasicStre
return;
}
- if(_locator && !__servantLocatorFinished())
+ if(_locator && !__servantLocatorFinished(false))
{
return;
}
@@ -713,20 +713,20 @@ IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager, BasicStre
}
catch(const std::exception& ex)
{
- if(_servant && _locator && !__servantLocatorFinished())
+ if(_servant && _locator && !__servantLocatorFinished(false))
{
return;
}
- __handleException(ex);
+ __handleException(ex, false);
return;
}
catch(...)
{
- if(_servant && _locator && !__servantLocatorFinished())
+ if(_servant && _locator && !__servantLocatorFinished(false))
{
return;
}
- __handleException();
+ __handleException(false);
return;
}
@@ -739,7 +739,7 @@ IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager, BasicStre
if(_response)
{
_observer.reply(static_cast<Int>(_os.b.size() - headerSize - 4));
- _responseHandler->sendResponse(_current.requestId, &_os, _compress);
+ _responseHandler->sendResponse(_current.requestId, &_os, _compress, false);
}
else
{
diff --git a/cpp/src/Ice/IncomingAsync.cpp b/cpp/src/Ice/IncomingAsync.cpp
index 10c0ec6ac4e..7b56ab244fc 100644
--- a/cpp/src/Ice/IncomingAsync.cpp
+++ b/cpp/src/Ice/IncomingAsync.cpp
@@ -191,7 +191,7 @@ IceInternal::IncomingAsync::__response()
{
try
{
- if(_locator && !__servantLocatorFinished())
+ if(_locator && !__servantLocatorFinished(true))
{
return;
}
@@ -201,7 +201,7 @@ IceInternal::IncomingAsync::__response()
if(_response)
{
_observer.reply(static_cast<Int>(_os.b.size() - headerSize - 4));
- _responseHandler->sendResponse(_current.requestId, &_os, _compress);
+ _responseHandler->sendResponse(_current.requestId, &_os, _compress, true);
}
else
{
@@ -213,7 +213,7 @@ IceInternal::IncomingAsync::__response()
}
catch(const LocalException& ex)
{
- _responseHandler->invokeException(_current.requestId, ex, 1); // Fatal invocation exception
+ _responseHandler->invokeException(_current.requestId, ex, 1, true); // Fatal invocation exception
}
}
@@ -222,16 +222,16 @@ IceInternal::IncomingAsync::__exception(const std::exception& exc)
{
try
{
- if(_locator && !__servantLocatorFinished())
+ if(_locator && !__servantLocatorFinished(true))
{
return;
}
- __handleException(exc);
+ __handleException(exc, true);
}
catch(const LocalException& ex)
{
- _responseHandler->invokeException(_current.requestId, ex, 1); // Fatal invocation exception
+ _responseHandler->invokeException(_current.requestId, ex, 1, true); // Fatal invocation exception
}
}
@@ -240,16 +240,16 @@ IceInternal::IncomingAsync::__exception()
{
try
{
- if(_locator && !__servantLocatorFinished())
+ if(_locator && !__servantLocatorFinished(true))
{
return;
}
- __handleException();
+ __handleException(true);
}
catch(const LocalException& ex)
{
- _responseHandler->invokeException(_current.requestId, ex, 1); // Fatal invocation exception
+ _responseHandler->invokeException(_current.requestId, ex, 1, true); // Fatal invocation exception
}
}
diff --git a/cpp/src/Ice/ResponseHandler.h b/cpp/src/Ice/ResponseHandler.h
index 9d9c212bec8..73e206a152b 100644
--- a/cpp/src/Ice/ResponseHandler.h
+++ b/cpp/src/Ice/ResponseHandler.h
@@ -28,10 +28,10 @@ public:
virtual ~ResponseHandler();
- virtual void sendResponse(Ice::Int, BasicStream*, Ice::Byte) = 0;
+ virtual void sendResponse(Ice::Int, BasicStream*, Ice::Byte, bool) = 0;
virtual void sendNoResponse() = 0;
- virtual bool systemException(Ice::Int, const Ice::SystemException&) = 0;
- virtual void invokeException(Ice::Int, const Ice::LocalException&, int) = 0;
+ virtual bool systemException(Ice::Int, const Ice::SystemException&, bool) = 0;
+ virtual void invokeException(Ice::Int, const Ice::LocalException&, int, bool) = 0;
};
}