diff options
author | Benoit Foucher <benoit@zeroc.com> | 2019-07-18 18:09:15 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-07-18 18:09:15 +0200 |
commit | 2c036ca6ca7f39b6987135839a1a899cd080877d (patch) | |
tree | 71b429a5784a77ec41084ba84105930f537f195e | |
parent | Bumped invocation timeout in Objectice-C timeout test (diff) | |
download | ice-2c036ca6ca7f39b6987135839a1a899cd080877d.tar.bz2 ice-2c036ca6ca7f39b6987135839a1a899cd080877d.tar.xz ice-2c036ca6ca7f39b6987135839a1a899cd080877d.zip |
Fixed non-thread safe AMD dispatch, fixes #448 (#449)
The caching of the output stream which was added back to solve #414 made the
dispatch of AMD requests non thread-safe if a dispatch interceptor was installed
and if the interceptor retried the dispatch. Multiple continuation could run
concurrently and eventually re-use the cached output stream to marshal multiple
responses.
-rw-r--r-- | cpp/test/Ice/interceptor/AMDInterceptorI.cpp | 9 | ||||
-rw-r--r-- | cpp/test/Ice/interceptor/Client.cpp | 10 | ||||
-rw-r--r-- | cpp/test/Ice/interceptor/MyObjectI.cpp | 36 | ||||
-rw-r--r-- | csharp/Makefile | 8 | ||||
-rw-r--r-- | csharp/src/Ice/Incoming.cs | 94 | ||||
-rw-r--r-- | csharp/src/Ice/Object.cs | 8 | ||||
-rw-r--r-- | csharp/test/Ice/interceptor/Client.cs | 12 | ||||
-rw-r--r-- | csharp/test/Ice/interceptor/InterceptorI.cs | 9 | ||||
-rw-r--r-- | java-compat/test/src/main/java/test/Ice/interceptor/AMDInterceptorI.java | 9 | ||||
-rw-r--r-- | java-compat/test/src/main/java/test/Ice/interceptor/Client.java | 10 | ||||
-rw-r--r-- | java-compat/test/src/main/java/test/Ice/interceptor/MyObjectI.java | 13 | ||||
-rw-r--r-- | java/src/Ice/src/main/java/com/zeroc/Ice/Blobject.java | 2 | ||||
-rw-r--r-- | java/src/Ice/src/main/java/com/zeroc/Ice/BlobjectAsync.java | 6 | ||||
-rw-r--r-- | java/src/Ice/src/main/java/com/zeroc/IceInternal/Incoming.java | 69 | ||||
-rw-r--r-- | java/test/src/main/java/test/Ice/interceptor/Client.java | 10 | ||||
-rw-r--r-- | java/test/src/main/java/test/Ice/interceptor/InterceptorI.java | 9 |
16 files changed, 260 insertions, 54 deletions
diff --git a/cpp/test/Ice/interceptor/AMDInterceptorI.cpp b/cpp/test/Ice/interceptor/AMDInterceptorI.cpp index e2e2d95b001..158cf2736fa 100644 --- a/cpp/test/Ice/interceptor/AMDInterceptorI.cpp +++ b/cpp/test/Ice/interceptor/AMDInterceptorI.cpp @@ -103,6 +103,15 @@ AMDInterceptorI::dispatch(Ice::Request& request) current.ctx["retry"] = "no"; } + else if(current.ctx.find("retry") != current.ctx.end() && current.ctx["retry"] == "yes") + { + // + // Retry the dispatch to ensure that abandoning the result of the dispatch + // works fine and is thread-safe + // + _servant->ice_dispatch(request); + _servant->ice_dispatch(request); + } #ifdef ICE_CPP11_MAPPING _lastStatus = _servant->ice_dispatch(request, []() { return true; }, [this](exception_ptr ex) { diff --git a/cpp/test/Ice/interceptor/Client.cpp b/cpp/test/Ice/interceptor/Client.cpp index 191215fd59d..02317c19d2c 100644 --- a/cpp/test/Ice/interceptor/Client.cpp +++ b/cpp/test/Ice/interceptor/Client.cpp @@ -189,6 +189,16 @@ Client::runAmdTest(const Test::MyObjectPrxPtr& prx, const AMDInterceptorIPtr& in test(prx->amdAddWithRetry(33, 12) == 45); test(interceptor->getLastOperation() == "amdAddWithRetry"); test(!interceptor->getLastStatus()); + { + Ice::Context ctx; + ctx["retry"] = "yes"; + for(int i = 0; i < 10; ++i) + { + test(prx->amdAdd(33, 12, ctx) == 45); + test(interceptor->getLastOperation() == "amdAdd"); + test(!interceptor->getLastStatus()); + } + } cout << "ok" << endl; cout << "testing user exception... " << flush; try diff --git a/cpp/test/Ice/interceptor/MyObjectI.cpp b/cpp/test/Ice/interceptor/MyObjectI.cpp index 8604cd7007f..f3164ac4426 100644 --- a/cpp/test/Ice/interceptor/MyObjectI.cpp +++ b/cpp/test/Ice/interceptor/MyObjectI.cpp @@ -83,13 +83,22 @@ MyObjectI::amdAddAsync(int x, int y, function<void(int)> response, function<void(exception_ptr)>, - const Ice::Current&) + const Ice::Current& current) { + Ice::Context::const_iterator p = current.ctx.find("retry"); + bool retry = p != current.ctx.end(); std::thread t( - [x, y, response]() + [x, y, response, retry]() { this_thread::sleep_for(chrono::milliseconds(10)); - response(x + y); + try + { + response(x + y); + } + catch(const Ice::ResponseSentException&) + { + test(retry); + } }); t.detach(); } @@ -200,31 +209,42 @@ MyObjectI::amdBadSystemAddAsync(int, } #else void -MyObjectI::amdAdd_async(const Test::AMD_MyObject_amdAddPtr& cb, int x, int y, const Ice::Current&) +MyObjectI::amdAdd_async(const Test::AMD_MyObject_amdAddPtr& cb, int x, int y, const Ice::Current& current) { class ThreadI : public Thread { public: - ThreadI(const Test::AMD_MyObject_amdAddPtr& cb, int x, int y) : + ThreadI(const Test::AMD_MyObject_amdAddPtr& cb, int x, int y, bool retry) : _cb(cb), _x(x), - _y(y) + _y(y), + _retry(retry) { } void run() { ThreadControl::sleep(Time::milliSeconds(10)); - _cb->ice_response(_x + _y); + try + { + _cb->ice_response(_x + _y); + } + catch(const Ice::ResponseSentException&) + { + test(_retry); + } } private: Test::AMD_MyObject_amdAddPtr _cb; int _x; int _y; + bool _retry; }; - ThreadPtr thread = new ThreadI(cb, x, y); + Ice::Context::const_iterator p = current.ctx.find("retry"); + bool retry = p != current.ctx.end(); + ThreadPtr thread = new ThreadI(cb, x, y, retry); thread->start().detach(); } diff --git a/csharp/Makefile b/csharp/Makefile index f0dc5088e0c..33c59414242 100644 --- a/csharp/Makefile +++ b/csharp/Makefile @@ -2,14 +2,16 @@ # Copyright (c) ZeroC, Inc. All rights reserved. # +DOTNETARGS = $(if $(filter $(OPTIMIZE),no),/p:Configuration=Debug) + all: - dotnet msbuild msbuild/ice.proj /m + dotnet msbuild msbuild/ice.proj $(DOTNETARGS) /m tests: - dotnet msbuild msbuild/ice.proj /m + dotnet msbuild msbuild/ice.proj $(DOTNETARGS) /m srcs: - dotnet msbuild msbuild/ice.proj /t:BuildDist /m + dotnet msbuild msbuild/ice.proj $(DOTNETARGS) /t:BuildDist /m distclean clean: dotnet msbuild msbuild/ice.proj /t:Clean /m diff --git a/csharp/src/Ice/Incoming.cs b/csharp/src/Ice/Incoming.cs index 621099d773b..c467f67c349 100644 --- a/csharp/src/Ice/Incoming.cs +++ b/csharp/src/Ice/Incoming.cs @@ -268,17 +268,27 @@ namespace IceInternal { if(task == null) { - _os = startWriteParams(); - write(_os, default(R)); - endWriteParams(_os); - return null; // Response is cached in the Incoming to not have to create unecessary Task + // + // Write default constructed response if no task is provided + // + var os = startWriteParams(); + write(os, default(R)); + endWriteParams(os); + return setResult(os); } else { + var cached = getAndClearCachedOutputStream(); // If an output stream is cached, re-use it + + // + // NOTE: it's important that the continuation doesn't mutate the Incoming state to + // guarantee thread-safety. Multiple continuations can execute concurrently if the + // user installed a dispatch interceptor and the dispatch is retried. + // return task.ContinueWith((Task<R> t) => { var result = t.GetAwaiter().GetResult(); - var os = startWriteParams(); + var os = startWriteParams(cached); write(os, result); endWriteParams(os); return Task.FromResult(os); @@ -290,15 +300,24 @@ namespace IceInternal { if(task == null) { - _os = writeEmptyParams(); - return null; + // + // Write response if no task is provided + // + return setResult(writeEmptyParams()); } else { + var cached = getAndClearCachedOutputStream(); // If an output stream is cached, re-use it + + // + // NOTE: it's important that the continuation doesn't mutate the Incoming state to + // guarantee thread-safety. Multiple continuations can execute concurrently if the + // user installed a dispatch interceptor and the dispatch is retried. + // return task.ContinueWith((Task t) => { t.GetAwaiter().GetResult(); - return Task.FromResult(writeEmptyParams()); + return Task.FromResult(writeEmptyParams(cached)); }, TaskContinuationOptions.ExecuteSynchronously).Unwrap(); } } @@ -307,11 +326,15 @@ namespace IceInternal { if(task == null) { - _os = default(T).getOutputStream(_current); - return null; // Response is cached in the Incoming to not have to create unecessary Task + return setResult(default(T).getOutputStream(_current)); } else { + // + // NOTE: it's important that the continuation doesn't mutate the Incoming state to + // guarantee thread-safety. Multiple continuations can execute concurrently if the + // user installed a dispatch interceptor and the dispatch is retried. + // return task.ContinueWith((Task<T> t) => { return Task.FromResult(t.GetAwaiter().GetResult().getOutputStream(_current)); @@ -432,6 +455,20 @@ namespace IceInternal _format = format; } + public Ice.OutputStream getAndClearCachedOutputStream() + { + if(_response) + { + var cached = _os; + _os = null; + return cached; + } + else + { + return null; // Don't consume unnecessarily the output stream if we are dispatching a oneway request + } + } + static public Ice.OutputStream createResponseOutputStream(Ice.Current current) { var os = new Ice.OutputStream(current.adapter.getCommunicator(), Ice.Util.currentProtocolEncoding); @@ -441,17 +478,18 @@ namespace IceInternal return os; } - public Ice.OutputStream startWriteParams() + private Ice.OutputStream startWriteParams(Ice.OutputStream os) { if(!_response) { throw new Ice.MarshalException("can't marshal out parameters for oneway dispatch"); } - // If there's an output stream set, re-use it for the response - var os = _os == null ? new Ice.OutputStream(_instance, Ice.Util.currentProtocolEncoding) : _os; + if(os == null) // Create the output stream if none is provided + { + os = new Ice.OutputStream(_instance, Ice.Util.currentProtocolEncoding); + } Debug.Assert(os.pos() == 0); - _os = null; os.writeBlob(Protocol.replyHdr); os.writeInt(_current.requestId); os.writeByte(ReplyStatus.replyOK); @@ -459,6 +497,11 @@ namespace IceInternal return os; } + public Ice.OutputStream startWriteParams() + { + return startWriteParams(getAndClearCachedOutputStream()); + } + public void endWriteParams(Ice.OutputStream os) { if(_response) @@ -467,14 +510,15 @@ namespace IceInternal } } - public Ice.OutputStream writeEmptyParams() + private Ice.OutputStream writeEmptyParams(Ice.OutputStream os) { if(_response) { - // If there's an output stream set, re-use it for the response - var os = _os == null ? new Ice.OutputStream(_instance, Ice.Util.currentProtocolEncoding) : _os; + if(os == null) // Create the output stream if none is provided + { + os = new Ice.OutputStream(_instance, Ice.Util.currentProtocolEncoding); + } Debug.Assert(os.pos() == 0); - _os = null; os.writeBlob(Protocol.replyHdr); os.writeInt(_current.requestId); os.writeByte(ReplyStatus.replyOK); @@ -487,7 +531,12 @@ namespace IceInternal } } - public Ice.OutputStream writeParamEncaps(byte[] v, bool ok) + public Ice.OutputStream writeEmptyParams() + { + return writeEmptyParams(getAndClearCachedOutputStream()); + } + + public Ice.OutputStream writeParamEncaps(Ice.OutputStream os, byte[] v, bool ok) { if(!ok && _observer != null) { @@ -496,10 +545,11 @@ namespace IceInternal if(_response) { - // If there's an output stream set, re-use it for the response - var os = _os == null ? new Ice.OutputStream(_instance, Ice.Util.currentProtocolEncoding) : _os; + if(os == null) // Create the output stream if none is provided + { + os = new Ice.OutputStream(_instance, Ice.Util.currentProtocolEncoding); + } Debug.Assert(os.pos() == 0); - _os = null; os.writeBlob(Protocol.replyHdr); os.writeInt(_current.requestId); os.writeByte(ok ? ReplyStatus.replyOK : ReplyStatus.replyUserException); diff --git a/csharp/src/Ice/Object.cs b/csharp/src/Ice/Object.cs index dd9913ba5f3..78f776e671b 100644 --- a/csharp/src/Ice/Object.cs +++ b/csharp/src/Ice/Object.cs @@ -310,7 +310,7 @@ namespace Ice byte[] inEncaps = inS.readParamEncaps(); byte[] outEncaps; bool ok = ice_invoke(inEncaps, out outEncaps, current); - inS.setResult(inS.writeParamEncaps(outEncaps, ok)); + inS.setResult(inS.writeParamEncaps(inS.getAndClearCachedOutputStream(), outEncaps, ok)); return null; } } @@ -323,10 +323,12 @@ namespace Ice public override Task<Ice.OutputStream> iceDispatch(IceInternal.Incoming inS, Current current) { byte[] inEncaps = inS.readParamEncaps(); - return ice_invokeAsync(inEncaps, current).ContinueWith((Task<Object_Ice_invokeResult> t) => + var task = ice_invokeAsync(inEncaps, current); + var cached = inS.getAndClearCachedOutputStream(); + return task.ContinueWith((Task<Object_Ice_invokeResult> t) => { var ret = t.GetAwaiter().GetResult(); - return Task.FromResult(inS.writeParamEncaps(ret.outEncaps, ret.returnValue)); + return Task.FromResult(inS.writeParamEncaps(cached, ret.outEncaps, ret.returnValue)); }).Unwrap(); } } diff --git a/csharp/test/Ice/interceptor/Client.cs b/csharp/test/Ice/interceptor/Client.cs index 2345891a324..efdc4a6db83 100644 --- a/csharp/test/Ice/interceptor/Client.cs +++ b/csharp/test/Ice/interceptor/Client.cs @@ -114,6 +114,18 @@ namespace Ice test(prx.amdAddWithRetry(33, 12) == 45); test(interceptor.getLastOperation().Equals("amdAddWithRetry")); test(interceptor.getLastStatus()); + + { + var ctx = new Dictionary<string, string>(); + ctx.Add("retry", "yes"); + for(int i = 0; i < 10; ++i) + { + test(prx.amdAdd(33, 12, ctx) == 45); + test(interceptor.getLastOperation().Equals("amdAdd")); + test(interceptor.getLastStatus()); + } + } + output.WriteLine("ok"); output.Write("testing user exception... "); diff --git a/csharp/test/Ice/interceptor/InterceptorI.cs b/csharp/test/Ice/interceptor/InterceptorI.cs index 522d5059cb5..c3f88efd13a 100644 --- a/csharp/test/Ice/interceptor/InterceptorI.cs +++ b/csharp/test/Ice/interceptor/InterceptorI.cs @@ -75,6 +75,15 @@ namespace Ice current.ctx["retry"] = "no"; } + else if(current.ctx.TryGetValue("retry", out context) && context.Equals("yes")) + { + // + // Retry the dispatch to ensure that abandoning the result of the dispatch + // works fine and is thread-safe + // + servant_.ice_dispatch(request); + servant_.ice_dispatch(request); + } var task = servant_.ice_dispatch(request); lastStatus_ = task != null; diff --git a/java-compat/test/src/main/java/test/Ice/interceptor/AMDInterceptorI.java b/java-compat/test/src/main/java/test/Ice/interceptor/AMDInterceptorI.java index a6a855b0595..db208419d44 100644 --- a/java-compat/test/src/main/java/test/Ice/interceptor/AMDInterceptorI.java +++ b/java-compat/test/src/main/java/test/Ice/interceptor/AMDInterceptorI.java @@ -70,6 +70,15 @@ class AMDInterceptorI extends InterceptorI implements Ice.DispatchInterceptorAsy request.getCurrent().ctx.put("retry", "no"); } + else if(current.ctx.get("retry") != null && current.ctx.get("retry").equals("yes")) + { + // + // Retry the dispatch to ensure that abandoning the result of the dispatch + // works fine and is thread-safe + // + _servant.ice_dispatch(request); + _servant.ice_dispatch(request); + } _lastStatus = _servant.ice_dispatch(request, this); diff --git a/java-compat/test/src/main/java/test/Ice/interceptor/Client.java b/java-compat/test/src/main/java/test/Ice/interceptor/Client.java index 0661f9e2117..2c95ccfdc21 100644 --- a/java-compat/test/src/main/java/test/Ice/interceptor/Client.java +++ b/java-compat/test/src/main/java/test/Ice/interceptor/Client.java @@ -119,6 +119,16 @@ public class Client extends test.TestHelper test(prx.amdAddWithRetry(33, 12) == 45); test(interceptor.getLastOperation().equals("amdAddWithRetry")); test(!interceptor.getLastStatus()); + { + java.util.Map<String, String> ctx = new java.util.HashMap<>(); + ctx.put("retry", "yes"); + for(int i = 0; i < 10; ++i) + { + test(prx.amdAdd(33, 12, ctx) == 45); + test(interceptor.getLastOperation().equals("amdAdd")); + test(!interceptor.getLastStatus()); + } + } out.println("ok"); out.print("testing user exception... "); out.flush(); diff --git a/java-compat/test/src/main/java/test/Ice/interceptor/MyObjectI.java b/java-compat/test/src/main/java/test/Ice/interceptor/MyObjectI.java index 1f60cb0adb8..cebd1882ccd 100644 --- a/java-compat/test/src/main/java/test/Ice/interceptor/MyObjectI.java +++ b/java-compat/test/src/main/java/test/Ice/interceptor/MyObjectI.java @@ -65,6 +65,7 @@ class MyObjectI extends _MyObjectDisp public void amdAdd_async(final AMD_MyObject_amdAdd cb, final int x, final int y, Ice.Current current) { + final boolean retry = current.ctx.get("retry") != null && current.ctx.get("retry").equals("yes"); Thread thread = new Thread() { @Override @@ -78,7 +79,17 @@ class MyObjectI extends _MyObjectDisp catch(InterruptedException e) { } - cb.ice_response(x + y); + try + { + cb.ice_response(x + y); + } + catch(Ice.ResponseSentException ex) + { + if(!retry) + { + throw ex; + } + } } }; diff --git a/java/src/Ice/src/main/java/com/zeroc/Ice/Blobject.java b/java/src/Ice/src/main/java/com/zeroc/Ice/Blobject.java index e23f5e7e6a5..9d3637badaf 100644 --- a/java/src/Ice/src/main/java/com/zeroc/Ice/Blobject.java +++ b/java/src/Ice/src/main/java/com/zeroc/Ice/Blobject.java @@ -39,6 +39,6 @@ public interface Blobject extends com.zeroc.Ice.Object { byte[] inEncaps = in.readParamEncaps(); com.zeroc.Ice.Object.Ice_invokeResult r = ice_invoke(inEncaps, current); - return in.setResult(in.writeParamEncaps(r.outParams, r.returnValue)); + return in.setResult(in.writeParamEncaps(in.getAndClearCachedOutputStream(), r.outParams, r.returnValue)); } } diff --git a/java/src/Ice/src/main/java/com/zeroc/Ice/BlobjectAsync.java b/java/src/Ice/src/main/java/com/zeroc/Ice/BlobjectAsync.java index cb9ddadfd16..6a148071905 100644 --- a/java/src/Ice/src/main/java/com/zeroc/Ice/BlobjectAsync.java +++ b/java/src/Ice/src/main/java/com/zeroc/Ice/BlobjectAsync.java @@ -45,7 +45,9 @@ public interface BlobjectAsync extends com.zeroc.Ice.Object { byte[] inEncaps = in.readParamEncaps(); CompletableFuture<OutputStream> f = new CompletableFuture<>(); - ice_invokeAsync(inEncaps, current).whenComplete((result, ex) -> + CompletionStage<Object.Ice_invokeResult> s = ice_invokeAsync(inEncaps, current); + final OutputStream cached = in.getAndClearCachedOutputStream(); // If an output stream is cached, re-use it + s.whenComplete((result, ex) -> { if(ex != null) { @@ -53,7 +55,7 @@ public interface BlobjectAsync extends com.zeroc.Ice.Object } else { - f.complete(in.writeParamEncaps(result.outParams, result.returnValue)); + f.complete(in.writeParamEncaps(cached, result.outParams, result.returnValue)); } }); return f; diff --git a/java/src/Ice/src/main/java/com/zeroc/IceInternal/Incoming.java b/java/src/Ice/src/main/java/com/zeroc/IceInternal/Incoming.java index dc5a60d390a..f8d899ae365 100644 --- a/java/src/Ice/src/main/java/com/zeroc/IceInternal/Incoming.java +++ b/java/src/Ice/src/main/java/com/zeroc/IceInternal/Incoming.java @@ -271,6 +271,13 @@ final public class Incoming implements com.zeroc.Ice.Request public <T> CompletionStage<OutputStream> setResultFuture(CompletionStage<T> f, Write<T> write) { + final OutputStream cached = getAndClearCachedOutputStream(); // If an output stream is cached, re-use it + + // + // NOTE: it's important that the continuation doesn't mutate the Incoming state to + // guarantee thread-safety. Multiple continuations can execute concurrently if the + // user installed a dispatch interceptor and the dispatch is retried. + // final CompletableFuture<OutputStream> r = new CompletableFuture<OutputStream>(); f.whenComplete((result, ex) -> { @@ -280,7 +287,7 @@ final public class Incoming implements com.zeroc.Ice.Request } else { - OutputStream os = startWriteParams(); + OutputStream os = startWriteParams(cached); write.write(os, result); endWriteParams(os); r.complete(os); @@ -291,6 +298,13 @@ final public class Incoming implements com.zeroc.Ice.Request public CompletionStage<OutputStream> setResultFuture(CompletionStage<Void> f) { + final OutputStream cached = getAndClearCachedOutputStream(); // If an output stream is cached, re-use it + + // + // NOTE: it's important that the continuation doesn't mutate the Incoming state to + // guarantee thread-safety. Multiple continuations can execute concurrently if the + // user installed a dispatch interceptor and the dispatch is retried. + // final CompletableFuture<OutputStream> r = new CompletableFuture<OutputStream>(); f.whenComplete((result, ex) -> { @@ -300,7 +314,7 @@ final public class Incoming implements com.zeroc.Ice.Request } else { - r.complete(writeEmptyParams()); + r.complete(writeEmptyParams(cached)); } }); return r; @@ -448,6 +462,20 @@ final public class Incoming implements com.zeroc.Ice.Request _format = format; } + public OutputStream getAndClearCachedOutputStream() + { + if(_response) + { + OutputStream cached = _os; + _os = null; + return cached; + } + else + { + return null; // Don't consume unnecessarily the output stream if we are dispatching a oneway request + } + } + static public OutputStream createResponseOutputStream(Current current) { OutputStream os = new OutputStream(current.adapter.getCommunicator(), Protocol.currentProtocolEncoding); @@ -457,17 +485,18 @@ final public class Incoming implements com.zeroc.Ice.Request return os; } - public OutputStream startWriteParams() + private OutputStream startWriteParams(OutputStream os) { if(!_response) { throw new com.zeroc.Ice.MarshalException("can't marshal out parameters for oneway dispatch"); } - // If there's an output stream set, re-use it for the response - OutputStream os = _os == null ? new OutputStream(_instance, Protocol.currentProtocolEncoding) : _os; + if(os == null) // Create the output stream if none is provided + { + os = new OutputStream(_instance, Protocol.currentProtocolEncoding); + } assert(os.pos() == 0); - _os = null; os.writeBlob(Protocol.replyHdr); os.writeInt(_current.requestId); os.writeByte(ReplyStatus.replyOK); @@ -475,6 +504,11 @@ final public class Incoming implements com.zeroc.Ice.Request return os; } + public OutputStream startWriteParams() + { + return startWriteParams(getAndClearCachedOutputStream()); + } + public void endWriteParams(OutputStream os) { if(_response) @@ -483,14 +517,15 @@ final public class Incoming implements com.zeroc.Ice.Request } } - public OutputStream writeEmptyParams() + private OutputStream writeEmptyParams(OutputStream os) { if(_response) { - // If there's an output stream set, re-use it for the response - OutputStream os = _os == null ? new OutputStream(_instance, Protocol.currentProtocolEncoding) : _os; + if(os == null) // Create the output stream if none is provided + { + os = new OutputStream(_instance, Protocol.currentProtocolEncoding); + } assert(os.pos() == 0); - _os = null; os.writeBlob(Protocol.replyHdr); os.writeInt(_current.requestId); os.writeByte(ReplyStatus.replyOK); @@ -503,7 +538,12 @@ final public class Incoming implements com.zeroc.Ice.Request } } - public OutputStream writeParamEncaps(byte[] v, boolean ok) + public OutputStream writeEmptyParams() + { + return writeEmptyParams(getAndClearCachedOutputStream()); + } + + public OutputStream writeParamEncaps(OutputStream os, byte[] v, boolean ok) { if(!ok && _observer != null) { @@ -512,10 +552,11 @@ final public class Incoming implements com.zeroc.Ice.Request if(_response) { - // If there's an output stream set, re-use it for the response - OutputStream os = _os == null ? new OutputStream(_instance, Protocol.currentProtocolEncoding) : _os; + if(os == null) // Create the output stream if none is provided + { + os = new OutputStream(_instance, Protocol.currentProtocolEncoding); + } assert(os.pos() == 0); - _os = null; os.writeBlob(Protocol.replyHdr); os.writeInt(_current.requestId); os.writeByte(ok ? ReplyStatus.replyOK : ReplyStatus.replyUserException); diff --git a/java/test/src/main/java/test/Ice/interceptor/Client.java b/java/test/src/main/java/test/Ice/interceptor/Client.java index dafae39f792..bbb62f152b9 100644 --- a/java/test/src/main/java/test/Ice/interceptor/Client.java +++ b/java/test/src/main/java/test/Ice/interceptor/Client.java @@ -111,6 +111,16 @@ public class Client extends test.TestHelper test(prx.amdAddWithRetry(33, 12) == 45); test(interceptor.getLastOperation().equals("amdAddWithRetry")); test(interceptor.getLastStatus()); + { + java.util.Map<String, String> ctx = new java.util.HashMap<>(); + ctx.put("retry", "yes"); + for(int i = 0; i < 10; ++i) + { + test(prx.amdAdd(33, 12, ctx) == 45); + test(interceptor.getLastOperation().equals("amdAdd")); + test(interceptor.getLastStatus()); + } + } out.println("ok"); out.print("testing user exception... "); out.flush(); diff --git a/java/test/src/main/java/test/Ice/interceptor/InterceptorI.java b/java/test/src/main/java/test/Ice/interceptor/InterceptorI.java index 5ea248e1c43..afd5597f6b0 100644 --- a/java/test/src/main/java/test/Ice/interceptor/InterceptorI.java +++ b/java/test/src/main/java/test/Ice/interceptor/InterceptorI.java @@ -70,6 +70,15 @@ class InterceptorI extends com.zeroc.Ice.DispatchInterceptor current.ctx.put("retry", "no"); } + else if(current.ctx.get("retry") != null && current.ctx.get("retry").equals("yes")) + { + // + // Retry the dispatch to ensure that abandoning the result of the dispatch + // works fine and is thread-safe + // + _servant.ice_dispatch(request); + _servant.ice_dispatch(request); + } CompletionStage<OutputStream> f = _servant.ice_dispatch(request); _lastStatus = f != null; |