summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2019-07-18 18:09:15 +0200
committerGitHub <noreply@github.com>2019-07-18 18:09:15 +0200
commit2c036ca6ca7f39b6987135839a1a899cd080877d (patch)
tree71b429a5784a77ec41084ba84105930f537f195e
parentBumped invocation timeout in Objectice-C timeout test (diff)
downloadice-2c036ca6ca7f39b6987135839a1a899cd080877d.tar.bz2
ice-2c036ca6ca7f39b6987135839a1a899cd080877d.tar.xz
ice-2c036ca6ca7f39b6987135839a1a899cd080877d.zip
Fixed non-thread safe AMD dispatch, fixes #448 (#449)
The caching of the output stream which was added back to solve #414 made the dispatch of AMD requests non thread-safe if a dispatch interceptor was installed and if the interceptor retried the dispatch. Multiple continuation could run concurrently and eventually re-use the cached output stream to marshal multiple responses.
-rw-r--r--cpp/test/Ice/interceptor/AMDInterceptorI.cpp9
-rw-r--r--cpp/test/Ice/interceptor/Client.cpp10
-rw-r--r--cpp/test/Ice/interceptor/MyObjectI.cpp36
-rw-r--r--csharp/Makefile8
-rw-r--r--csharp/src/Ice/Incoming.cs94
-rw-r--r--csharp/src/Ice/Object.cs8
-rw-r--r--csharp/test/Ice/interceptor/Client.cs12
-rw-r--r--csharp/test/Ice/interceptor/InterceptorI.cs9
-rw-r--r--java-compat/test/src/main/java/test/Ice/interceptor/AMDInterceptorI.java9
-rw-r--r--java-compat/test/src/main/java/test/Ice/interceptor/Client.java10
-rw-r--r--java-compat/test/src/main/java/test/Ice/interceptor/MyObjectI.java13
-rw-r--r--java/src/Ice/src/main/java/com/zeroc/Ice/Blobject.java2
-rw-r--r--java/src/Ice/src/main/java/com/zeroc/Ice/BlobjectAsync.java6
-rw-r--r--java/src/Ice/src/main/java/com/zeroc/IceInternal/Incoming.java69
-rw-r--r--java/test/src/main/java/test/Ice/interceptor/Client.java10
-rw-r--r--java/test/src/main/java/test/Ice/interceptor/InterceptorI.java9
16 files changed, 260 insertions, 54 deletions
diff --git a/cpp/test/Ice/interceptor/AMDInterceptorI.cpp b/cpp/test/Ice/interceptor/AMDInterceptorI.cpp
index e2e2d95b001..158cf2736fa 100644
--- a/cpp/test/Ice/interceptor/AMDInterceptorI.cpp
+++ b/cpp/test/Ice/interceptor/AMDInterceptorI.cpp
@@ -103,6 +103,15 @@ AMDInterceptorI::dispatch(Ice::Request& request)
current.ctx["retry"] = "no";
}
+ else if(current.ctx.find("retry") != current.ctx.end() && current.ctx["retry"] == "yes")
+ {
+ //
+ // Retry the dispatch to ensure that abandoning the result of the dispatch
+ // works fine and is thread-safe
+ //
+ _servant->ice_dispatch(request);
+ _servant->ice_dispatch(request);
+ }
#ifdef ICE_CPP11_MAPPING
_lastStatus = _servant->ice_dispatch(request, []() { return true; }, [this](exception_ptr ex) {
diff --git a/cpp/test/Ice/interceptor/Client.cpp b/cpp/test/Ice/interceptor/Client.cpp
index 191215fd59d..02317c19d2c 100644
--- a/cpp/test/Ice/interceptor/Client.cpp
+++ b/cpp/test/Ice/interceptor/Client.cpp
@@ -189,6 +189,16 @@ Client::runAmdTest(const Test::MyObjectPrxPtr& prx, const AMDInterceptorIPtr& in
test(prx->amdAddWithRetry(33, 12) == 45);
test(interceptor->getLastOperation() == "amdAddWithRetry");
test(!interceptor->getLastStatus());
+ {
+ Ice::Context ctx;
+ ctx["retry"] = "yes";
+ for(int i = 0; i < 10; ++i)
+ {
+ test(prx->amdAdd(33, 12, ctx) == 45);
+ test(interceptor->getLastOperation() == "amdAdd");
+ test(!interceptor->getLastStatus());
+ }
+ }
cout << "ok" << endl;
cout << "testing user exception... " << flush;
try
diff --git a/cpp/test/Ice/interceptor/MyObjectI.cpp b/cpp/test/Ice/interceptor/MyObjectI.cpp
index 8604cd7007f..f3164ac4426 100644
--- a/cpp/test/Ice/interceptor/MyObjectI.cpp
+++ b/cpp/test/Ice/interceptor/MyObjectI.cpp
@@ -83,13 +83,22 @@ MyObjectI::amdAddAsync(int x,
int y,
function<void(int)> response,
function<void(exception_ptr)>,
- const Ice::Current&)
+ const Ice::Current& current)
{
+ Ice::Context::const_iterator p = current.ctx.find("retry");
+ bool retry = p != current.ctx.end();
std::thread t(
- [x, y, response]()
+ [x, y, response, retry]()
{
this_thread::sleep_for(chrono::milliseconds(10));
- response(x + y);
+ try
+ {
+ response(x + y);
+ }
+ catch(const Ice::ResponseSentException&)
+ {
+ test(retry);
+ }
});
t.detach();
}
@@ -200,31 +209,42 @@ MyObjectI::amdBadSystemAddAsync(int,
}
#else
void
-MyObjectI::amdAdd_async(const Test::AMD_MyObject_amdAddPtr& cb, int x, int y, const Ice::Current&)
+MyObjectI::amdAdd_async(const Test::AMD_MyObject_amdAddPtr& cb, int x, int y, const Ice::Current& current)
{
class ThreadI : public Thread
{
public:
- ThreadI(const Test::AMD_MyObject_amdAddPtr& cb, int x, int y) :
+ ThreadI(const Test::AMD_MyObject_amdAddPtr& cb, int x, int y, bool retry) :
_cb(cb),
_x(x),
- _y(y)
+ _y(y),
+ _retry(retry)
{
}
void run()
{
ThreadControl::sleep(Time::milliSeconds(10));
- _cb->ice_response(_x + _y);
+ try
+ {
+ _cb->ice_response(_x + _y);
+ }
+ catch(const Ice::ResponseSentException&)
+ {
+ test(_retry);
+ }
}
private:
Test::AMD_MyObject_amdAddPtr _cb;
int _x;
int _y;
+ bool _retry;
};
- ThreadPtr thread = new ThreadI(cb, x, y);
+ Ice::Context::const_iterator p = current.ctx.find("retry");
+ bool retry = p != current.ctx.end();
+ ThreadPtr thread = new ThreadI(cb, x, y, retry);
thread->start().detach();
}
diff --git a/csharp/Makefile b/csharp/Makefile
index f0dc5088e0c..33c59414242 100644
--- a/csharp/Makefile
+++ b/csharp/Makefile
@@ -2,14 +2,16 @@
# Copyright (c) ZeroC, Inc. All rights reserved.
#
+DOTNETARGS = $(if $(filter $(OPTIMIZE),no),/p:Configuration=Debug)
+
all:
- dotnet msbuild msbuild/ice.proj /m
+ dotnet msbuild msbuild/ice.proj $(DOTNETARGS) /m
tests:
- dotnet msbuild msbuild/ice.proj /m
+ dotnet msbuild msbuild/ice.proj $(DOTNETARGS) /m
srcs:
- dotnet msbuild msbuild/ice.proj /t:BuildDist /m
+ dotnet msbuild msbuild/ice.proj $(DOTNETARGS) /t:BuildDist /m
distclean clean:
dotnet msbuild msbuild/ice.proj /t:Clean /m
diff --git a/csharp/src/Ice/Incoming.cs b/csharp/src/Ice/Incoming.cs
index 621099d773b..c467f67c349 100644
--- a/csharp/src/Ice/Incoming.cs
+++ b/csharp/src/Ice/Incoming.cs
@@ -268,17 +268,27 @@ namespace IceInternal
{
if(task == null)
{
- _os = startWriteParams();
- write(_os, default(R));
- endWriteParams(_os);
- return null; // Response is cached in the Incoming to not have to create unecessary Task
+ //
+ // Write default constructed response if no task is provided
+ //
+ var os = startWriteParams();
+ write(os, default(R));
+ endWriteParams(os);
+ return setResult(os);
}
else
{
+ var cached = getAndClearCachedOutputStream(); // If an output stream is cached, re-use it
+
+ //
+ // NOTE: it's important that the continuation doesn't mutate the Incoming state to
+ // guarantee thread-safety. Multiple continuations can execute concurrently if the
+ // user installed a dispatch interceptor and the dispatch is retried.
+ //
return task.ContinueWith((Task<R> t) =>
{
var result = t.GetAwaiter().GetResult();
- var os = startWriteParams();
+ var os = startWriteParams(cached);
write(os, result);
endWriteParams(os);
return Task.FromResult(os);
@@ -290,15 +300,24 @@ namespace IceInternal
{
if(task == null)
{
- _os = writeEmptyParams();
- return null;
+ //
+ // Write response if no task is provided
+ //
+ return setResult(writeEmptyParams());
}
else
{
+ var cached = getAndClearCachedOutputStream(); // If an output stream is cached, re-use it
+
+ //
+ // NOTE: it's important that the continuation doesn't mutate the Incoming state to
+ // guarantee thread-safety. Multiple continuations can execute concurrently if the
+ // user installed a dispatch interceptor and the dispatch is retried.
+ //
return task.ContinueWith((Task t) =>
{
t.GetAwaiter().GetResult();
- return Task.FromResult(writeEmptyParams());
+ return Task.FromResult(writeEmptyParams(cached));
}, TaskContinuationOptions.ExecuteSynchronously).Unwrap();
}
}
@@ -307,11 +326,15 @@ namespace IceInternal
{
if(task == null)
{
- _os = default(T).getOutputStream(_current);
- return null; // Response is cached in the Incoming to not have to create unecessary Task
+ return setResult(default(T).getOutputStream(_current));
}
else
{
+ //
+ // NOTE: it's important that the continuation doesn't mutate the Incoming state to
+ // guarantee thread-safety. Multiple continuations can execute concurrently if the
+ // user installed a dispatch interceptor and the dispatch is retried.
+ //
return task.ContinueWith((Task<T> t) =>
{
return Task.FromResult(t.GetAwaiter().GetResult().getOutputStream(_current));
@@ -432,6 +455,20 @@ namespace IceInternal
_format = format;
}
+ public Ice.OutputStream getAndClearCachedOutputStream()
+ {
+ if(_response)
+ {
+ var cached = _os;
+ _os = null;
+ return cached;
+ }
+ else
+ {
+ return null; // Don't consume unnecessarily the output stream if we are dispatching a oneway request
+ }
+ }
+
static public Ice.OutputStream createResponseOutputStream(Ice.Current current)
{
var os = new Ice.OutputStream(current.adapter.getCommunicator(), Ice.Util.currentProtocolEncoding);
@@ -441,17 +478,18 @@ namespace IceInternal
return os;
}
- public Ice.OutputStream startWriteParams()
+ private Ice.OutputStream startWriteParams(Ice.OutputStream os)
{
if(!_response)
{
throw new Ice.MarshalException("can't marshal out parameters for oneway dispatch");
}
- // If there's an output stream set, re-use it for the response
- var os = _os == null ? new Ice.OutputStream(_instance, Ice.Util.currentProtocolEncoding) : _os;
+ if(os == null) // Create the output stream if none is provided
+ {
+ os = new Ice.OutputStream(_instance, Ice.Util.currentProtocolEncoding);
+ }
Debug.Assert(os.pos() == 0);
- _os = null;
os.writeBlob(Protocol.replyHdr);
os.writeInt(_current.requestId);
os.writeByte(ReplyStatus.replyOK);
@@ -459,6 +497,11 @@ namespace IceInternal
return os;
}
+ public Ice.OutputStream startWriteParams()
+ {
+ return startWriteParams(getAndClearCachedOutputStream());
+ }
+
public void endWriteParams(Ice.OutputStream os)
{
if(_response)
@@ -467,14 +510,15 @@ namespace IceInternal
}
}
- public Ice.OutputStream writeEmptyParams()
+ private Ice.OutputStream writeEmptyParams(Ice.OutputStream os)
{
if(_response)
{
- // If there's an output stream set, re-use it for the response
- var os = _os == null ? new Ice.OutputStream(_instance, Ice.Util.currentProtocolEncoding) : _os;
+ if(os == null) // Create the output stream if none is provided
+ {
+ os = new Ice.OutputStream(_instance, Ice.Util.currentProtocolEncoding);
+ }
Debug.Assert(os.pos() == 0);
- _os = null;
os.writeBlob(Protocol.replyHdr);
os.writeInt(_current.requestId);
os.writeByte(ReplyStatus.replyOK);
@@ -487,7 +531,12 @@ namespace IceInternal
}
}
- public Ice.OutputStream writeParamEncaps(byte[] v, bool ok)
+ public Ice.OutputStream writeEmptyParams()
+ {
+ return writeEmptyParams(getAndClearCachedOutputStream());
+ }
+
+ public Ice.OutputStream writeParamEncaps(Ice.OutputStream os, byte[] v, bool ok)
{
if(!ok && _observer != null)
{
@@ -496,10 +545,11 @@ namespace IceInternal
if(_response)
{
- // If there's an output stream set, re-use it for the response
- var os = _os == null ? new Ice.OutputStream(_instance, Ice.Util.currentProtocolEncoding) : _os;
+ if(os == null) // Create the output stream if none is provided
+ {
+ os = new Ice.OutputStream(_instance, Ice.Util.currentProtocolEncoding);
+ }
Debug.Assert(os.pos() == 0);
- _os = null;
os.writeBlob(Protocol.replyHdr);
os.writeInt(_current.requestId);
os.writeByte(ok ? ReplyStatus.replyOK : ReplyStatus.replyUserException);
diff --git a/csharp/src/Ice/Object.cs b/csharp/src/Ice/Object.cs
index dd9913ba5f3..78f776e671b 100644
--- a/csharp/src/Ice/Object.cs
+++ b/csharp/src/Ice/Object.cs
@@ -310,7 +310,7 @@ namespace Ice
byte[] inEncaps = inS.readParamEncaps();
byte[] outEncaps;
bool ok = ice_invoke(inEncaps, out outEncaps, current);
- inS.setResult(inS.writeParamEncaps(outEncaps, ok));
+ inS.setResult(inS.writeParamEncaps(inS.getAndClearCachedOutputStream(), outEncaps, ok));
return null;
}
}
@@ -323,10 +323,12 @@ namespace Ice
public override Task<Ice.OutputStream> iceDispatch(IceInternal.Incoming inS, Current current)
{
byte[] inEncaps = inS.readParamEncaps();
- return ice_invokeAsync(inEncaps, current).ContinueWith((Task<Object_Ice_invokeResult> t) =>
+ var task = ice_invokeAsync(inEncaps, current);
+ var cached = inS.getAndClearCachedOutputStream();
+ return task.ContinueWith((Task<Object_Ice_invokeResult> t) =>
{
var ret = t.GetAwaiter().GetResult();
- return Task.FromResult(inS.writeParamEncaps(ret.outEncaps, ret.returnValue));
+ return Task.FromResult(inS.writeParamEncaps(cached, ret.outEncaps, ret.returnValue));
}).Unwrap();
}
}
diff --git a/csharp/test/Ice/interceptor/Client.cs b/csharp/test/Ice/interceptor/Client.cs
index 2345891a324..efdc4a6db83 100644
--- a/csharp/test/Ice/interceptor/Client.cs
+++ b/csharp/test/Ice/interceptor/Client.cs
@@ -114,6 +114,18 @@ namespace Ice
test(prx.amdAddWithRetry(33, 12) == 45);
test(interceptor.getLastOperation().Equals("amdAddWithRetry"));
test(interceptor.getLastStatus());
+
+ {
+ var ctx = new Dictionary<string, string>();
+ ctx.Add("retry", "yes");
+ for(int i = 0; i < 10; ++i)
+ {
+ test(prx.amdAdd(33, 12, ctx) == 45);
+ test(interceptor.getLastOperation().Equals("amdAdd"));
+ test(interceptor.getLastStatus());
+ }
+ }
+
output.WriteLine("ok");
output.Write("testing user exception... ");
diff --git a/csharp/test/Ice/interceptor/InterceptorI.cs b/csharp/test/Ice/interceptor/InterceptorI.cs
index 522d5059cb5..c3f88efd13a 100644
--- a/csharp/test/Ice/interceptor/InterceptorI.cs
+++ b/csharp/test/Ice/interceptor/InterceptorI.cs
@@ -75,6 +75,15 @@ namespace Ice
current.ctx["retry"] = "no";
}
+ else if(current.ctx.TryGetValue("retry", out context) && context.Equals("yes"))
+ {
+ //
+ // Retry the dispatch to ensure that abandoning the result of the dispatch
+ // works fine and is thread-safe
+ //
+ servant_.ice_dispatch(request);
+ servant_.ice_dispatch(request);
+ }
var task = servant_.ice_dispatch(request);
lastStatus_ = task != null;
diff --git a/java-compat/test/src/main/java/test/Ice/interceptor/AMDInterceptorI.java b/java-compat/test/src/main/java/test/Ice/interceptor/AMDInterceptorI.java
index a6a855b0595..db208419d44 100644
--- a/java-compat/test/src/main/java/test/Ice/interceptor/AMDInterceptorI.java
+++ b/java-compat/test/src/main/java/test/Ice/interceptor/AMDInterceptorI.java
@@ -70,6 +70,15 @@ class AMDInterceptorI extends InterceptorI implements Ice.DispatchInterceptorAsy
request.getCurrent().ctx.put("retry", "no");
}
+ else if(current.ctx.get("retry") != null && current.ctx.get("retry").equals("yes"))
+ {
+ //
+ // Retry the dispatch to ensure that abandoning the result of the dispatch
+ // works fine and is thread-safe
+ //
+ _servant.ice_dispatch(request);
+ _servant.ice_dispatch(request);
+ }
_lastStatus = _servant.ice_dispatch(request, this);
diff --git a/java-compat/test/src/main/java/test/Ice/interceptor/Client.java b/java-compat/test/src/main/java/test/Ice/interceptor/Client.java
index 0661f9e2117..2c95ccfdc21 100644
--- a/java-compat/test/src/main/java/test/Ice/interceptor/Client.java
+++ b/java-compat/test/src/main/java/test/Ice/interceptor/Client.java
@@ -119,6 +119,16 @@ public class Client extends test.TestHelper
test(prx.amdAddWithRetry(33, 12) == 45);
test(interceptor.getLastOperation().equals("amdAddWithRetry"));
test(!interceptor.getLastStatus());
+ {
+ java.util.Map<String, String> ctx = new java.util.HashMap<>();
+ ctx.put("retry", "yes");
+ for(int i = 0; i < 10; ++i)
+ {
+ test(prx.amdAdd(33, 12, ctx) == 45);
+ test(interceptor.getLastOperation().equals("amdAdd"));
+ test(!interceptor.getLastStatus());
+ }
+ }
out.println("ok");
out.print("testing user exception... ");
out.flush();
diff --git a/java-compat/test/src/main/java/test/Ice/interceptor/MyObjectI.java b/java-compat/test/src/main/java/test/Ice/interceptor/MyObjectI.java
index 1f60cb0adb8..cebd1882ccd 100644
--- a/java-compat/test/src/main/java/test/Ice/interceptor/MyObjectI.java
+++ b/java-compat/test/src/main/java/test/Ice/interceptor/MyObjectI.java
@@ -65,6 +65,7 @@ class MyObjectI extends _MyObjectDisp
public void
amdAdd_async(final AMD_MyObject_amdAdd cb, final int x, final int y, Ice.Current current)
{
+ final boolean retry = current.ctx.get("retry") != null && current.ctx.get("retry").equals("yes");
Thread thread = new Thread()
{
@Override
@@ -78,7 +79,17 @@ class MyObjectI extends _MyObjectDisp
catch(InterruptedException e)
{
}
- cb.ice_response(x + y);
+ try
+ {
+ cb.ice_response(x + y);
+ }
+ catch(Ice.ResponseSentException ex)
+ {
+ if(!retry)
+ {
+ throw ex;
+ }
+ }
}
};
diff --git a/java/src/Ice/src/main/java/com/zeroc/Ice/Blobject.java b/java/src/Ice/src/main/java/com/zeroc/Ice/Blobject.java
index e23f5e7e6a5..9d3637badaf 100644
--- a/java/src/Ice/src/main/java/com/zeroc/Ice/Blobject.java
+++ b/java/src/Ice/src/main/java/com/zeroc/Ice/Blobject.java
@@ -39,6 +39,6 @@ public interface Blobject extends com.zeroc.Ice.Object
{
byte[] inEncaps = in.readParamEncaps();
com.zeroc.Ice.Object.Ice_invokeResult r = ice_invoke(inEncaps, current);
- return in.setResult(in.writeParamEncaps(r.outParams, r.returnValue));
+ return in.setResult(in.writeParamEncaps(in.getAndClearCachedOutputStream(), r.outParams, r.returnValue));
}
}
diff --git a/java/src/Ice/src/main/java/com/zeroc/Ice/BlobjectAsync.java b/java/src/Ice/src/main/java/com/zeroc/Ice/BlobjectAsync.java
index cb9ddadfd16..6a148071905 100644
--- a/java/src/Ice/src/main/java/com/zeroc/Ice/BlobjectAsync.java
+++ b/java/src/Ice/src/main/java/com/zeroc/Ice/BlobjectAsync.java
@@ -45,7 +45,9 @@ public interface BlobjectAsync extends com.zeroc.Ice.Object
{
byte[] inEncaps = in.readParamEncaps();
CompletableFuture<OutputStream> f = new CompletableFuture<>();
- ice_invokeAsync(inEncaps, current).whenComplete((result, ex) ->
+ CompletionStage<Object.Ice_invokeResult> s = ice_invokeAsync(inEncaps, current);
+ final OutputStream cached = in.getAndClearCachedOutputStream(); // If an output stream is cached, re-use it
+ s.whenComplete((result, ex) ->
{
if(ex != null)
{
@@ -53,7 +55,7 @@ public interface BlobjectAsync extends com.zeroc.Ice.Object
}
else
{
- f.complete(in.writeParamEncaps(result.outParams, result.returnValue));
+ f.complete(in.writeParamEncaps(cached, result.outParams, result.returnValue));
}
});
return f;
diff --git a/java/src/Ice/src/main/java/com/zeroc/IceInternal/Incoming.java b/java/src/Ice/src/main/java/com/zeroc/IceInternal/Incoming.java
index dc5a60d390a..f8d899ae365 100644
--- a/java/src/Ice/src/main/java/com/zeroc/IceInternal/Incoming.java
+++ b/java/src/Ice/src/main/java/com/zeroc/IceInternal/Incoming.java
@@ -271,6 +271,13 @@ final public class Incoming implements com.zeroc.Ice.Request
public <T> CompletionStage<OutputStream> setResultFuture(CompletionStage<T> f, Write<T> write)
{
+ final OutputStream cached = getAndClearCachedOutputStream(); // If an output stream is cached, re-use it
+
+ //
+ // NOTE: it's important that the continuation doesn't mutate the Incoming state to
+ // guarantee thread-safety. Multiple continuations can execute concurrently if the
+ // user installed a dispatch interceptor and the dispatch is retried.
+ //
final CompletableFuture<OutputStream> r = new CompletableFuture<OutputStream>();
f.whenComplete((result, ex) ->
{
@@ -280,7 +287,7 @@ final public class Incoming implements com.zeroc.Ice.Request
}
else
{
- OutputStream os = startWriteParams();
+ OutputStream os = startWriteParams(cached);
write.write(os, result);
endWriteParams(os);
r.complete(os);
@@ -291,6 +298,13 @@ final public class Incoming implements com.zeroc.Ice.Request
public CompletionStage<OutputStream> setResultFuture(CompletionStage<Void> f)
{
+ final OutputStream cached = getAndClearCachedOutputStream(); // If an output stream is cached, re-use it
+
+ //
+ // NOTE: it's important that the continuation doesn't mutate the Incoming state to
+ // guarantee thread-safety. Multiple continuations can execute concurrently if the
+ // user installed a dispatch interceptor and the dispatch is retried.
+ //
final CompletableFuture<OutputStream> r = new CompletableFuture<OutputStream>();
f.whenComplete((result, ex) ->
{
@@ -300,7 +314,7 @@ final public class Incoming implements com.zeroc.Ice.Request
}
else
{
- r.complete(writeEmptyParams());
+ r.complete(writeEmptyParams(cached));
}
});
return r;
@@ -448,6 +462,20 @@ final public class Incoming implements com.zeroc.Ice.Request
_format = format;
}
+ public OutputStream getAndClearCachedOutputStream()
+ {
+ if(_response)
+ {
+ OutputStream cached = _os;
+ _os = null;
+ return cached;
+ }
+ else
+ {
+ return null; // Don't consume unnecessarily the output stream if we are dispatching a oneway request
+ }
+ }
+
static public OutputStream createResponseOutputStream(Current current)
{
OutputStream os = new OutputStream(current.adapter.getCommunicator(), Protocol.currentProtocolEncoding);
@@ -457,17 +485,18 @@ final public class Incoming implements com.zeroc.Ice.Request
return os;
}
- public OutputStream startWriteParams()
+ private OutputStream startWriteParams(OutputStream os)
{
if(!_response)
{
throw new com.zeroc.Ice.MarshalException("can't marshal out parameters for oneway dispatch");
}
- // If there's an output stream set, re-use it for the response
- OutputStream os = _os == null ? new OutputStream(_instance, Protocol.currentProtocolEncoding) : _os;
+ if(os == null) // Create the output stream if none is provided
+ {
+ os = new OutputStream(_instance, Protocol.currentProtocolEncoding);
+ }
assert(os.pos() == 0);
- _os = null;
os.writeBlob(Protocol.replyHdr);
os.writeInt(_current.requestId);
os.writeByte(ReplyStatus.replyOK);
@@ -475,6 +504,11 @@ final public class Incoming implements com.zeroc.Ice.Request
return os;
}
+ public OutputStream startWriteParams()
+ {
+ return startWriteParams(getAndClearCachedOutputStream());
+ }
+
public void endWriteParams(OutputStream os)
{
if(_response)
@@ -483,14 +517,15 @@ final public class Incoming implements com.zeroc.Ice.Request
}
}
- public OutputStream writeEmptyParams()
+ private OutputStream writeEmptyParams(OutputStream os)
{
if(_response)
{
- // If there's an output stream set, re-use it for the response
- OutputStream os = _os == null ? new OutputStream(_instance, Protocol.currentProtocolEncoding) : _os;
+ if(os == null) // Create the output stream if none is provided
+ {
+ os = new OutputStream(_instance, Protocol.currentProtocolEncoding);
+ }
assert(os.pos() == 0);
- _os = null;
os.writeBlob(Protocol.replyHdr);
os.writeInt(_current.requestId);
os.writeByte(ReplyStatus.replyOK);
@@ -503,7 +538,12 @@ final public class Incoming implements com.zeroc.Ice.Request
}
}
- public OutputStream writeParamEncaps(byte[] v, boolean ok)
+ public OutputStream writeEmptyParams()
+ {
+ return writeEmptyParams(getAndClearCachedOutputStream());
+ }
+
+ public OutputStream writeParamEncaps(OutputStream os, byte[] v, boolean ok)
{
if(!ok && _observer != null)
{
@@ -512,10 +552,11 @@ final public class Incoming implements com.zeroc.Ice.Request
if(_response)
{
- // If there's an output stream set, re-use it for the response
- OutputStream os = _os == null ? new OutputStream(_instance, Protocol.currentProtocolEncoding) : _os;
+ if(os == null) // Create the output stream if none is provided
+ {
+ os = new OutputStream(_instance, Protocol.currentProtocolEncoding);
+ }
assert(os.pos() == 0);
- _os = null;
os.writeBlob(Protocol.replyHdr);
os.writeInt(_current.requestId);
os.writeByte(ok ? ReplyStatus.replyOK : ReplyStatus.replyUserException);
diff --git a/java/test/src/main/java/test/Ice/interceptor/Client.java b/java/test/src/main/java/test/Ice/interceptor/Client.java
index dafae39f792..bbb62f152b9 100644
--- a/java/test/src/main/java/test/Ice/interceptor/Client.java
+++ b/java/test/src/main/java/test/Ice/interceptor/Client.java
@@ -111,6 +111,16 @@ public class Client extends test.TestHelper
test(prx.amdAddWithRetry(33, 12) == 45);
test(interceptor.getLastOperation().equals("amdAddWithRetry"));
test(interceptor.getLastStatus());
+ {
+ java.util.Map<String, String> ctx = new java.util.HashMap<>();
+ ctx.put("retry", "yes");
+ for(int i = 0; i < 10; ++i)
+ {
+ test(prx.amdAdd(33, 12, ctx) == 45);
+ test(interceptor.getLastOperation().equals("amdAdd"));
+ test(interceptor.getLastStatus());
+ }
+ }
out.println("ok");
out.print("testing user exception... ");
out.flush();
diff --git a/java/test/src/main/java/test/Ice/interceptor/InterceptorI.java b/java/test/src/main/java/test/Ice/interceptor/InterceptorI.java
index 5ea248e1c43..afd5597f6b0 100644
--- a/java/test/src/main/java/test/Ice/interceptor/InterceptorI.java
+++ b/java/test/src/main/java/test/Ice/interceptor/InterceptorI.java
@@ -70,6 +70,15 @@ class InterceptorI extends com.zeroc.Ice.DispatchInterceptor
current.ctx.put("retry", "no");
}
+ else if(current.ctx.get("retry") != null && current.ctx.get("retry").equals("yes"))
+ {
+ //
+ // Retry the dispatch to ensure that abandoning the result of the dispatch
+ // works fine and is thread-safe
+ //
+ _servant.ice_dispatch(request);
+ _servant.ice_dispatch(request);
+ }
CompletionStage<OutputStream> f = _servant.ice_dispatch(request);
_lastStatus = f != null;