summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
Diffstat (limited to 'java')
-rw-r--r--java/src/Ice/src/main/java/com/zeroc/Ice/Blobject.java2
-rw-r--r--java/src/Ice/src/main/java/com/zeroc/Ice/BlobjectAsync.java6
-rw-r--r--java/src/Ice/src/main/java/com/zeroc/IceInternal/Incoming.java69
-rw-r--r--java/test/src/main/java/test/Ice/interceptor/Client.java10
-rw-r--r--java/test/src/main/java/test/Ice/interceptor/InterceptorI.java9
5 files changed, 79 insertions, 17 deletions
diff --git a/java/src/Ice/src/main/java/com/zeroc/Ice/Blobject.java b/java/src/Ice/src/main/java/com/zeroc/Ice/Blobject.java
index e23f5e7e6a5..9d3637badaf 100644
--- a/java/src/Ice/src/main/java/com/zeroc/Ice/Blobject.java
+++ b/java/src/Ice/src/main/java/com/zeroc/Ice/Blobject.java
@@ -39,6 +39,6 @@ public interface Blobject extends com.zeroc.Ice.Object
{
byte[] inEncaps = in.readParamEncaps();
com.zeroc.Ice.Object.Ice_invokeResult r = ice_invoke(inEncaps, current);
- return in.setResult(in.writeParamEncaps(r.outParams, r.returnValue));
+ return in.setResult(in.writeParamEncaps(in.getAndClearCachedOutputStream(), r.outParams, r.returnValue));
}
}
diff --git a/java/src/Ice/src/main/java/com/zeroc/Ice/BlobjectAsync.java b/java/src/Ice/src/main/java/com/zeroc/Ice/BlobjectAsync.java
index cb9ddadfd16..6a148071905 100644
--- a/java/src/Ice/src/main/java/com/zeroc/Ice/BlobjectAsync.java
+++ b/java/src/Ice/src/main/java/com/zeroc/Ice/BlobjectAsync.java
@@ -45,7 +45,9 @@ public interface BlobjectAsync extends com.zeroc.Ice.Object
{
byte[] inEncaps = in.readParamEncaps();
CompletableFuture<OutputStream> f = new CompletableFuture<>();
- ice_invokeAsync(inEncaps, current).whenComplete((result, ex) ->
+ CompletionStage<Object.Ice_invokeResult> s = ice_invokeAsync(inEncaps, current);
+ final OutputStream cached = in.getAndClearCachedOutputStream(); // If an output stream is cached, re-use it
+ s.whenComplete((result, ex) ->
{
if(ex != null)
{
@@ -53,7 +55,7 @@ public interface BlobjectAsync extends com.zeroc.Ice.Object
}
else
{
- f.complete(in.writeParamEncaps(result.outParams, result.returnValue));
+ f.complete(in.writeParamEncaps(cached, result.outParams, result.returnValue));
}
});
return f;
diff --git a/java/src/Ice/src/main/java/com/zeroc/IceInternal/Incoming.java b/java/src/Ice/src/main/java/com/zeroc/IceInternal/Incoming.java
index dc5a60d390a..f8d899ae365 100644
--- a/java/src/Ice/src/main/java/com/zeroc/IceInternal/Incoming.java
+++ b/java/src/Ice/src/main/java/com/zeroc/IceInternal/Incoming.java
@@ -271,6 +271,13 @@ final public class Incoming implements com.zeroc.Ice.Request
public <T> CompletionStage<OutputStream> setResultFuture(CompletionStage<T> f, Write<T> write)
{
+ final OutputStream cached = getAndClearCachedOutputStream(); // If an output stream is cached, re-use it
+
+ //
+ // NOTE: it's important that the continuation doesn't mutate the Incoming state to
+ // guarantee thread-safety. Multiple continuations can execute concurrently if the
+ // user installed a dispatch interceptor and the dispatch is retried.
+ //
final CompletableFuture<OutputStream> r = new CompletableFuture<OutputStream>();
f.whenComplete((result, ex) ->
{
@@ -280,7 +287,7 @@ final public class Incoming implements com.zeroc.Ice.Request
}
else
{
- OutputStream os = startWriteParams();
+ OutputStream os = startWriteParams(cached);
write.write(os, result);
endWriteParams(os);
r.complete(os);
@@ -291,6 +298,13 @@ final public class Incoming implements com.zeroc.Ice.Request
public CompletionStage<OutputStream> setResultFuture(CompletionStage<Void> f)
{
+ final OutputStream cached = getAndClearCachedOutputStream(); // If an output stream is cached, re-use it
+
+ //
+ // NOTE: it's important that the continuation doesn't mutate the Incoming state to
+ // guarantee thread-safety. Multiple continuations can execute concurrently if the
+ // user installed a dispatch interceptor and the dispatch is retried.
+ //
final CompletableFuture<OutputStream> r = new CompletableFuture<OutputStream>();
f.whenComplete((result, ex) ->
{
@@ -300,7 +314,7 @@ final public class Incoming implements com.zeroc.Ice.Request
}
else
{
- r.complete(writeEmptyParams());
+ r.complete(writeEmptyParams(cached));
}
});
return r;
@@ -448,6 +462,20 @@ final public class Incoming implements com.zeroc.Ice.Request
_format = format;
}
+ public OutputStream getAndClearCachedOutputStream()
+ {
+ if(_response)
+ {
+ OutputStream cached = _os;
+ _os = null;
+ return cached;
+ }
+ else
+ {
+ return null; // Don't consume unnecessarily the output stream if we are dispatching a oneway request
+ }
+ }
+
static public OutputStream createResponseOutputStream(Current current)
{
OutputStream os = new OutputStream(current.adapter.getCommunicator(), Protocol.currentProtocolEncoding);
@@ -457,17 +485,18 @@ final public class Incoming implements com.zeroc.Ice.Request
return os;
}
- public OutputStream startWriteParams()
+ private OutputStream startWriteParams(OutputStream os)
{
if(!_response)
{
throw new com.zeroc.Ice.MarshalException("can't marshal out parameters for oneway dispatch");
}
- // If there's an output stream set, re-use it for the response
- OutputStream os = _os == null ? new OutputStream(_instance, Protocol.currentProtocolEncoding) : _os;
+ if(os == null) // Create the output stream if none is provided
+ {
+ os = new OutputStream(_instance, Protocol.currentProtocolEncoding);
+ }
assert(os.pos() == 0);
- _os = null;
os.writeBlob(Protocol.replyHdr);
os.writeInt(_current.requestId);
os.writeByte(ReplyStatus.replyOK);
@@ -475,6 +504,11 @@ final public class Incoming implements com.zeroc.Ice.Request
return os;
}
+ public OutputStream startWriteParams()
+ {
+ return startWriteParams(getAndClearCachedOutputStream());
+ }
+
public void endWriteParams(OutputStream os)
{
if(_response)
@@ -483,14 +517,15 @@ final public class Incoming implements com.zeroc.Ice.Request
}
}
- public OutputStream writeEmptyParams()
+ private OutputStream writeEmptyParams(OutputStream os)
{
if(_response)
{
- // If there's an output stream set, re-use it for the response
- OutputStream os = _os == null ? new OutputStream(_instance, Protocol.currentProtocolEncoding) : _os;
+ if(os == null) // Create the output stream if none is provided
+ {
+ os = new OutputStream(_instance, Protocol.currentProtocolEncoding);
+ }
assert(os.pos() == 0);
- _os = null;
os.writeBlob(Protocol.replyHdr);
os.writeInt(_current.requestId);
os.writeByte(ReplyStatus.replyOK);
@@ -503,7 +538,12 @@ final public class Incoming implements com.zeroc.Ice.Request
}
}
- public OutputStream writeParamEncaps(byte[] v, boolean ok)
+ public OutputStream writeEmptyParams()
+ {
+ return writeEmptyParams(getAndClearCachedOutputStream());
+ }
+
+ public OutputStream writeParamEncaps(OutputStream os, byte[] v, boolean ok)
{
if(!ok && _observer != null)
{
@@ -512,10 +552,11 @@ final public class Incoming implements com.zeroc.Ice.Request
if(_response)
{
- // If there's an output stream set, re-use it for the response
- OutputStream os = _os == null ? new OutputStream(_instance, Protocol.currentProtocolEncoding) : _os;
+ if(os == null) // Create the output stream if none is provided
+ {
+ os = new OutputStream(_instance, Protocol.currentProtocolEncoding);
+ }
assert(os.pos() == 0);
- _os = null;
os.writeBlob(Protocol.replyHdr);
os.writeInt(_current.requestId);
os.writeByte(ok ? ReplyStatus.replyOK : ReplyStatus.replyUserException);
diff --git a/java/test/src/main/java/test/Ice/interceptor/Client.java b/java/test/src/main/java/test/Ice/interceptor/Client.java
index dafae39f792..bbb62f152b9 100644
--- a/java/test/src/main/java/test/Ice/interceptor/Client.java
+++ b/java/test/src/main/java/test/Ice/interceptor/Client.java
@@ -111,6 +111,16 @@ public class Client extends test.TestHelper
test(prx.amdAddWithRetry(33, 12) == 45);
test(interceptor.getLastOperation().equals("amdAddWithRetry"));
test(interceptor.getLastStatus());
+ {
+ java.util.Map<String, String> ctx = new java.util.HashMap<>();
+ ctx.put("retry", "yes");
+ for(int i = 0; i < 10; ++i)
+ {
+ test(prx.amdAdd(33, 12, ctx) == 45);
+ test(interceptor.getLastOperation().equals("amdAdd"));
+ test(interceptor.getLastStatus());
+ }
+ }
out.println("ok");
out.print("testing user exception... ");
out.flush();
diff --git a/java/test/src/main/java/test/Ice/interceptor/InterceptorI.java b/java/test/src/main/java/test/Ice/interceptor/InterceptorI.java
index 5ea248e1c43..afd5597f6b0 100644
--- a/java/test/src/main/java/test/Ice/interceptor/InterceptorI.java
+++ b/java/test/src/main/java/test/Ice/interceptor/InterceptorI.java
@@ -70,6 +70,15 @@ class InterceptorI extends com.zeroc.Ice.DispatchInterceptor
current.ctx.put("retry", "no");
}
+ else if(current.ctx.get("retry") != null && current.ctx.get("retry").equals("yes"))
+ {
+ //
+ // Retry the dispatch to ensure that abandoning the result of the dispatch
+ // works fine and is thread-safe
+ //
+ _servant.ice_dispatch(request);
+ _servant.ice_dispatch(request);
+ }
CompletionStage<OutputStream> f = _servant.ice_dispatch(request);
_lastStatus = f != null;