diff options
Diffstat (limited to 'java/src')
4 files changed, 77 insertions, 18 deletions
diff --git a/java/src/Ice/src/main/java/com/zeroc/Ice/Blobject.java b/java/src/Ice/src/main/java/com/zeroc/Ice/Blobject.java index e23f5e7e6a5..9d3637badaf 100644 --- a/java/src/Ice/src/main/java/com/zeroc/Ice/Blobject.java +++ b/java/src/Ice/src/main/java/com/zeroc/Ice/Blobject.java @@ -39,6 +39,6 @@ public interface Blobject extends com.zeroc.Ice.Object { byte[] inEncaps = in.readParamEncaps(); com.zeroc.Ice.Object.Ice_invokeResult r = ice_invoke(inEncaps, current); - return in.setResult(in.writeParamEncaps(r.outParams, r.returnValue)); + return in.setResult(in.writeParamEncaps(in.getAndClearCachedOutputStream(), r.outParams, r.returnValue)); } } diff --git a/java/src/Ice/src/main/java/com/zeroc/Ice/BlobjectAsync.java b/java/src/Ice/src/main/java/com/zeroc/Ice/BlobjectAsync.java index cb9ddadfd16..6a148071905 100644 --- a/java/src/Ice/src/main/java/com/zeroc/Ice/BlobjectAsync.java +++ b/java/src/Ice/src/main/java/com/zeroc/Ice/BlobjectAsync.java @@ -45,7 +45,9 @@ public interface BlobjectAsync extends com.zeroc.Ice.Object { byte[] inEncaps = in.readParamEncaps(); CompletableFuture<OutputStream> f = new CompletableFuture<>(); - ice_invokeAsync(inEncaps, current).whenComplete((result, ex) -> + CompletionStage<Object.Ice_invokeResult> s = ice_invokeAsync(inEncaps, current); + final OutputStream cached = in.getAndClearCachedOutputStream(); // If an output stream is cached, re-use it + s.whenComplete((result, ex) -> { if(ex != null) { @@ -53,7 +55,7 @@ public interface BlobjectAsync extends com.zeroc.Ice.Object } else { - f.complete(in.writeParamEncaps(result.outParams, result.returnValue)); + f.complete(in.writeParamEncaps(cached, result.outParams, result.returnValue)); } }); return f; diff --git a/java/src/Ice/src/main/java/com/zeroc/Ice/DispatchInterceptor.java b/java/src/Ice/src/main/java/com/zeroc/Ice/DispatchInterceptor.java index d825707630e..8523bdc929d 100644 --- a/java/src/Ice/src/main/java/com/zeroc/Ice/DispatchInterceptor.java +++ b/java/src/Ice/src/main/java/com/zeroc/Ice/DispatchInterceptor.java @@ -36,6 +36,22 @@ public abstract class DispatchInterceptor implements com.zeroc.Ice.Object public CompletionStage<OutputStream> _iceDispatch(com.zeroc.IceInternal.Incoming in, Current current) throws UserException { - return dispatch(in); + try + { + return dispatch(in); + } + catch(java.lang.Throwable ex) + { + // + // If the input parameters weren't read, make sure we skip them here. It's needed to read the + // encoding version used by the client to eventually marshal the user exception. It's also needed + // if we dispatch a batch oneway request to read the next batch request. + // + if(current.encoding == null || (current.encoding.major == 0 && current.encoding.minor == 0)) + { + in.skipReadParams(); + } + throw ex; + } } } diff --git a/java/src/Ice/src/main/java/com/zeroc/IceInternal/Incoming.java b/java/src/Ice/src/main/java/com/zeroc/IceInternal/Incoming.java index dc5a60d390a..f8d899ae365 100644 --- a/java/src/Ice/src/main/java/com/zeroc/IceInternal/Incoming.java +++ b/java/src/Ice/src/main/java/com/zeroc/IceInternal/Incoming.java @@ -271,6 +271,13 @@ final public class Incoming implements com.zeroc.Ice.Request public <T> CompletionStage<OutputStream> setResultFuture(CompletionStage<T> f, Write<T> write) { + final OutputStream cached = getAndClearCachedOutputStream(); // If an output stream is cached, re-use it + + // + // NOTE: it's important that the continuation doesn't mutate the Incoming state to + // guarantee thread-safety. Multiple continuations can execute concurrently if the + // user installed a dispatch interceptor and the dispatch is retried. + // final CompletableFuture<OutputStream> r = new CompletableFuture<OutputStream>(); f.whenComplete((result, ex) -> { @@ -280,7 +287,7 @@ final public class Incoming implements com.zeroc.Ice.Request } else { - OutputStream os = startWriteParams(); + OutputStream os = startWriteParams(cached); write.write(os, result); endWriteParams(os); r.complete(os); @@ -291,6 +298,13 @@ final public class Incoming implements com.zeroc.Ice.Request public CompletionStage<OutputStream> setResultFuture(CompletionStage<Void> f) { + final OutputStream cached = getAndClearCachedOutputStream(); // If an output stream is cached, re-use it + + // + // NOTE: it's important that the continuation doesn't mutate the Incoming state to + // guarantee thread-safety. Multiple continuations can execute concurrently if the + // user installed a dispatch interceptor and the dispatch is retried. + // final CompletableFuture<OutputStream> r = new CompletableFuture<OutputStream>(); f.whenComplete((result, ex) -> { @@ -300,7 +314,7 @@ final public class Incoming implements com.zeroc.Ice.Request } else { - r.complete(writeEmptyParams()); + r.complete(writeEmptyParams(cached)); } }); return r; @@ -448,6 +462,20 @@ final public class Incoming implements com.zeroc.Ice.Request _format = format; } + public OutputStream getAndClearCachedOutputStream() + { + if(_response) + { + OutputStream cached = _os; + _os = null; + return cached; + } + else + { + return null; // Don't consume unnecessarily the output stream if we are dispatching a oneway request + } + } + static public OutputStream createResponseOutputStream(Current current) { OutputStream os = new OutputStream(current.adapter.getCommunicator(), Protocol.currentProtocolEncoding); @@ -457,17 +485,18 @@ final public class Incoming implements com.zeroc.Ice.Request return os; } - public OutputStream startWriteParams() + private OutputStream startWriteParams(OutputStream os) { if(!_response) { throw new com.zeroc.Ice.MarshalException("can't marshal out parameters for oneway dispatch"); } - // If there's an output stream set, re-use it for the response - OutputStream os = _os == null ? new OutputStream(_instance, Protocol.currentProtocolEncoding) : _os; + if(os == null) // Create the output stream if none is provided + { + os = new OutputStream(_instance, Protocol.currentProtocolEncoding); + } assert(os.pos() == 0); - _os = null; os.writeBlob(Protocol.replyHdr); os.writeInt(_current.requestId); os.writeByte(ReplyStatus.replyOK); @@ -475,6 +504,11 @@ final public class Incoming implements com.zeroc.Ice.Request return os; } + public OutputStream startWriteParams() + { + return startWriteParams(getAndClearCachedOutputStream()); + } + public void endWriteParams(OutputStream os) { if(_response) @@ -483,14 +517,15 @@ final public class Incoming implements com.zeroc.Ice.Request } } - public OutputStream writeEmptyParams() + private OutputStream writeEmptyParams(OutputStream os) { if(_response) { - // If there's an output stream set, re-use it for the response - OutputStream os = _os == null ? new OutputStream(_instance, Protocol.currentProtocolEncoding) : _os; + if(os == null) // Create the output stream if none is provided + { + os = new OutputStream(_instance, Protocol.currentProtocolEncoding); + } assert(os.pos() == 0); - _os = null; os.writeBlob(Protocol.replyHdr); os.writeInt(_current.requestId); os.writeByte(ReplyStatus.replyOK); @@ -503,7 +538,12 @@ final public class Incoming implements com.zeroc.Ice.Request } } - public OutputStream writeParamEncaps(byte[] v, boolean ok) + public OutputStream writeEmptyParams() + { + return writeEmptyParams(getAndClearCachedOutputStream()); + } + + public OutputStream writeParamEncaps(OutputStream os, byte[] v, boolean ok) { if(!ok && _observer != null) { @@ -512,10 +552,11 @@ final public class Incoming implements com.zeroc.Ice.Request if(_response) { - // If there's an output stream set, re-use it for the response - OutputStream os = _os == null ? new OutputStream(_instance, Protocol.currentProtocolEncoding) : _os; + if(os == null) // Create the output stream if none is provided + { + os = new OutputStream(_instance, Protocol.currentProtocolEncoding); + } assert(os.pos() == 0); - _os = null; os.writeBlob(Protocol.replyHdr); os.writeInt(_current.requestId); os.writeByte(ok ? ReplyStatus.replyOK : ReplyStatus.replyUserException); |