summaryrefslogtreecommitdiff
path: root/csharp/src
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2019-07-18 18:09:15 +0200
committerGitHub <noreply@github.com>2019-07-18 18:09:15 +0200
commit2c036ca6ca7f39b6987135839a1a899cd080877d (patch)
tree71b429a5784a77ec41084ba84105930f537f195e /csharp/src
parentBumped invocation timeout in Objectice-C timeout test (diff)
downloadice-2c036ca6ca7f39b6987135839a1a899cd080877d.tar.bz2
ice-2c036ca6ca7f39b6987135839a1a899cd080877d.tar.xz
ice-2c036ca6ca7f39b6987135839a1a899cd080877d.zip
Fixed non-thread safe AMD dispatch, fixes #448 (#449)
The caching of the output stream which was added back to solve #414 made the dispatch of AMD requests non thread-safe if a dispatch interceptor was installed and if the interceptor retried the dispatch. Multiple continuation could run concurrently and eventually re-use the cached output stream to marshal multiple responses.
Diffstat (limited to 'csharp/src')
-rw-r--r--csharp/src/Ice/Incoming.cs94
-rw-r--r--csharp/src/Ice/Object.cs8
2 files changed, 77 insertions, 25 deletions
diff --git a/csharp/src/Ice/Incoming.cs b/csharp/src/Ice/Incoming.cs
index 621099d773b..c467f67c349 100644
--- a/csharp/src/Ice/Incoming.cs
+++ b/csharp/src/Ice/Incoming.cs
@@ -268,17 +268,27 @@ namespace IceInternal
{
if(task == null)
{
- _os = startWriteParams();
- write(_os, default(R));
- endWriteParams(_os);
- return null; // Response is cached in the Incoming to not have to create unecessary Task
+ //
+ // Write default constructed response if no task is provided
+ //
+ var os = startWriteParams();
+ write(os, default(R));
+ endWriteParams(os);
+ return setResult(os);
}
else
{
+ var cached = getAndClearCachedOutputStream(); // If an output stream is cached, re-use it
+
+ //
+ // NOTE: it's important that the continuation doesn't mutate the Incoming state to
+ // guarantee thread-safety. Multiple continuations can execute concurrently if the
+ // user installed a dispatch interceptor and the dispatch is retried.
+ //
return task.ContinueWith((Task<R> t) =>
{
var result = t.GetAwaiter().GetResult();
- var os = startWriteParams();
+ var os = startWriteParams(cached);
write(os, result);
endWriteParams(os);
return Task.FromResult(os);
@@ -290,15 +300,24 @@ namespace IceInternal
{
if(task == null)
{
- _os = writeEmptyParams();
- return null;
+ //
+ // Write response if no task is provided
+ //
+ return setResult(writeEmptyParams());
}
else
{
+ var cached = getAndClearCachedOutputStream(); // If an output stream is cached, re-use it
+
+ //
+ // NOTE: it's important that the continuation doesn't mutate the Incoming state to
+ // guarantee thread-safety. Multiple continuations can execute concurrently if the
+ // user installed a dispatch interceptor and the dispatch is retried.
+ //
return task.ContinueWith((Task t) =>
{
t.GetAwaiter().GetResult();
- return Task.FromResult(writeEmptyParams());
+ return Task.FromResult(writeEmptyParams(cached));
}, TaskContinuationOptions.ExecuteSynchronously).Unwrap();
}
}
@@ -307,11 +326,15 @@ namespace IceInternal
{
if(task == null)
{
- _os = default(T).getOutputStream(_current);
- return null; // Response is cached in the Incoming to not have to create unecessary Task
+ return setResult(default(T).getOutputStream(_current));
}
else
{
+ //
+ // NOTE: it's important that the continuation doesn't mutate the Incoming state to
+ // guarantee thread-safety. Multiple continuations can execute concurrently if the
+ // user installed a dispatch interceptor and the dispatch is retried.
+ //
return task.ContinueWith((Task<T> t) =>
{
return Task.FromResult(t.GetAwaiter().GetResult().getOutputStream(_current));
@@ -432,6 +455,20 @@ namespace IceInternal
_format = format;
}
+ public Ice.OutputStream getAndClearCachedOutputStream()
+ {
+ if(_response)
+ {
+ var cached = _os;
+ _os = null;
+ return cached;
+ }
+ else
+ {
+ return null; // Don't consume unnecessarily the output stream if we are dispatching a oneway request
+ }
+ }
+
static public Ice.OutputStream createResponseOutputStream(Ice.Current current)
{
var os = new Ice.OutputStream(current.adapter.getCommunicator(), Ice.Util.currentProtocolEncoding);
@@ -441,17 +478,18 @@ namespace IceInternal
return os;
}
- public Ice.OutputStream startWriteParams()
+ private Ice.OutputStream startWriteParams(Ice.OutputStream os)
{
if(!_response)
{
throw new Ice.MarshalException("can't marshal out parameters for oneway dispatch");
}
- // If there's an output stream set, re-use it for the response
- var os = _os == null ? new Ice.OutputStream(_instance, Ice.Util.currentProtocolEncoding) : _os;
+ if(os == null) // Create the output stream if none is provided
+ {
+ os = new Ice.OutputStream(_instance, Ice.Util.currentProtocolEncoding);
+ }
Debug.Assert(os.pos() == 0);
- _os = null;
os.writeBlob(Protocol.replyHdr);
os.writeInt(_current.requestId);
os.writeByte(ReplyStatus.replyOK);
@@ -459,6 +497,11 @@ namespace IceInternal
return os;
}
+ public Ice.OutputStream startWriteParams()
+ {
+ return startWriteParams(getAndClearCachedOutputStream());
+ }
+
public void endWriteParams(Ice.OutputStream os)
{
if(_response)
@@ -467,14 +510,15 @@ namespace IceInternal
}
}
- public Ice.OutputStream writeEmptyParams()
+ private Ice.OutputStream writeEmptyParams(Ice.OutputStream os)
{
if(_response)
{
- // If there's an output stream set, re-use it for the response
- var os = _os == null ? new Ice.OutputStream(_instance, Ice.Util.currentProtocolEncoding) : _os;
+ if(os == null) // Create the output stream if none is provided
+ {
+ os = new Ice.OutputStream(_instance, Ice.Util.currentProtocolEncoding);
+ }
Debug.Assert(os.pos() == 0);
- _os = null;
os.writeBlob(Protocol.replyHdr);
os.writeInt(_current.requestId);
os.writeByte(ReplyStatus.replyOK);
@@ -487,7 +531,12 @@ namespace IceInternal
}
}
- public Ice.OutputStream writeParamEncaps(byte[] v, bool ok)
+ public Ice.OutputStream writeEmptyParams()
+ {
+ return writeEmptyParams(getAndClearCachedOutputStream());
+ }
+
+ public Ice.OutputStream writeParamEncaps(Ice.OutputStream os, byte[] v, bool ok)
{
if(!ok && _observer != null)
{
@@ -496,10 +545,11 @@ namespace IceInternal
if(_response)
{
- // If there's an output stream set, re-use it for the response
- var os = _os == null ? new Ice.OutputStream(_instance, Ice.Util.currentProtocolEncoding) : _os;
+ if(os == null) // Create the output stream if none is provided
+ {
+ os = new Ice.OutputStream(_instance, Ice.Util.currentProtocolEncoding);
+ }
Debug.Assert(os.pos() == 0);
- _os = null;
os.writeBlob(Protocol.replyHdr);
os.writeInt(_current.requestId);
os.writeByte(ok ? ReplyStatus.replyOK : ReplyStatus.replyUserException);
diff --git a/csharp/src/Ice/Object.cs b/csharp/src/Ice/Object.cs
index dd9913ba5f3..78f776e671b 100644
--- a/csharp/src/Ice/Object.cs
+++ b/csharp/src/Ice/Object.cs
@@ -310,7 +310,7 @@ namespace Ice
byte[] inEncaps = inS.readParamEncaps();
byte[] outEncaps;
bool ok = ice_invoke(inEncaps, out outEncaps, current);
- inS.setResult(inS.writeParamEncaps(outEncaps, ok));
+ inS.setResult(inS.writeParamEncaps(inS.getAndClearCachedOutputStream(), outEncaps, ok));
return null;
}
}
@@ -323,10 +323,12 @@ namespace Ice
public override Task<Ice.OutputStream> iceDispatch(IceInternal.Incoming inS, Current current)
{
byte[] inEncaps = inS.readParamEncaps();
- return ice_invokeAsync(inEncaps, current).ContinueWith((Task<Object_Ice_invokeResult> t) =>
+ var task = ice_invokeAsync(inEncaps, current);
+ var cached = inS.getAndClearCachedOutputStream();
+ return task.ContinueWith((Task<Object_Ice_invokeResult> t) =>
{
var ret = t.GetAwaiter().GetResult();
- return Task.FromResult(inS.writeParamEncaps(ret.outEncaps, ret.returnValue));
+ return Task.FromResult(inS.writeParamEncaps(cached, ret.outEncaps, ret.returnValue));
}).Unwrap();
}
}