summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2014-12-17 18:06:54 +0100
committerBenoit Foucher <benoit@zeroc.com>2014-12-17 18:06:54 +0100
commit5a57686d2ccbb085b8ac67908ad9525a6bafaf4b (patch)
tree58c2219412e8af66fbfd66d5269af6b7a48b28d2
parentAvoid check_output isn't supported with python 2.6 (diff)
downloadice-5a57686d2ccbb085b8ac67908ad9525a6bafaf4b.tar.bz2
ice-5a57686d2ccbb085b8ac67908ad9525a6bafaf4b.tar.xz
ice-5a57686d2ccbb085b8ac67908ad9525a6bafaf4b.zip
Fixed ICE-6199 - changed collocation optimization to call AMI cb asynchronously if AMD dispatch
-rw-r--r--cpp/include/Ice/Incoming.h6
-rw-r--r--cpp/src/Ice/CollocatedRequestHandler.cpp44
-rw-r--r--cpp/src/Ice/CollocatedRequestHandler.h8
-rw-r--r--cpp/src/Ice/ConnectionI.cpp58
-rw-r--r--cpp/src/Ice/ConnectionI.h6
-rw-r--r--cpp/src/Ice/Incoming.cpp40
-rw-r--r--cpp/src/Ice/IncomingAsync.cpp18
-rw-r--r--cpp/src/Ice/ResponseHandler.h6
-rw-r--r--cs/src/Ice/CollocatedRequestHandler.cs34
-rw-r--r--cs/src/Ice/ConnectionI.cs60
-rw-r--r--cs/src/Ice/Incoming.cs36
-rw-r--r--cs/src/Ice/IncomingAsync.cs12
-rw-r--r--cs/src/Ice/ResponseHandler.cs6
-rw-r--r--java/src/Ice/src/main/java/Ice/ConnectionI.java64
-rw-r--r--java/src/Ice/src/main/java/IceInternal/CollocatedRequestHandler.java48
-rw-r--r--java/src/Ice/src/main/java/IceInternal/Incoming.java12
-rw-r--r--java/src/Ice/src/main/java/IceInternal/IncomingAsync.java12
-rw-r--r--java/src/Ice/src/main/java/IceInternal/IncomingBase.java24
-rw-r--r--java/src/Ice/src/main/java/IceInternal/ResponseHandler.java6
19 files changed, 281 insertions, 219 deletions
diff --git a/cpp/include/Ice/Incoming.h b/cpp/include/Ice/Incoming.h
index f33f1a41640..7f1a1cce947 100644
--- a/cpp/include/Ice/Incoming.h
+++ b/cpp/include/Ice/Incoming.h
@@ -47,10 +47,10 @@ protected:
void __warning(const Ice::Exception&) const;
void __warning(const std::string&) const;
- bool __servantLocatorFinished();
+ bool __servantLocatorFinished(bool);
- void __handleException(const std::exception&);
- void __handleException();
+ void __handleException(const std::exception&, bool);
+ void __handleException(bool);
Ice::Current _current;
Ice::ObjectPtr _servant;
diff --git a/cpp/src/Ice/CollocatedRequestHandler.cpp b/cpp/src/Ice/CollocatedRequestHandler.cpp
index 8ec91cdd620..deb9fefb74d 100644
--- a/cpp/src/Ice/CollocatedRequestHandler.cpp
+++ b/cpp/src/Ice/CollocatedRequestHandler.cpp
@@ -495,7 +495,7 @@ CollocatedRequestHandler::invokeAsyncBatchRequests(OutgoingAsyncBase* outAsync)
}
void
-CollocatedRequestHandler::sendResponse(Int requestId, BasicStream* os, Byte)
+CollocatedRequestHandler::sendResponse(Int requestId, BasicStream* os, Byte, bool amd)
{
OutgoingAsyncPtr outAsync;
{
@@ -533,7 +533,19 @@ CollocatedRequestHandler::sendResponse(Int requestId, BasicStream* os, Byte)
if(outAsync)
{
- outAsync->invokeCompleted();
+ //
+ // If called from an AMD dispatch, invoke asynchronously
+ // the completion callback since this might be called from
+ // the user code.
+ //
+ if(amd)
+ {
+ outAsync->invokeCompletedAsync();
+ }
+ else
+ {
+ outAsync->invokeCompleted();
+ }
}
_adapter->decDirectCount();
@@ -546,17 +558,17 @@ CollocatedRequestHandler::sendNoResponse()
}
bool
-CollocatedRequestHandler::systemException(Int requestId, const SystemException& ex)
+CollocatedRequestHandler::systemException(Int requestId, const SystemException& ex, bool amd)
{
- handleException(requestId, ex);
+ handleException(requestId, ex, amd);
_adapter->decDirectCount();
return true;
}
void
-CollocatedRequestHandler::invokeException(Int requestId, const LocalException& ex, int invokeNum)
+CollocatedRequestHandler::invokeException(Int requestId, const LocalException& ex, int invokeNum, bool amd)
{
- handleException(requestId, ex);
+ handleException(requestId, ex, amd);
_adapter->decDirectCount();
}
@@ -640,7 +652,7 @@ CollocatedRequestHandler::invokeAll(BasicStream* os, Int requestId, Int invokeNu
}
catch(const ObjectAdapterDeactivatedException& ex)
{
- handleException(requestId, ex);
+ handleException(requestId, ex, false);
return;
}
@@ -651,12 +663,12 @@ CollocatedRequestHandler::invokeAll(BasicStream* os, Int requestId, Int invokeNu
}
catch(const LocalException& ex)
{
- invokeException(requestId, ex, invokeNum); // Fatal invocation exception
+ invokeException(requestId, ex, invokeNum, false); // Fatal invocation exception
}
}
void
-CollocatedRequestHandler::handleException(int requestId, const Exception& ex)
+CollocatedRequestHandler::handleException(int requestId, const Exception& ex, bool amd)
{
if(requestId == 0)
{
@@ -689,6 +701,18 @@ CollocatedRequestHandler::handleException(int requestId, const Exception& ex)
if(outAsync)
{
- outAsync->invokeCompleted();
+ //
+ // If called from an AMD dispatch, invoke asynchronously
+ // the completion callback since this might be called from
+ // the user code.
+ //
+ if(amd)
+ {
+ outAsync->invokeCompletedAsync();
+ }
+ else
+ {
+ outAsync->invokeCompleted();
+ }
}
}
diff --git a/cpp/src/Ice/CollocatedRequestHandler.h b/cpp/src/Ice/CollocatedRequestHandler.h
index d2d8c5bab01..1075af2f119 100644
--- a/cpp/src/Ice/CollocatedRequestHandler.h
+++ b/cpp/src/Ice/CollocatedRequestHandler.h
@@ -56,10 +56,10 @@ public:
virtual void requestCanceled(OutgoingBase*, const Ice::LocalException&);
virtual void asyncRequestCanceled(const OutgoingAsyncBasePtr&, const Ice::LocalException&);
- virtual void sendResponse(Ice::Int, BasicStream*, Ice::Byte);
+ virtual void sendResponse(Ice::Int, BasicStream*, Ice::Byte, bool);
virtual void sendNoResponse();
- virtual bool systemException(Ice::Int, const Ice::SystemException&);
- virtual void invokeException(Ice::Int, const Ice::LocalException&, int);
+ virtual bool systemException(Ice::Int, const Ice::SystemException&, bool);
+ virtual void invokeException(Ice::Int, const Ice::LocalException&, int, bool);
const ReferencePtr& getReference() const { return _reference; } // Inlined for performances.
@@ -78,7 +78,7 @@ public:
private:
- void handleException(Ice::Int, const Ice::Exception&);
+ void handleException(Ice::Int, const Ice::Exception&, bool);
const Ice::ObjectAdapterIPtr _adapter;
const bool _dispatcher;
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp
index ddcbea13f11..e583913e5a0 100644
--- a/cpp/src/Ice/ConnectionI.cpp
+++ b/cpp/src/Ice/ConnectionI.cpp
@@ -1477,7 +1477,7 @@ Ice::ConnectionI::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAsync, con
}
void
-Ice::ConnectionI::sendResponse(Int, BasicStream* os, Byte compressFlag)
+Ice::ConnectionI::sendResponse(Int, BasicStream* os, Byte compressFlag, bool /*amd*/)
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
assert(_state > StateNotValidated);
@@ -1550,11 +1550,37 @@ Ice::ConnectionI::sendNoResponse()
}
bool
-Ice::ConnectionI::systemException(Int, const SystemException&)
+Ice::ConnectionI::systemException(Int, const SystemException&, bool /*amd*/)
{
return false; // System exceptions aren't marshalled.
}
+void
+Ice::ConnectionI::invokeException(Ice::Int, const LocalException& ex, int invokeNum, bool /*amd*/)
+{
+ //
+ // Fatal exception while invoking a request. Since sendResponse/sendNoResponse isn't
+ // called in case of a fatal exception we decrement _dispatchCount here.
+ //
+
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ setState(StateClosed, ex);
+
+ if(invokeNum > 0)
+ {
+ assert(_dispatchCount >= invokeNum);
+ _dispatchCount -= invokeNum;
+ if(_dispatchCount == 0)
+ {
+ if(_state == StateFinished)
+ {
+ reap();
+ }
+ notifyAll();
+ }
+ }
+}
+
EndpointIPtr
Ice::ConnectionI::endpoint() const
{
@@ -2359,32 +2385,6 @@ Ice::ConnectionI::exception(const LocalException& ex)
setState(StateClosed, ex);
}
-void
-Ice::ConnectionI::invokeException(Ice::Int, const LocalException& ex, int invokeNum)
-{
- //
- // Fatal exception while invoking a request. Since sendResponse/sendNoResponse isn't
- // called in case of a fatal exception we decrement _dispatchCount here.
- //
-
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- setState(StateClosed, ex);
-
- if(invokeNum > 0)
- {
- assert(_dispatchCount >= invokeNum);
- _dispatchCount -= invokeNum;
- if(_dispatchCount == 0)
- {
- if(_state == StateFinished)
- {
- reap();
- }
- notifyAll();
- }
- }
-}
-
Ice::ConnectionI::ConnectionI(const CommunicatorPtr& communicator,
const InstancePtr& instance,
const ACMMonitorPtr& monitor,
@@ -3708,7 +3708,7 @@ Ice::ConnectionI::invokeAll(BasicStream& stream, Int invokeNum, Int requestId, B
}
catch(const LocalException& ex)
{
- invokeException(requestId, ex, invokeNum); // Fatal invocation exception
+ invokeException(requestId, ex, invokeNum, false); // Fatal invocation exception
}
}
diff --git a/cpp/src/Ice/ConnectionI.h b/cpp/src/Ice/ConnectionI.h
index 003ef41c519..4096ce5a2b4 100644
--- a/cpp/src/Ice/ConnectionI.h
+++ b/cpp/src/Ice/ConnectionI.h
@@ -192,9 +192,10 @@ public:
virtual void requestCanceled(IceInternal::OutgoingBase*, const LocalException&);
virtual void asyncRequestCanceled(const IceInternal::OutgoingAsyncBasePtr&, const LocalException&);
- virtual void sendResponse(Int, IceInternal::BasicStream*, Byte);
+ virtual void sendResponse(Int, IceInternal::BasicStream*, Byte, bool);
virtual void sendNoResponse();
- virtual bool systemException(Int, const SystemException&);
+ virtual bool systemException(Int, const SystemException&, bool);
+ virtual void invokeException(Ice::Int, const LocalException&, int, bool);
IceInternal::EndpointIPtr endpoint() const;
IceInternal::ConnectorPtr connector() const;
@@ -224,7 +225,6 @@ public:
virtual ConnectionInfoPtr getInfo() const; // From Connection
void exception(const LocalException&);
- virtual void invokeException(Ice::Int, const LocalException&, int);
void dispatch(const StartCallbackPtr&, const std::vector<OutgoingMessage>&, Byte, Int, Int,
const IceInternal::ServantManagerPtr&, const ObjectAdapterPtr&, const IceInternal::OutgoingAsyncPtr&,
diff --git a/cpp/src/Ice/Incoming.cpp b/cpp/src/Ice/Incoming.cpp
index b2cc6cbaef4..160549421ee 100644
--- a/cpp/src/Ice/Incoming.cpp
+++ b/cpp/src/Ice/Incoming.cpp
@@ -207,7 +207,7 @@ IceInternal::IncomingBase::__warning(const string& msg) const
}
bool
-IceInternal::IncomingBase::__servantLocatorFinished()
+IceInternal::IncomingBase::__servantLocatorFinished(bool amd)
{
assert(_locator && _servant);
try
@@ -232,7 +232,7 @@ IceInternal::IncomingBase::__servantLocatorFinished()
_os.write(ex);
_os.endWriteEncaps();
_observer.reply(static_cast<Int>(_os.b.size() - headerSize - 4));
- _responseHandler->sendResponse(_current.requestId, &_os, _compress);
+ _responseHandler->sendResponse(_current.requestId, &_os, _compress, amd);
}
else
{
@@ -244,23 +244,23 @@ IceInternal::IncomingBase::__servantLocatorFinished()
}
catch(const std::exception& ex)
{
- __handleException(ex);
+ __handleException(ex, amd);
}
catch(...)
{
- __handleException();
+ __handleException(amd);
}
return false;
}
void
-IceInternal::IncomingBase::__handleException(const std::exception& exc)
+IceInternal::IncomingBase::__handleException(const std::exception& exc, bool amd)
{
assert(_responseHandler);
if(const SystemException* ex = dynamic_cast<const SystemException*>(&exc))
{
- if(_responseHandler->systemException(_current.requestId, *ex))
+ if(_responseHandler->systemException(_current.requestId, *ex, amd))
{
return;
}
@@ -333,7 +333,7 @@ IceInternal::IncomingBase::__handleException(const std::exception& exc)
_os.write(rfe->operation, false);
_observer.reply(static_cast<Int>(_os.b.size() - headerSize - 4));
- _responseHandler->sendResponse(_current.requestId, &_os, _compress);
+ _responseHandler->sendResponse(_current.requestId, &_os, _compress, amd);
}
else
{
@@ -405,7 +405,7 @@ IceInternal::IncomingBase::__handleException(const std::exception& exc)
}
_observer.reply(static_cast<Int>(_os.b.size() - headerSize - 4));
- _responseHandler->sendResponse(_current.requestId, &_os, _compress);
+ _responseHandler->sendResponse(_current.requestId, &_os, _compress, amd);
}
else
{
@@ -433,7 +433,7 @@ IceInternal::IncomingBase::__handleException(const std::exception& exc)
_os.write(str.str(), false);
_observer.reply(static_cast<Int>(_os.b.size() - headerSize - 4));
- _responseHandler->sendResponse(_current.requestId, &_os, _compress);
+ _responseHandler->sendResponse(_current.requestId, &_os, _compress, amd);
}
else
{
@@ -446,7 +446,7 @@ IceInternal::IncomingBase::__handleException(const std::exception& exc)
}
void
-IceInternal::IncomingBase::__handleException()
+IceInternal::IncomingBase::__handleException(bool amd)
{
if(_os.instance()->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.Dispatch", 1) > 0)
{
@@ -467,7 +467,7 @@ IceInternal::IncomingBase::__handleException()
string reason = "unknown c++ exception";
_os.write(reason, false);
_observer.reply(static_cast<Int>(_os.b.size() - headerSize - 4));
- _responseHandler->sendResponse(_current.requestId, &_os, _compress);
+ _responseHandler->sendResponse(_current.requestId, &_os, _compress, amd);
}
else
{
@@ -648,7 +648,7 @@ IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager, BasicStre
_os.write(ex);
_os.endWriteEncaps();
_observer.reply(static_cast<Int>(_os.b.size() - headerSize - 4));
- _responseHandler->sendResponse(_current.requestId, &_os, _compress);
+ _responseHandler->sendResponse(_current.requestId, &_os, _compress, false);
}
else
{
@@ -662,13 +662,13 @@ IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager, BasicStre
catch(const std::exception& ex)
{
_is->skipEncaps(); // Required for batch requests.
- __handleException(ex);
+ __handleException(ex, false);
return;
}
catch(...)
{
_is->skipEncaps(); // Required for batch requests.
- __handleException();
+ __handleException(false);
return;
}
}
@@ -688,7 +688,7 @@ IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager, BasicStre
return;
}
- if(_locator && !__servantLocatorFinished())
+ if(_locator && !__servantLocatorFinished(false))
{
return;
}
@@ -713,20 +713,20 @@ IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager, BasicStre
}
catch(const std::exception& ex)
{
- if(_servant && _locator && !__servantLocatorFinished())
+ if(_servant && _locator && !__servantLocatorFinished(false))
{
return;
}
- __handleException(ex);
+ __handleException(ex, false);
return;
}
catch(...)
{
- if(_servant && _locator && !__servantLocatorFinished())
+ if(_servant && _locator && !__servantLocatorFinished(false))
{
return;
}
- __handleException();
+ __handleException(false);
return;
}
@@ -739,7 +739,7 @@ IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager, BasicStre
if(_response)
{
_observer.reply(static_cast<Int>(_os.b.size() - headerSize - 4));
- _responseHandler->sendResponse(_current.requestId, &_os, _compress);
+ _responseHandler->sendResponse(_current.requestId, &_os, _compress, false);
}
else
{
diff --git a/cpp/src/Ice/IncomingAsync.cpp b/cpp/src/Ice/IncomingAsync.cpp
index 10c0ec6ac4e..7b56ab244fc 100644
--- a/cpp/src/Ice/IncomingAsync.cpp
+++ b/cpp/src/Ice/IncomingAsync.cpp
@@ -191,7 +191,7 @@ IceInternal::IncomingAsync::__response()
{
try
{
- if(_locator && !__servantLocatorFinished())
+ if(_locator && !__servantLocatorFinished(true))
{
return;
}
@@ -201,7 +201,7 @@ IceInternal::IncomingAsync::__response()
if(_response)
{
_observer.reply(static_cast<Int>(_os.b.size() - headerSize - 4));
- _responseHandler->sendResponse(_current.requestId, &_os, _compress);
+ _responseHandler->sendResponse(_current.requestId, &_os, _compress, true);
}
else
{
@@ -213,7 +213,7 @@ IceInternal::IncomingAsync::__response()
}
catch(const LocalException& ex)
{
- _responseHandler->invokeException(_current.requestId, ex, 1); // Fatal invocation exception
+ _responseHandler->invokeException(_current.requestId, ex, 1, true); // Fatal invocation exception
}
}
@@ -222,16 +222,16 @@ IceInternal::IncomingAsync::__exception(const std::exception& exc)
{
try
{
- if(_locator && !__servantLocatorFinished())
+ if(_locator && !__servantLocatorFinished(true))
{
return;
}
- __handleException(exc);
+ __handleException(exc, true);
}
catch(const LocalException& ex)
{
- _responseHandler->invokeException(_current.requestId, ex, 1); // Fatal invocation exception
+ _responseHandler->invokeException(_current.requestId, ex, 1, true); // Fatal invocation exception
}
}
@@ -240,16 +240,16 @@ IceInternal::IncomingAsync::__exception()
{
try
{
- if(_locator && !__servantLocatorFinished())
+ if(_locator && !__servantLocatorFinished(true))
{
return;
}
- __handleException();
+ __handleException(true);
}
catch(const LocalException& ex)
{
- _responseHandler->invokeException(_current.requestId, ex, 1); // Fatal invocation exception
+ _responseHandler->invokeException(_current.requestId, ex, 1, true); // Fatal invocation exception
}
}
diff --git a/cpp/src/Ice/ResponseHandler.h b/cpp/src/Ice/ResponseHandler.h
index 9d9c212bec8..73e206a152b 100644
--- a/cpp/src/Ice/ResponseHandler.h
+++ b/cpp/src/Ice/ResponseHandler.h
@@ -28,10 +28,10 @@ public:
virtual ~ResponseHandler();
- virtual void sendResponse(Ice::Int, BasicStream*, Ice::Byte) = 0;
+ virtual void sendResponse(Ice::Int, BasicStream*, Ice::Byte, bool) = 0;
virtual void sendNoResponse() = 0;
- virtual bool systemException(Ice::Int, const Ice::SystemException&) = 0;
- virtual void invokeException(Ice::Int, const Ice::LocalException&, int) = 0;
+ virtual bool systemException(Ice::Int, const Ice::SystemException&, bool) = 0;
+ virtual void invokeException(Ice::Int, const Ice::LocalException&, int, bool) = 0;
};
}
diff --git a/cs/src/Ice/CollocatedRequestHandler.cs b/cs/src/Ice/CollocatedRequestHandler.cs
index 87cd864386b..b3f55231f88 100644
--- a/cs/src/Ice/CollocatedRequestHandler.cs
+++ b/cs/src/Ice/CollocatedRequestHandler.cs
@@ -197,7 +197,7 @@ namespace IceInternal
}
}
- public void sendResponse(int requestId, BasicStream os, byte status)
+ public void sendResponse(int requestId, BasicStream os, byte status, bool amd)
{
Ice.AsyncCallback cb = null;
OutgoingAsync outAsync;
@@ -223,7 +223,14 @@ namespace IceInternal
if(cb != null)
{
- outAsync.invokeCompleted(cb);
+ if(amd)
+ {
+ outAsync.invokeCompletedAsync(cb);
+ }
+ else
+ {
+ outAsync.invokeCompleted(cb);
+ }
}
_adapter.decDirectCount();
}
@@ -235,17 +242,17 @@ namespace IceInternal
}
public bool
- systemException(int requestId, Ice.SystemException ex)
+ systemException(int requestId, Ice.SystemException ex, bool amd)
{
- handleException(requestId, ex);
+ handleException(requestId, ex, amd);
_adapter.decDirectCount();
return true;
}
public void
- invokeException(int requestId, Ice.LocalException ex, int invokeNum)
+ invokeException(int requestId, Ice.LocalException ex, int invokeNum, bool amd)
{
- handleException(requestId, ex);
+ handleException(requestId, ex, amd);
_adapter.decDirectCount();
}
@@ -441,7 +448,7 @@ namespace IceInternal
}
catch(Ice.ObjectAdapterDeactivatedException ex)
{
- handleException(requestId, ex);
+ handleException(requestId, ex, false);
return;
}
@@ -453,12 +460,12 @@ namespace IceInternal
}
catch(Ice.LocalException ex)
{
- invokeException(requestId, ex, invokeNum); // Fatal invocation exception
+ invokeException(requestId, ex, invokeNum, false); // Fatal invocation exception
}
}
void
- handleException(int requestId, Ice.Exception ex)
+ handleException(int requestId, Ice.Exception ex, bool amd)
{
if(requestId == 0)
{
@@ -478,7 +485,14 @@ namespace IceInternal
if(cb != null)
{
- outAsync.invokeCompleted(cb);
+ if(amd)
+ {
+ outAsync.invokeCompletedAsync(cb);
+ }
+ else
+ {
+ outAsync.invokeCompleted(cb);
+ }
}
}
diff --git a/cs/src/Ice/ConnectionI.cs b/cs/src/Ice/ConnectionI.cs
index 9ab4100073e..4afd42c09d2 100644
--- a/cs/src/Ice/ConnectionI.cs
+++ b/cs/src/Ice/ConnectionI.cs
@@ -897,7 +897,7 @@ namespace Ice
}
}
- public void sendResponse(int requestId, IceInternal.BasicStream os, byte compressFlag)
+ public void sendResponse(int requestId, IceInternal.BasicStream os, byte compressFlag, bool amd)
{
lock(this)
{
@@ -969,11 +969,38 @@ namespace Ice
}
}
- public bool systemException(int requestId, Ice.SystemException ex)
+ public bool systemException(int requestId, Ice.SystemException ex, bool amd)
{
return false; // System exceptions aren't marshalled.
}
+ public void invokeException(int requestId, LocalException ex, int invokeNum, bool amd)
+ {
+ //
+ // Fatal exception while invoking a request. Since sendResponse/sendNoResponse isn't
+ // called in case of a fatal exception we decrement _dispatchCount here.
+ //
+
+ lock(this)
+ {
+ setState(StateClosed, ex);
+
+ if(invokeNum > 0)
+ {
+ Debug.Assert(_dispatchCount >= invokeNum);
+ _dispatchCount -= invokeNum;
+ if(_dispatchCount == 0)
+ {
+ if(_state == StateFinished)
+ {
+ reap();
+ }
+ System.Threading.Monitor.PulseAll(this);
+ }
+ }
+ }
+ }
+
public IceInternal.EndpointI endpoint()
{
return _endpoint; // No mutex protection necessary, _endpoint is immutable.
@@ -1752,33 +1779,6 @@ namespace Ice
}
}
- public void invokeException(int requestId, LocalException ex, int invokeNum)
- {
- //
- // Fatal exception while invoking a request. Since sendResponse/sendNoResponse isn't
- // called in case of a fatal exception we decrement _dispatchCount here.
- //
-
- lock(this)
- {
- setState(StateClosed, ex);
-
- if(invokeNum > 0)
- {
- Debug.Assert(_dispatchCount >= invokeNum);
- _dispatchCount -= invokeNum;
- if(_dispatchCount == 0)
- {
- if(_state == StateFinished)
- {
- reap();
- }
- System.Threading.Monitor.PulseAll(this);
- }
- }
- }
- }
-
static ConnectionI()
{
_compressionSupported = IceInternal.BasicStream.compressible();
@@ -2811,7 +2811,7 @@ namespace Ice
}
catch(LocalException ex)
{
- invokeException(requestId, ex, invokeNum);
+ invokeException(requestId, ex, invokeNum, false);
}
finally
{
diff --git a/cs/src/Ice/Incoming.cs b/cs/src/Ice/Incoming.cs
index d3614f49f9d..7443b09036a 100644
--- a/cs/src/Ice/Incoming.cs
+++ b/cs/src/Ice/Incoming.cs
@@ -247,7 +247,7 @@ namespace IceInternal
}
}
- protected bool servantLocatorFinished__()
+ protected bool servantLocatorFinished__(bool amd)
{
Debug.Assert(locator_ != null && servant_ != null);
try
@@ -278,7 +278,7 @@ namespace IceInternal
{
observer_.reply(os_.size() - Protocol.headerSize - 4);
}
- responseHandler_.sendResponse(current_.requestId, os_, compress_);
+ responseHandler_.sendResponse(current_.requestId, os_, compress_, amd);
}
else
{
@@ -294,12 +294,12 @@ namespace IceInternal
}
catch(System.Exception ex)
{
- handleException__(ex);
+ handleException__(ex, amd);
}
return false;
}
- protected internal void handleException__(System.Exception exc)
+ protected internal void handleException__(System.Exception exc, bool amd)
{
Debug.Assert(responseHandler_ != null);
@@ -374,7 +374,7 @@ namespace IceInternal
{
observer_.reply(os_.size() - Protocol.headerSize - 4);
}
- responseHandler_.sendResponse(current_.requestId, os_, compress_);
+ responseHandler_.sendResponse(current_.requestId, os_, compress_, amd);
}
else
{
@@ -402,7 +402,7 @@ namespace IceInternal
{
observer_.reply(os_.size() - Protocol.headerSize - 4);
}
- responseHandler_.sendResponse(current_.requestId, os_, compress_);
+ responseHandler_.sendResponse(current_.requestId, os_, compress_, amd);
}
else
{
@@ -430,7 +430,7 @@ namespace IceInternal
{
observer_.reply(os_.size() - Protocol.headerSize - 4);
}
- responseHandler_.sendResponse(current_.requestId, os_, compress_);
+ responseHandler_.sendResponse(current_.requestId, os_, compress_, amd);
}
else
{
@@ -458,7 +458,7 @@ namespace IceInternal
{
observer_.reply(os_.size() - Protocol.headerSize - 4);
}
- responseHandler_.sendResponse(current_.requestId, os_, compress_);
+ responseHandler_.sendResponse(current_.requestId, os_, compress_, amd);
}
else
{
@@ -486,7 +486,7 @@ namespace IceInternal
{
observer_.reply(os_.size() - Protocol.headerSize - 4);
}
- responseHandler_.sendResponse(current_.requestId, os_, compress_);
+ responseHandler_.sendResponse(current_.requestId, os_, compress_, amd);
}
else
{
@@ -497,7 +497,7 @@ namespace IceInternal
{
if(ex is Ice.SystemException)
{
- if(responseHandler_.systemException(current_.requestId, (Ice.SystemException)ex))
+ if(responseHandler_.systemException(current_.requestId, (Ice.SystemException)ex, amd))
{
return;
}
@@ -522,7 +522,7 @@ namespace IceInternal
{
observer_.reply(os_.size() - Protocol.headerSize - 4);
}
- responseHandler_.sendResponse(current_.requestId, os_, compress_);
+ responseHandler_.sendResponse(current_.requestId, os_, compress_, amd);
}
else
{
@@ -550,7 +550,7 @@ namespace IceInternal
{
observer_.reply(os_.size() - Protocol.headerSize - 4);
}
- responseHandler_.sendResponse(current_.requestId, os_, compress_);
+ responseHandler_.sendResponse(current_.requestId, os_, compress_, amd);
}
else
{
@@ -737,7 +737,7 @@ namespace IceInternal
{
observer_.reply(os_.size() - Protocol.headerSize - 4);
}
- responseHandler_.sendResponse(current_.requestId, os_, compress_);
+ responseHandler_.sendResponse(current_.requestId, os_, compress_, false);
}
else
{
@@ -755,7 +755,7 @@ namespace IceInternal
catch(System.Exception ex)
{
_is.skipEncaps(); // Required for batch requests.
- handleException__(ex);
+ handleException__(ex, false);
return;
}
}
@@ -778,7 +778,7 @@ namespace IceInternal
return;
}
- if(locator_ != null && !servantLocatorFinished__())
+ if(locator_ != null && !servantLocatorFinished__(false))
{
return;
}
@@ -803,11 +803,11 @@ namespace IceInternal
}
catch(System.Exception ex)
{
- if(servant_ != null && locator_ != null && !servantLocatorFinished__())
+ if(servant_ != null && locator_ != null && !servantLocatorFinished__(false))
{
return;
}
- handleException__(ex);
+ handleException__(ex, false);
return;
}
@@ -825,7 +825,7 @@ namespace IceInternal
{
observer_.reply(os_.size() - Protocol.headerSize - 4);
}
- responseHandler_.sendResponse(current_.requestId, os_, compress_);
+ responseHandler_.sendResponse(current_.requestId, os_, compress_, false);
}
else
{
diff --git a/cs/src/Ice/IncomingAsync.cs b/cs/src/Ice/IncomingAsync.cs
index 4ca030716b9..b26cf954a0f 100644
--- a/cs/src/Ice/IncomingAsync.cs
+++ b/cs/src/Ice/IncomingAsync.cs
@@ -102,7 +102,7 @@ namespace IceInternal
{
try
{
- if(locator_ != null && !servantLocatorFinished__())
+ if(locator_ != null && !servantLocatorFinished__(true))
{
return;
}
@@ -115,7 +115,7 @@ namespace IceInternal
{
observer_.reply(os_.size() - Protocol.headerSize - 4);
}
- responseHandler_.sendResponse(current_.requestId, os_, compress_);
+ responseHandler_.sendResponse(current_.requestId, os_, compress_, true);
}
else
{
@@ -131,7 +131,7 @@ namespace IceInternal
}
catch(Ice.LocalException ex)
{
- responseHandler_.invokeException(current_.requestId, ex, 1);
+ responseHandler_.invokeException(current_.requestId, ex, 1, true);
}
}
@@ -139,16 +139,16 @@ namespace IceInternal
{
try
{
- if(locator_ != null && !servantLocatorFinished__())
+ if(locator_ != null && !servantLocatorFinished__(true))
{
return;
}
- handleException__(exc);
+ handleException__(exc, true);
}
catch(Ice.LocalException ex)
{
- responseHandler_.invokeException(current_.requestId, ex, 1);
+ responseHandler_.invokeException(current_.requestId, ex, 1, true);
}
}
diff --git a/cs/src/Ice/ResponseHandler.cs b/cs/src/Ice/ResponseHandler.cs
index afcc3dba62e..111b2eb2359 100644
--- a/cs/src/Ice/ResponseHandler.cs
+++ b/cs/src/Ice/ResponseHandler.cs
@@ -14,9 +14,9 @@ namespace IceInternal
{
public interface ResponseHandler
{
- void sendResponse(int requestId, BasicStream os, byte status);
+ void sendResponse(int requestId, BasicStream os, byte status, bool amd);
void sendNoResponse();
- bool systemException(int requestId, Ice.SystemException ex);
- void invokeException(int requestId, Ice.LocalException ex, int invokeNum);
+ bool systemException(int requestId, Ice.SystemException ex, bool amd);
+ void invokeException(int requestId, Ice.LocalException ex, int invokeNum, bool amd);
};
}
diff --git a/java/src/Ice/src/main/java/Ice/ConnectionI.java b/java/src/Ice/src/main/java/Ice/ConnectionI.java
index 5ebcde93cfa..a6a973e72ed 100644
--- a/java/src/Ice/src/main/java/Ice/ConnectionI.java
+++ b/java/src/Ice/src/main/java/Ice/ConnectionI.java
@@ -856,7 +856,7 @@ public final class ConnectionI extends IceInternal.EventHandler
}
@Override
- synchronized public void sendResponse(int requestId, IceInternal.BasicStream os, byte compressFlag)
+ synchronized public void sendResponse(int requestId, IceInternal.BasicStream os, byte compressFlag, boolean amd)
{
assert (_state > StateNotValidated);
@@ -923,11 +923,38 @@ public final class ConnectionI extends IceInternal.EventHandler
}
@Override
- public boolean systemException(int requestId, Ice.SystemException ex)
+ public boolean systemException(int requestId, Ice.SystemException ex, boolean amd)
{
return false; // System exceptions aren't marshalled.
}
+ @Override
+ public synchronized void invokeException(int requestId, LocalException ex, int invokeNum, boolean amd)
+ {
+ //
+ // Fatal exception while invoking a request. Since
+ // sendResponse/sendNoResponse isn't
+ // called in case of a fatal exception we decrement _dispatchCount here.
+ //
+
+ setState(StateClosed, ex);
+
+ if(invokeNum > 0)
+ {
+ assert (_dispatchCount > 0);
+ _dispatchCount -= invokeNum;
+ assert (_dispatchCount >= 0);
+ if(_dispatchCount == 0)
+ {
+ if(_state == StateFinished)
+ {
+ reap();
+ }
+ notifyAll();
+ }
+ }
+ }
+
public IceInternal.EndpointI endpoint()
{
return _endpoint; // No mutex protection necessary, _endpoint is
@@ -1609,33 +1636,6 @@ public final class ConnectionI extends IceInternal.EventHandler
setState(StateClosed, ex);
}
- @Override
- public synchronized void invokeException(int requestId, LocalException ex, int invokeNum)
- {
- //
- // Fatal exception while invoking a request. Since
- // sendResponse/sendNoResponse isn't
- // called in case of a fatal exception we decrement _dispatchCount here.
- //
-
- setState(StateClosed, ex);
-
- if(invokeNum > 0)
- {
- assert (_dispatchCount > 0);
- _dispatchCount -= invokeNum;
- assert (_dispatchCount >= 0);
- if(_dispatchCount == 0)
- {
- if(_state == StateFinished)
- {
- reap();
- }
- notifyAll();
- }
- }
- }
-
public ConnectionI(Communicator communicator, IceInternal.Instance instance, IceInternal.ACMMonitor monitor,
IceInternal.Transceiver transceiver, IceInternal.Connector connector,
IceInternal.EndpointI endpoint, ObjectAdapterI adapter)
@@ -2704,7 +2704,7 @@ public final class ConnectionI extends IceInternal.EventHandler
}
catch(LocalException ex)
{
- invokeException(requestId, ex, invokeNum);
+ invokeException(requestId, ex, invokeNum, false);
}
catch(java.lang.AssertionError ex) // Upon assertion, we print the stack
// trace.
@@ -2716,7 +2716,7 @@ public final class ConnectionI extends IceInternal.EventHandler
pw.flush();
uex.unknown = sw.toString();
_logger.error(uex.unknown);
- invokeException(requestId, uex, invokeNum);
+ invokeException(requestId, uex, invokeNum, false);
}
catch(java.lang.OutOfMemoryError ex)
{
@@ -2727,7 +2727,7 @@ public final class ConnectionI extends IceInternal.EventHandler
pw.flush();
uex.unknown = sw.toString();
_logger.error(uex.unknown);
- invokeException(requestId, uex, invokeNum);
+ invokeException(requestId, uex, invokeNum, false);
}
finally
{
diff --git a/java/src/Ice/src/main/java/IceInternal/CollocatedRequestHandler.java b/java/src/Ice/src/main/java/IceInternal/CollocatedRequestHandler.java
index 609cc1b40c6..34e12296bff 100644
--- a/java/src/Ice/src/main/java/IceInternal/CollocatedRequestHandler.java
+++ b/java/src/Ice/src/main/java/IceInternal/CollocatedRequestHandler.java
@@ -217,7 +217,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
@Override
public void
- sendResponse(int requestId, final BasicStream os, byte status)
+ sendResponse(int requestId, final BasicStream os, byte status, boolean amd)
{
OutgoingAsync outAsync = null;
synchronized(this)
@@ -241,7 +241,19 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
if(outAsync != null)
{
- outAsync.invokeCompleted();
+ //
+ // If called from an AMD dispatch, invoke asynchronously
+ // the completion callback since this might be called from
+ // the user code.
+ //
+ if(amd)
+ {
+ outAsync.invokeCompletedAsync();
+ }
+ else
+ {
+ outAsync.invokeCompleted();
+ }
}
_adapter.decDirectCount();
}
@@ -255,18 +267,18 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
@Override
public boolean
- systemException(int requestId, Ice.SystemException ex)
+ systemException(int requestId, Ice.SystemException ex, boolean amd)
{
- handleException(requestId, ex);
+ handleException(requestId, ex, amd);
_adapter.decDirectCount();
return true;
}
@Override
public void
- invokeException(int requestId, Ice.LocalException ex, int invokeNum)
+ invokeException(int requestId, Ice.LocalException ex, int invokeNum, boolean amd)
{
- handleException(requestId, ex);
+ handleException(requestId, ex, amd);
_adapter.decDirectCount();
}
@@ -444,7 +456,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
}
catch(Ice.ObjectAdapterDeactivatedException ex)
{
- handleException(requestId, ex);
+ handleException(requestId, ex, false);
return;
}
@@ -456,7 +468,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
}
catch(Ice.LocalException ex)
{
- invokeException(requestId, ex, invokeNum); // Fatal invocation exception
+ invokeException(requestId, ex, invokeNum, false); // Fatal invocation exception
}
catch(java.lang.AssertionError ex) // Upon assertion, we print the stack trace.
{
@@ -467,7 +479,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
pw.flush();
uex.unknown = sw.toString();
_logger.error(uex.unknown);
- invokeException(requestId, uex, invokeNum);
+ invokeException(requestId, uex, invokeNum, false);
}
catch(java.lang.OutOfMemoryError ex)
{
@@ -478,12 +490,12 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
pw.flush();
uex.unknown = sw.toString();
_logger.error(uex.unknown);
- invokeException(requestId, uex, invokeNum);
+ invokeException(requestId, uex, invokeNum, false);
}
}
private void
- handleException(int requestId, Ice.Exception ex)
+ handleException(int requestId, Ice.Exception ex, boolean amd)
{
if(requestId == 0)
{
@@ -502,7 +514,19 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
if(outAsync != null)
{
- outAsync.invokeCompleted();
+ //
+ // If called from an AMD dispatch, invoke asynchronously
+ // the completion callback since this might be called from
+ // the user code.
+ //
+ if(amd)
+ {
+ outAsync.invokeCompletedAsync();
+ }
+ else
+ {
+ outAsync.invokeCompleted();
+ }
}
}
diff --git a/java/src/Ice/src/main/java/IceInternal/Incoming.java b/java/src/Ice/src/main/java/IceInternal/Incoming.java
index cc9742cd295..70f1ae965ba 100644
--- a/java/src/Ice/src/main/java/IceInternal/Incoming.java
+++ b/java/src/Ice/src/main/java/IceInternal/Incoming.java
@@ -173,7 +173,7 @@ final public class Incoming extends IncomingBase implements Ice.Request
{
_observer.reply(_os.size() - Protocol.headerSize - 4);
}
- _responseHandler.sendResponse(_current.requestId, _os, _compress);
+ _responseHandler.sendResponse(_current.requestId, _os, _compress, false);
}
else
{
@@ -191,7 +191,7 @@ final public class Incoming extends IncomingBase implements Ice.Request
catch(java.lang.Exception ex)
{
_is.skipEncaps(); // Required for batch requests.
- __handleException(ex);
+ __handleException(ex, false);
return;
}
}
@@ -229,7 +229,7 @@ final public class Incoming extends IncomingBase implements Ice.Request
}
}
- if(_locator != null && !__servantLocatorFinished())
+ if(_locator != null && !__servantLocatorFinished(false))
{
return;
}
@@ -254,11 +254,11 @@ final public class Incoming extends IncomingBase implements Ice.Request
}
catch(java.lang.Exception ex)
{
- if(_servant != null && _locator != null && !__servantLocatorFinished())
+ if(_servant != null && _locator != null && !__servantLocatorFinished(false))
{
return;
}
- __handleException(ex);
+ __handleException(ex, false);
return;
}
@@ -276,7 +276,7 @@ final public class Incoming extends IncomingBase implements Ice.Request
{
_observer.reply(_os.size() - Protocol.headerSize - 4);
}
- _responseHandler.sendResponse(_current.requestId, _os, _compress);
+ _responseHandler.sendResponse(_current.requestId, _os, _compress, false);
}
else
{
diff --git a/java/src/Ice/src/main/java/IceInternal/IncomingAsync.java b/java/src/Ice/src/main/java/IceInternal/IncomingAsync.java
index 7783f6cac93..e288741c003 100644
--- a/java/src/Ice/src/main/java/IceInternal/IncomingAsync.java
+++ b/java/src/Ice/src/main/java/IceInternal/IncomingAsync.java
@@ -106,7 +106,7 @@ public class IncomingAsync extends IncomingBase implements Ice.AMDCallback
{
try
{
- if(_locator != null && !__servantLocatorFinished())
+ if(_locator != null && !__servantLocatorFinished(true))
{
return;
}
@@ -119,7 +119,7 @@ public class IncomingAsync extends IncomingBase implements Ice.AMDCallback
{
_observer.reply(_os.size() - Protocol.headerSize - 4);
}
- _responseHandler.sendResponse(_current.requestId, _os, _compress);
+ _responseHandler.sendResponse(_current.requestId, _os, _compress, true);
}
else
{
@@ -135,7 +135,7 @@ public class IncomingAsync extends IncomingBase implements Ice.AMDCallback
}
catch(Ice.LocalException ex)
{
- _responseHandler.invokeException(_current.requestId, ex, 1);
+ _responseHandler.invokeException(_current.requestId, ex, 1, true);
}
}
@@ -144,16 +144,16 @@ public class IncomingAsync extends IncomingBase implements Ice.AMDCallback
{
try
{
- if(_locator != null && !__servantLocatorFinished())
+ if(_locator != null && !__servantLocatorFinished(true))
{
return;
}
- __handleException(exc);
+ __handleException(exc, true);
}
catch(Ice.LocalException ex)
{
- _responseHandler.invokeException(_current.requestId, ex, 1);
+ _responseHandler.invokeException(_current.requestId, ex, 1, true);
}
}
diff --git a/java/src/Ice/src/main/java/IceInternal/IncomingBase.java b/java/src/Ice/src/main/java/IceInternal/IncomingBase.java
index 6879e8cdf2f..1a2de3dd6ed 100644
--- a/java/src/Ice/src/main/java/IceInternal/IncomingBase.java
+++ b/java/src/Ice/src/main/java/IceInternal/IncomingBase.java
@@ -262,7 +262,7 @@ class IncomingBase
}
final protected boolean
- __servantLocatorFinished()
+ __servantLocatorFinished(boolean amd)
{
assert(_locator != null && _servant != null);
try
@@ -293,7 +293,7 @@ class IncomingBase
{
_observer.reply(_os.size() - Protocol.headerSize - 4);
}
- _responseHandler.sendResponse(_current.requestId, _os, _compress);
+ _responseHandler.sendResponse(_current.requestId, _os, _compress, amd);
}
else
{
@@ -309,13 +309,13 @@ class IncomingBase
}
catch(java.lang.Exception ex)
{
- __handleException(ex);
+ __handleException(ex, amd);
}
return false;
}
final protected void
- __handleException(java.lang.Exception exc)
+ __handleException(java.lang.Exception exc, boolean amd)
{
assert(_responseHandler != null);
@@ -390,7 +390,7 @@ class IncomingBase
{
_observer.reply(_os.size() - Protocol.headerSize - 4);
}
- _responseHandler.sendResponse(_current.requestId, _os, _compress);
+ _responseHandler.sendResponse(_current.requestId, _os, _compress, amd);
}
else
{
@@ -418,7 +418,7 @@ class IncomingBase
{
_observer.reply(_os.size() - Protocol.headerSize - 4);
}
- _responseHandler.sendResponse(_current.requestId, _os, _compress);
+ _responseHandler.sendResponse(_current.requestId, _os, _compress, amd);
}
else
{
@@ -446,7 +446,7 @@ class IncomingBase
{
_observer.reply(_os.size() - Protocol.headerSize - 4);
}
- _responseHandler.sendResponse(_current.requestId, _os, _compress);
+ _responseHandler.sendResponse(_current.requestId, _os, _compress, amd);
}
else
{
@@ -474,7 +474,7 @@ class IncomingBase
{
_observer.reply(_os.size() - Protocol.headerSize - 4);
}
- _responseHandler.sendResponse(_current.requestId, _os, _compress);
+ _responseHandler.sendResponse(_current.requestId, _os, _compress, amd);
}
else
{
@@ -485,7 +485,7 @@ class IncomingBase
{
if(ex instanceof Ice.SystemException)
{
- if(_responseHandler.systemException(_current.requestId, (Ice.SystemException)ex))
+ if(_responseHandler.systemException(_current.requestId, (Ice.SystemException)ex, amd))
{
return;
}
@@ -516,7 +516,7 @@ class IncomingBase
{
_observer.reply(_os.size() - Protocol.headerSize - 4);
}
- _responseHandler.sendResponse(_current.requestId, _os, _compress);
+ _responseHandler.sendResponse(_current.requestId, _os, _compress, amd);
}
else
{
@@ -550,7 +550,7 @@ class IncomingBase
{
_observer.reply(_os.size() - Protocol.headerSize - 4);
}
- _responseHandler.sendResponse(_current.requestId, _os, _compress);
+ _responseHandler.sendResponse(_current.requestId, _os, _compress, amd);
}
else
{
@@ -583,7 +583,7 @@ class IncomingBase
{
_observer.reply(_os.size() - Protocol.headerSize - 4);
}
- _responseHandler.sendResponse(_current.requestId, _os, _compress);
+ _responseHandler.sendResponse(_current.requestId, _os, _compress, amd);
}
else
{
diff --git a/java/src/Ice/src/main/java/IceInternal/ResponseHandler.java b/java/src/Ice/src/main/java/IceInternal/ResponseHandler.java
index 89449a61945..52c98ffafd7 100644
--- a/java/src/Ice/src/main/java/IceInternal/ResponseHandler.java
+++ b/java/src/Ice/src/main/java/IceInternal/ResponseHandler.java
@@ -11,8 +11,8 @@ package IceInternal;
public interface ResponseHandler
{
- void sendResponse(int requestId, BasicStream os, byte status);
+ void sendResponse(int requestId, BasicStream os, byte status, boolean amd);
void sendNoResponse();
- boolean systemException(int requestId, Ice.SystemException ex);
- void invokeException(int requestId, Ice.LocalException ex, int invokeNum);
+ boolean systemException(int requestId, Ice.SystemException ex, boolean amd);
+ void invokeException(int requestId, Ice.LocalException ex, int invokeNum, boolean amd);
}