diff options
author | Benoit Foucher <benoit@zeroc.com> | 2019-07-18 18:09:15 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-07-18 18:09:15 +0200 |
commit | 2c036ca6ca7f39b6987135839a1a899cd080877d (patch) | |
tree | 71b429a5784a77ec41084ba84105930f537f195e /java/src | |
parent | Bumped invocation timeout in Objectice-C timeout test (diff) | |
download | ice-2c036ca6ca7f39b6987135839a1a899cd080877d.tar.bz2 ice-2c036ca6ca7f39b6987135839a1a899cd080877d.tar.xz ice-2c036ca6ca7f39b6987135839a1a899cd080877d.zip |
Fixed non-thread safe AMD dispatch, fixes #448 (#449)
The caching of the output stream which was added back to solve #414 made the
dispatch of AMD requests non thread-safe if a dispatch interceptor was installed
and if the interceptor retried the dispatch. Multiple continuation could run
concurrently and eventually re-use the cached output stream to marshal multiple
responses.
Diffstat (limited to 'java/src')
3 files changed, 60 insertions, 17 deletions
diff --git a/java/src/Ice/src/main/java/com/zeroc/Ice/Blobject.java b/java/src/Ice/src/main/java/com/zeroc/Ice/Blobject.java index e23f5e7e6a5..9d3637badaf 100644 --- a/java/src/Ice/src/main/java/com/zeroc/Ice/Blobject.java +++ b/java/src/Ice/src/main/java/com/zeroc/Ice/Blobject.java @@ -39,6 +39,6 @@ public interface Blobject extends com.zeroc.Ice.Object { byte[] inEncaps = in.readParamEncaps(); com.zeroc.Ice.Object.Ice_invokeResult r = ice_invoke(inEncaps, current); - return in.setResult(in.writeParamEncaps(r.outParams, r.returnValue)); + return in.setResult(in.writeParamEncaps(in.getAndClearCachedOutputStream(), r.outParams, r.returnValue)); } } diff --git a/java/src/Ice/src/main/java/com/zeroc/Ice/BlobjectAsync.java b/java/src/Ice/src/main/java/com/zeroc/Ice/BlobjectAsync.java index cb9ddadfd16..6a148071905 100644 --- a/java/src/Ice/src/main/java/com/zeroc/Ice/BlobjectAsync.java +++ b/java/src/Ice/src/main/java/com/zeroc/Ice/BlobjectAsync.java @@ -45,7 +45,9 @@ public interface BlobjectAsync extends com.zeroc.Ice.Object { byte[] inEncaps = in.readParamEncaps(); CompletableFuture<OutputStream> f = new CompletableFuture<>(); - ice_invokeAsync(inEncaps, current).whenComplete((result, ex) -> + CompletionStage<Object.Ice_invokeResult> s = ice_invokeAsync(inEncaps, current); + final OutputStream cached = in.getAndClearCachedOutputStream(); // If an output stream is cached, re-use it + s.whenComplete((result, ex) -> { if(ex != null) { @@ -53,7 +55,7 @@ public interface BlobjectAsync extends com.zeroc.Ice.Object } else { - f.complete(in.writeParamEncaps(result.outParams, result.returnValue)); + f.complete(in.writeParamEncaps(cached, result.outParams, result.returnValue)); } }); return f; diff --git a/java/src/Ice/src/main/java/com/zeroc/IceInternal/Incoming.java b/java/src/Ice/src/main/java/com/zeroc/IceInternal/Incoming.java index dc5a60d390a..f8d899ae365 100644 --- a/java/src/Ice/src/main/java/com/zeroc/IceInternal/Incoming.java +++ b/java/src/Ice/src/main/java/com/zeroc/IceInternal/Incoming.java @@ -271,6 +271,13 @@ final public class Incoming implements com.zeroc.Ice.Request public <T> CompletionStage<OutputStream> setResultFuture(CompletionStage<T> f, Write<T> write) { + final OutputStream cached = getAndClearCachedOutputStream(); // If an output stream is cached, re-use it + + // + // NOTE: it's important that the continuation doesn't mutate the Incoming state to + // guarantee thread-safety. Multiple continuations can execute concurrently if the + // user installed a dispatch interceptor and the dispatch is retried. + // final CompletableFuture<OutputStream> r = new CompletableFuture<OutputStream>(); f.whenComplete((result, ex) -> { @@ -280,7 +287,7 @@ final public class Incoming implements com.zeroc.Ice.Request } else { - OutputStream os = startWriteParams(); + OutputStream os = startWriteParams(cached); write.write(os, result); endWriteParams(os); r.complete(os); @@ -291,6 +298,13 @@ final public class Incoming implements com.zeroc.Ice.Request public CompletionStage<OutputStream> setResultFuture(CompletionStage<Void> f) { + final OutputStream cached = getAndClearCachedOutputStream(); // If an output stream is cached, re-use it + + // + // NOTE: it's important that the continuation doesn't mutate the Incoming state to + // guarantee thread-safety. Multiple continuations can execute concurrently if the + // user installed a dispatch interceptor and the dispatch is retried. + // final CompletableFuture<OutputStream> r = new CompletableFuture<OutputStream>(); f.whenComplete((result, ex) -> { @@ -300,7 +314,7 @@ final public class Incoming implements com.zeroc.Ice.Request } else { - r.complete(writeEmptyParams()); + r.complete(writeEmptyParams(cached)); } }); return r; @@ -448,6 +462,20 @@ final public class Incoming implements com.zeroc.Ice.Request _format = format; } + public OutputStream getAndClearCachedOutputStream() + { + if(_response) + { + OutputStream cached = _os; + _os = null; + return cached; + } + else + { + return null; // Don't consume unnecessarily the output stream if we are dispatching a oneway request + } + } + static public OutputStream createResponseOutputStream(Current current) { OutputStream os = new OutputStream(current.adapter.getCommunicator(), Protocol.currentProtocolEncoding); @@ -457,17 +485,18 @@ final public class Incoming implements com.zeroc.Ice.Request return os; } - public OutputStream startWriteParams() + private OutputStream startWriteParams(OutputStream os) { if(!_response) { throw new com.zeroc.Ice.MarshalException("can't marshal out parameters for oneway dispatch"); } - // If there's an output stream set, re-use it for the response - OutputStream os = _os == null ? new OutputStream(_instance, Protocol.currentProtocolEncoding) : _os; + if(os == null) // Create the output stream if none is provided + { + os = new OutputStream(_instance, Protocol.currentProtocolEncoding); + } assert(os.pos() == 0); - _os = null; os.writeBlob(Protocol.replyHdr); os.writeInt(_current.requestId); os.writeByte(ReplyStatus.replyOK); @@ -475,6 +504,11 @@ final public class Incoming implements com.zeroc.Ice.Request return os; } + public OutputStream startWriteParams() + { + return startWriteParams(getAndClearCachedOutputStream()); + } + public void endWriteParams(OutputStream os) { if(_response) @@ -483,14 +517,15 @@ final public class Incoming implements com.zeroc.Ice.Request } } - public OutputStream writeEmptyParams() + private OutputStream writeEmptyParams(OutputStream os) { if(_response) { - // If there's an output stream set, re-use it for the response - OutputStream os = _os == null ? new OutputStream(_instance, Protocol.currentProtocolEncoding) : _os; + if(os == null) // Create the output stream if none is provided + { + os = new OutputStream(_instance, Protocol.currentProtocolEncoding); + } assert(os.pos() == 0); - _os = null; os.writeBlob(Protocol.replyHdr); os.writeInt(_current.requestId); os.writeByte(ReplyStatus.replyOK); @@ -503,7 +538,12 @@ final public class Incoming implements com.zeroc.Ice.Request } } - public OutputStream writeParamEncaps(byte[] v, boolean ok) + public OutputStream writeEmptyParams() + { + return writeEmptyParams(getAndClearCachedOutputStream()); + } + + public OutputStream writeParamEncaps(OutputStream os, byte[] v, boolean ok) { if(!ok && _observer != null) { @@ -512,10 +552,11 @@ final public class Incoming implements com.zeroc.Ice.Request if(_response) { - // If there's an output stream set, re-use it for the response - OutputStream os = _os == null ? new OutputStream(_instance, Protocol.currentProtocolEncoding) : _os; + if(os == null) // Create the output stream if none is provided + { + os = new OutputStream(_instance, Protocol.currentProtocolEncoding); + } assert(os.pos() == 0); - _os = null; os.writeBlob(Protocol.replyHdr); os.writeInt(_current.requestId); os.writeByte(ok ? ReplyStatus.replyOK : ReplyStatus.replyUserException); |