summaryrefslogtreecommitdiff
path: root/java/src
diff options
context:
space:
mode:
authorJose <jose@zeroc.com>2019-07-18 23:51:08 +0200
committerJose <jose@zeroc.com>2019-07-18 23:51:08 +0200
commitfc886b010c01cccb8cca3ac4d92f1ebd7fc72295 (patch)
tree51cf00a4a955efecc9c94527aeafcb25ffbe57b9 /java/src
parentSimplify OutputStream creation (diff)
parentFixed non-thread safe AMD dispatch, fixes #448 (#449) (diff)
downloadice-fc886b010c01cccb8cca3ac4d92f1ebd7fc72295.tar.bz2
ice-fc886b010c01cccb8cca3ac4d92f1ebd7fc72295.tar.xz
ice-fc886b010c01cccb8cca3ac4d92f1ebd7fc72295.zip
Merge remote-tracking branch 'origin/3.7' into swift
Diffstat (limited to 'java/src')
-rw-r--r--java/src/Ice/src/main/java/com/zeroc/Ice/Blobject.java2
-rw-r--r--java/src/Ice/src/main/java/com/zeroc/Ice/BlobjectAsync.java6
-rw-r--r--java/src/Ice/src/main/java/com/zeroc/Ice/DispatchInterceptor.java18
-rw-r--r--java/src/Ice/src/main/java/com/zeroc/IceInternal/Incoming.java69
4 files changed, 77 insertions, 18 deletions
diff --git a/java/src/Ice/src/main/java/com/zeroc/Ice/Blobject.java b/java/src/Ice/src/main/java/com/zeroc/Ice/Blobject.java
index e23f5e7e6a5..9d3637badaf 100644
--- a/java/src/Ice/src/main/java/com/zeroc/Ice/Blobject.java
+++ b/java/src/Ice/src/main/java/com/zeroc/Ice/Blobject.java
@@ -39,6 +39,6 @@ public interface Blobject extends com.zeroc.Ice.Object
{
byte[] inEncaps = in.readParamEncaps();
com.zeroc.Ice.Object.Ice_invokeResult r = ice_invoke(inEncaps, current);
- return in.setResult(in.writeParamEncaps(r.outParams, r.returnValue));
+ return in.setResult(in.writeParamEncaps(in.getAndClearCachedOutputStream(), r.outParams, r.returnValue));
}
}
diff --git a/java/src/Ice/src/main/java/com/zeroc/Ice/BlobjectAsync.java b/java/src/Ice/src/main/java/com/zeroc/Ice/BlobjectAsync.java
index cb9ddadfd16..6a148071905 100644
--- a/java/src/Ice/src/main/java/com/zeroc/Ice/BlobjectAsync.java
+++ b/java/src/Ice/src/main/java/com/zeroc/Ice/BlobjectAsync.java
@@ -45,7 +45,9 @@ public interface BlobjectAsync extends com.zeroc.Ice.Object
{
byte[] inEncaps = in.readParamEncaps();
CompletableFuture<OutputStream> f = new CompletableFuture<>();
- ice_invokeAsync(inEncaps, current).whenComplete((result, ex) ->
+ CompletionStage<Object.Ice_invokeResult> s = ice_invokeAsync(inEncaps, current);
+ final OutputStream cached = in.getAndClearCachedOutputStream(); // If an output stream is cached, re-use it
+ s.whenComplete((result, ex) ->
{
if(ex != null)
{
@@ -53,7 +55,7 @@ public interface BlobjectAsync extends com.zeroc.Ice.Object
}
else
{
- f.complete(in.writeParamEncaps(result.outParams, result.returnValue));
+ f.complete(in.writeParamEncaps(cached, result.outParams, result.returnValue));
}
});
return f;
diff --git a/java/src/Ice/src/main/java/com/zeroc/Ice/DispatchInterceptor.java b/java/src/Ice/src/main/java/com/zeroc/Ice/DispatchInterceptor.java
index d825707630e..8523bdc929d 100644
--- a/java/src/Ice/src/main/java/com/zeroc/Ice/DispatchInterceptor.java
+++ b/java/src/Ice/src/main/java/com/zeroc/Ice/DispatchInterceptor.java
@@ -36,6 +36,22 @@ public abstract class DispatchInterceptor implements com.zeroc.Ice.Object
public CompletionStage<OutputStream> _iceDispatch(com.zeroc.IceInternal.Incoming in, Current current)
throws UserException
{
- return dispatch(in);
+ try
+ {
+ return dispatch(in);
+ }
+ catch(java.lang.Throwable ex)
+ {
+ //
+ // If the input parameters weren't read, make sure we skip them here. It's needed to read the
+ // encoding version used by the client to eventually marshal the user exception. It's also needed
+ // if we dispatch a batch oneway request to read the next batch request.
+ //
+ if(current.encoding == null || (current.encoding.major == 0 && current.encoding.minor == 0))
+ {
+ in.skipReadParams();
+ }
+ throw ex;
+ }
}
}
diff --git a/java/src/Ice/src/main/java/com/zeroc/IceInternal/Incoming.java b/java/src/Ice/src/main/java/com/zeroc/IceInternal/Incoming.java
index dc5a60d390a..f8d899ae365 100644
--- a/java/src/Ice/src/main/java/com/zeroc/IceInternal/Incoming.java
+++ b/java/src/Ice/src/main/java/com/zeroc/IceInternal/Incoming.java
@@ -271,6 +271,13 @@ final public class Incoming implements com.zeroc.Ice.Request
public <T> CompletionStage<OutputStream> setResultFuture(CompletionStage<T> f, Write<T> write)
{
+ final OutputStream cached = getAndClearCachedOutputStream(); // If an output stream is cached, re-use it
+
+ //
+ // NOTE: it's important that the continuation doesn't mutate the Incoming state to
+ // guarantee thread-safety. Multiple continuations can execute concurrently if the
+ // user installed a dispatch interceptor and the dispatch is retried.
+ //
final CompletableFuture<OutputStream> r = new CompletableFuture<OutputStream>();
f.whenComplete((result, ex) ->
{
@@ -280,7 +287,7 @@ final public class Incoming implements com.zeroc.Ice.Request
}
else
{
- OutputStream os = startWriteParams();
+ OutputStream os = startWriteParams(cached);
write.write(os, result);
endWriteParams(os);
r.complete(os);
@@ -291,6 +298,13 @@ final public class Incoming implements com.zeroc.Ice.Request
public CompletionStage<OutputStream> setResultFuture(CompletionStage<Void> f)
{
+ final OutputStream cached = getAndClearCachedOutputStream(); // If an output stream is cached, re-use it
+
+ //
+ // NOTE: it's important that the continuation doesn't mutate the Incoming state to
+ // guarantee thread-safety. Multiple continuations can execute concurrently if the
+ // user installed a dispatch interceptor and the dispatch is retried.
+ //
final CompletableFuture<OutputStream> r = new CompletableFuture<OutputStream>();
f.whenComplete((result, ex) ->
{
@@ -300,7 +314,7 @@ final public class Incoming implements com.zeroc.Ice.Request
}
else
{
- r.complete(writeEmptyParams());
+ r.complete(writeEmptyParams(cached));
}
});
return r;
@@ -448,6 +462,20 @@ final public class Incoming implements com.zeroc.Ice.Request
_format = format;
}
+ public OutputStream getAndClearCachedOutputStream()
+ {
+ if(_response)
+ {
+ OutputStream cached = _os;
+ _os = null;
+ return cached;
+ }
+ else
+ {
+ return null; // Don't consume unnecessarily the output stream if we are dispatching a oneway request
+ }
+ }
+
static public OutputStream createResponseOutputStream(Current current)
{
OutputStream os = new OutputStream(current.adapter.getCommunicator(), Protocol.currentProtocolEncoding);
@@ -457,17 +485,18 @@ final public class Incoming implements com.zeroc.Ice.Request
return os;
}
- public OutputStream startWriteParams()
+ private OutputStream startWriteParams(OutputStream os)
{
if(!_response)
{
throw new com.zeroc.Ice.MarshalException("can't marshal out parameters for oneway dispatch");
}
- // If there's an output stream set, re-use it for the response
- OutputStream os = _os == null ? new OutputStream(_instance, Protocol.currentProtocolEncoding) : _os;
+ if(os == null) // Create the output stream if none is provided
+ {
+ os = new OutputStream(_instance, Protocol.currentProtocolEncoding);
+ }
assert(os.pos() == 0);
- _os = null;
os.writeBlob(Protocol.replyHdr);
os.writeInt(_current.requestId);
os.writeByte(ReplyStatus.replyOK);
@@ -475,6 +504,11 @@ final public class Incoming implements com.zeroc.Ice.Request
return os;
}
+ public OutputStream startWriteParams()
+ {
+ return startWriteParams(getAndClearCachedOutputStream());
+ }
+
public void endWriteParams(OutputStream os)
{
if(_response)
@@ -483,14 +517,15 @@ final public class Incoming implements com.zeroc.Ice.Request
}
}
- public OutputStream writeEmptyParams()
+ private OutputStream writeEmptyParams(OutputStream os)
{
if(_response)
{
- // If there's an output stream set, re-use it for the response
- OutputStream os = _os == null ? new OutputStream(_instance, Protocol.currentProtocolEncoding) : _os;
+ if(os == null) // Create the output stream if none is provided
+ {
+ os = new OutputStream(_instance, Protocol.currentProtocolEncoding);
+ }
assert(os.pos() == 0);
- _os = null;
os.writeBlob(Protocol.replyHdr);
os.writeInt(_current.requestId);
os.writeByte(ReplyStatus.replyOK);
@@ -503,7 +538,12 @@ final public class Incoming implements com.zeroc.Ice.Request
}
}
- public OutputStream writeParamEncaps(byte[] v, boolean ok)
+ public OutputStream writeEmptyParams()
+ {
+ return writeEmptyParams(getAndClearCachedOutputStream());
+ }
+
+ public OutputStream writeParamEncaps(OutputStream os, byte[] v, boolean ok)
{
if(!ok && _observer != null)
{
@@ -512,10 +552,11 @@ final public class Incoming implements com.zeroc.Ice.Request
if(_response)
{
- // If there's an output stream set, re-use it for the response
- OutputStream os = _os == null ? new OutputStream(_instance, Protocol.currentProtocolEncoding) : _os;
+ if(os == null) // Create the output stream if none is provided
+ {
+ os = new OutputStream(_instance, Protocol.currentProtocolEncoding);
+ }
assert(os.pos() == 0);
- _os = null;
os.writeBlob(Protocol.replyHdr);
os.writeInt(_current.requestId);
os.writeByte(ok ? ReplyStatus.replyOK : ReplyStatus.replyUserException);