summaryrefslogtreecommitdiff
path: root/java/src
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2014-12-17 18:06:54 +0100
committerBenoit Foucher <benoit@zeroc.com>2014-12-17 18:06:54 +0100
commit5a57686d2ccbb085b8ac67908ad9525a6bafaf4b (patch)
tree58c2219412e8af66fbfd66d5269af6b7a48b28d2 /java/src
parentAvoid check_output isn't supported with python 2.6 (diff)
downloadice-5a57686d2ccbb085b8ac67908ad9525a6bafaf4b.tar.bz2
ice-5a57686d2ccbb085b8ac67908ad9525a6bafaf4b.tar.xz
ice-5a57686d2ccbb085b8ac67908ad9525a6bafaf4b.zip
Fixed ICE-6199 - changed collocation optimization to call AMI cb asynchronously if AMD dispatch
Diffstat (limited to 'java/src')
-rw-r--r--java/src/Ice/src/main/java/Ice/ConnectionI.java64
-rw-r--r--java/src/Ice/src/main/java/IceInternal/CollocatedRequestHandler.java48
-rw-r--r--java/src/Ice/src/main/java/IceInternal/Incoming.java12
-rw-r--r--java/src/Ice/src/main/java/IceInternal/IncomingAsync.java12
-rw-r--r--java/src/Ice/src/main/java/IceInternal/IncomingBase.java24
-rw-r--r--java/src/Ice/src/main/java/IceInternal/ResponseHandler.java6
6 files changed, 95 insertions, 71 deletions
diff --git a/java/src/Ice/src/main/java/Ice/ConnectionI.java b/java/src/Ice/src/main/java/Ice/ConnectionI.java
index 5ebcde93cfa..a6a973e72ed 100644
--- a/java/src/Ice/src/main/java/Ice/ConnectionI.java
+++ b/java/src/Ice/src/main/java/Ice/ConnectionI.java
@@ -856,7 +856,7 @@ public final class ConnectionI extends IceInternal.EventHandler
}
@Override
- synchronized public void sendResponse(int requestId, IceInternal.BasicStream os, byte compressFlag)
+ synchronized public void sendResponse(int requestId, IceInternal.BasicStream os, byte compressFlag, boolean amd)
{
assert (_state > StateNotValidated);
@@ -923,11 +923,38 @@ public final class ConnectionI extends IceInternal.EventHandler
}
@Override
- public boolean systemException(int requestId, Ice.SystemException ex)
+ public boolean systemException(int requestId, Ice.SystemException ex, boolean amd)
{
return false; // System exceptions aren't marshalled.
}
+ @Override
+ public synchronized void invokeException(int requestId, LocalException ex, int invokeNum, boolean amd)
+ {
+ //
+ // Fatal exception while invoking a request. Since
+ // sendResponse/sendNoResponse isn't
+ // called in case of a fatal exception we decrement _dispatchCount here.
+ //
+
+ setState(StateClosed, ex);
+
+ if(invokeNum > 0)
+ {
+ assert (_dispatchCount > 0);
+ _dispatchCount -= invokeNum;
+ assert (_dispatchCount >= 0);
+ if(_dispatchCount == 0)
+ {
+ if(_state == StateFinished)
+ {
+ reap();
+ }
+ notifyAll();
+ }
+ }
+ }
+
public IceInternal.EndpointI endpoint()
{
return _endpoint; // No mutex protection necessary, _endpoint is
@@ -1609,33 +1636,6 @@ public final class ConnectionI extends IceInternal.EventHandler
setState(StateClosed, ex);
}
- @Override
- public synchronized void invokeException(int requestId, LocalException ex, int invokeNum)
- {
- //
- // Fatal exception while invoking a request. Since
- // sendResponse/sendNoResponse isn't
- // called in case of a fatal exception we decrement _dispatchCount here.
- //
-
- setState(StateClosed, ex);
-
- if(invokeNum > 0)
- {
- assert (_dispatchCount > 0);
- _dispatchCount -= invokeNum;
- assert (_dispatchCount >= 0);
- if(_dispatchCount == 0)
- {
- if(_state == StateFinished)
- {
- reap();
- }
- notifyAll();
- }
- }
- }
-
public ConnectionI(Communicator communicator, IceInternal.Instance instance, IceInternal.ACMMonitor monitor,
IceInternal.Transceiver transceiver, IceInternal.Connector connector,
IceInternal.EndpointI endpoint, ObjectAdapterI adapter)
@@ -2704,7 +2704,7 @@ public final class ConnectionI extends IceInternal.EventHandler
}
catch(LocalException ex)
{
- invokeException(requestId, ex, invokeNum);
+ invokeException(requestId, ex, invokeNum, false);
}
catch(java.lang.AssertionError ex) // Upon assertion, we print the stack
// trace.
@@ -2716,7 +2716,7 @@ public final class ConnectionI extends IceInternal.EventHandler
pw.flush();
uex.unknown = sw.toString();
_logger.error(uex.unknown);
- invokeException(requestId, uex, invokeNum);
+ invokeException(requestId, uex, invokeNum, false);
}
catch(java.lang.OutOfMemoryError ex)
{
@@ -2727,7 +2727,7 @@ public final class ConnectionI extends IceInternal.EventHandler
pw.flush();
uex.unknown = sw.toString();
_logger.error(uex.unknown);
- invokeException(requestId, uex, invokeNum);
+ invokeException(requestId, uex, invokeNum, false);
}
finally
{
diff --git a/java/src/Ice/src/main/java/IceInternal/CollocatedRequestHandler.java b/java/src/Ice/src/main/java/IceInternal/CollocatedRequestHandler.java
index 609cc1b40c6..34e12296bff 100644
--- a/java/src/Ice/src/main/java/IceInternal/CollocatedRequestHandler.java
+++ b/java/src/Ice/src/main/java/IceInternal/CollocatedRequestHandler.java
@@ -217,7 +217,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
@Override
public void
- sendResponse(int requestId, final BasicStream os, byte status)
+ sendResponse(int requestId, final BasicStream os, byte status, boolean amd)
{
OutgoingAsync outAsync = null;
synchronized(this)
@@ -241,7 +241,19 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
if(outAsync != null)
{
- outAsync.invokeCompleted();
+ //
+ // If called from an AMD dispatch, invoke asynchronously
+ // the completion callback since this might be called from
+ // the user code.
+ //
+ if(amd)
+ {
+ outAsync.invokeCompletedAsync();
+ }
+ else
+ {
+ outAsync.invokeCompleted();
+ }
}
_adapter.decDirectCount();
}
@@ -255,18 +267,18 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
@Override
public boolean
- systemException(int requestId, Ice.SystemException ex)
+ systemException(int requestId, Ice.SystemException ex, boolean amd)
{
- handleException(requestId, ex);
+ handleException(requestId, ex, amd);
_adapter.decDirectCount();
return true;
}
@Override
public void
- invokeException(int requestId, Ice.LocalException ex, int invokeNum)
+ invokeException(int requestId, Ice.LocalException ex, int invokeNum, boolean amd)
{
- handleException(requestId, ex);
+ handleException(requestId, ex, amd);
_adapter.decDirectCount();
}
@@ -444,7 +456,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
}
catch(Ice.ObjectAdapterDeactivatedException ex)
{
- handleException(requestId, ex);
+ handleException(requestId, ex, false);
return;
}
@@ -456,7 +468,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
}
catch(Ice.LocalException ex)
{
- invokeException(requestId, ex, invokeNum); // Fatal invocation exception
+ invokeException(requestId, ex, invokeNum, false); // Fatal invocation exception
}
catch(java.lang.AssertionError ex) // Upon assertion, we print the stack trace.
{
@@ -467,7 +479,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
pw.flush();
uex.unknown = sw.toString();
_logger.error(uex.unknown);
- invokeException(requestId, uex, invokeNum);
+ invokeException(requestId, uex, invokeNum, false);
}
catch(java.lang.OutOfMemoryError ex)
{
@@ -478,12 +490,12 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
pw.flush();
uex.unknown = sw.toString();
_logger.error(uex.unknown);
- invokeException(requestId, uex, invokeNum);
+ invokeException(requestId, uex, invokeNum, false);
}
}
private void
- handleException(int requestId, Ice.Exception ex)
+ handleException(int requestId, Ice.Exception ex, boolean amd)
{
if(requestId == 0)
{
@@ -502,7 +514,19 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
if(outAsync != null)
{
- outAsync.invokeCompleted();
+ //
+ // If called from an AMD dispatch, invoke asynchronously
+ // the completion callback since this might be called from
+ // the user code.
+ //
+ if(amd)
+ {
+ outAsync.invokeCompletedAsync();
+ }
+ else
+ {
+ outAsync.invokeCompleted();
+ }
}
}
diff --git a/java/src/Ice/src/main/java/IceInternal/Incoming.java b/java/src/Ice/src/main/java/IceInternal/Incoming.java
index cc9742cd295..70f1ae965ba 100644
--- a/java/src/Ice/src/main/java/IceInternal/Incoming.java
+++ b/java/src/Ice/src/main/java/IceInternal/Incoming.java
@@ -173,7 +173,7 @@ final public class Incoming extends IncomingBase implements Ice.Request
{
_observer.reply(_os.size() - Protocol.headerSize - 4);
}
- _responseHandler.sendResponse(_current.requestId, _os, _compress);
+ _responseHandler.sendResponse(_current.requestId, _os, _compress, false);
}
else
{
@@ -191,7 +191,7 @@ final public class Incoming extends IncomingBase implements Ice.Request
catch(java.lang.Exception ex)
{
_is.skipEncaps(); // Required for batch requests.
- __handleException(ex);
+ __handleException(ex, false);
return;
}
}
@@ -229,7 +229,7 @@ final public class Incoming extends IncomingBase implements Ice.Request
}
}
- if(_locator != null && !__servantLocatorFinished())
+ if(_locator != null && !__servantLocatorFinished(false))
{
return;
}
@@ -254,11 +254,11 @@ final public class Incoming extends IncomingBase implements Ice.Request
}
catch(java.lang.Exception ex)
{
- if(_servant != null && _locator != null && !__servantLocatorFinished())
+ if(_servant != null && _locator != null && !__servantLocatorFinished(false))
{
return;
}
- __handleException(ex);
+ __handleException(ex, false);
return;
}
@@ -276,7 +276,7 @@ final public class Incoming extends IncomingBase implements Ice.Request
{
_observer.reply(_os.size() - Protocol.headerSize - 4);
}
- _responseHandler.sendResponse(_current.requestId, _os, _compress);
+ _responseHandler.sendResponse(_current.requestId, _os, _compress, false);
}
else
{
diff --git a/java/src/Ice/src/main/java/IceInternal/IncomingAsync.java b/java/src/Ice/src/main/java/IceInternal/IncomingAsync.java
index 7783f6cac93..e288741c003 100644
--- a/java/src/Ice/src/main/java/IceInternal/IncomingAsync.java
+++ b/java/src/Ice/src/main/java/IceInternal/IncomingAsync.java
@@ -106,7 +106,7 @@ public class IncomingAsync extends IncomingBase implements Ice.AMDCallback
{
try
{
- if(_locator != null && !__servantLocatorFinished())
+ if(_locator != null && !__servantLocatorFinished(true))
{
return;
}
@@ -119,7 +119,7 @@ public class IncomingAsync extends IncomingBase implements Ice.AMDCallback
{
_observer.reply(_os.size() - Protocol.headerSize - 4);
}
- _responseHandler.sendResponse(_current.requestId, _os, _compress);
+ _responseHandler.sendResponse(_current.requestId, _os, _compress, true);
}
else
{
@@ -135,7 +135,7 @@ public class IncomingAsync extends IncomingBase implements Ice.AMDCallback
}
catch(Ice.LocalException ex)
{
- _responseHandler.invokeException(_current.requestId, ex, 1);
+ _responseHandler.invokeException(_current.requestId, ex, 1, true);
}
}
@@ -144,16 +144,16 @@ public class IncomingAsync extends IncomingBase implements Ice.AMDCallback
{
try
{
- if(_locator != null && !__servantLocatorFinished())
+ if(_locator != null && !__servantLocatorFinished(true))
{
return;
}
- __handleException(exc);
+ __handleException(exc, true);
}
catch(Ice.LocalException ex)
{
- _responseHandler.invokeException(_current.requestId, ex, 1);
+ _responseHandler.invokeException(_current.requestId, ex, 1, true);
}
}
diff --git a/java/src/Ice/src/main/java/IceInternal/IncomingBase.java b/java/src/Ice/src/main/java/IceInternal/IncomingBase.java
index 6879e8cdf2f..1a2de3dd6ed 100644
--- a/java/src/Ice/src/main/java/IceInternal/IncomingBase.java
+++ b/java/src/Ice/src/main/java/IceInternal/IncomingBase.java
@@ -262,7 +262,7 @@ class IncomingBase
}
final protected boolean
- __servantLocatorFinished()
+ __servantLocatorFinished(boolean amd)
{
assert(_locator != null && _servant != null);
try
@@ -293,7 +293,7 @@ class IncomingBase
{
_observer.reply(_os.size() - Protocol.headerSize - 4);
}
- _responseHandler.sendResponse(_current.requestId, _os, _compress);
+ _responseHandler.sendResponse(_current.requestId, _os, _compress, amd);
}
else
{
@@ -309,13 +309,13 @@ class IncomingBase
}
catch(java.lang.Exception ex)
{
- __handleException(ex);
+ __handleException(ex, amd);
}
return false;
}
final protected void
- __handleException(java.lang.Exception exc)
+ __handleException(java.lang.Exception exc, boolean amd)
{
assert(_responseHandler != null);
@@ -390,7 +390,7 @@ class IncomingBase
{
_observer.reply(_os.size() - Protocol.headerSize - 4);
}
- _responseHandler.sendResponse(_current.requestId, _os, _compress);
+ _responseHandler.sendResponse(_current.requestId, _os, _compress, amd);
}
else
{
@@ -418,7 +418,7 @@ class IncomingBase
{
_observer.reply(_os.size() - Protocol.headerSize - 4);
}
- _responseHandler.sendResponse(_current.requestId, _os, _compress);
+ _responseHandler.sendResponse(_current.requestId, _os, _compress, amd);
}
else
{
@@ -446,7 +446,7 @@ class IncomingBase
{
_observer.reply(_os.size() - Protocol.headerSize - 4);
}
- _responseHandler.sendResponse(_current.requestId, _os, _compress);
+ _responseHandler.sendResponse(_current.requestId, _os, _compress, amd);
}
else
{
@@ -474,7 +474,7 @@ class IncomingBase
{
_observer.reply(_os.size() - Protocol.headerSize - 4);
}
- _responseHandler.sendResponse(_current.requestId, _os, _compress);
+ _responseHandler.sendResponse(_current.requestId, _os, _compress, amd);
}
else
{
@@ -485,7 +485,7 @@ class IncomingBase
{
if(ex instanceof Ice.SystemException)
{
- if(_responseHandler.systemException(_current.requestId, (Ice.SystemException)ex))
+ if(_responseHandler.systemException(_current.requestId, (Ice.SystemException)ex, amd))
{
return;
}
@@ -516,7 +516,7 @@ class IncomingBase
{
_observer.reply(_os.size() - Protocol.headerSize - 4);
}
- _responseHandler.sendResponse(_current.requestId, _os, _compress);
+ _responseHandler.sendResponse(_current.requestId, _os, _compress, amd);
}
else
{
@@ -550,7 +550,7 @@ class IncomingBase
{
_observer.reply(_os.size() - Protocol.headerSize - 4);
}
- _responseHandler.sendResponse(_current.requestId, _os, _compress);
+ _responseHandler.sendResponse(_current.requestId, _os, _compress, amd);
}
else
{
@@ -583,7 +583,7 @@ class IncomingBase
{
_observer.reply(_os.size() - Protocol.headerSize - 4);
}
- _responseHandler.sendResponse(_current.requestId, _os, _compress);
+ _responseHandler.sendResponse(_current.requestId, _os, _compress, amd);
}
else
{
diff --git a/java/src/Ice/src/main/java/IceInternal/ResponseHandler.java b/java/src/Ice/src/main/java/IceInternal/ResponseHandler.java
index 89449a61945..52c98ffafd7 100644
--- a/java/src/Ice/src/main/java/IceInternal/ResponseHandler.java
+++ b/java/src/Ice/src/main/java/IceInternal/ResponseHandler.java
@@ -11,8 +11,8 @@ package IceInternal;
public interface ResponseHandler
{
- void sendResponse(int requestId, BasicStream os, byte status);
+ void sendResponse(int requestId, BasicStream os, byte status, boolean amd);
void sendNoResponse();
- boolean systemException(int requestId, Ice.SystemException ex);
- void invokeException(int requestId, Ice.LocalException ex, int invokeNum);
+ boolean systemException(int requestId, Ice.SystemException ex, boolean amd);
+ void invokeException(int requestId, Ice.LocalException ex, int invokeNum, boolean amd);
}