summaryrefslogtreecommitdiff
path: root/csharp/src/Ice/Incoming.cs
diff options
context:
space:
mode:
Diffstat (limited to 'csharp/src/Ice/Incoming.cs')
-rw-r--r--csharp/src/Ice/Incoming.cs94
1 files changed, 72 insertions, 22 deletions
diff --git a/csharp/src/Ice/Incoming.cs b/csharp/src/Ice/Incoming.cs
index 621099d773b..c467f67c349 100644
--- a/csharp/src/Ice/Incoming.cs
+++ b/csharp/src/Ice/Incoming.cs
@@ -268,17 +268,27 @@ namespace IceInternal
{
if(task == null)
{
- _os = startWriteParams();
- write(_os, default(R));
- endWriteParams(_os);
- return null; // Response is cached in the Incoming to not have to create unecessary Task
+ //
+ // Write default constructed response if no task is provided
+ //
+ var os = startWriteParams();
+ write(os, default(R));
+ endWriteParams(os);
+ return setResult(os);
}
else
{
+ var cached = getAndClearCachedOutputStream(); // If an output stream is cached, re-use it
+
+ //
+ // NOTE: it's important that the continuation doesn't mutate the Incoming state to
+ // guarantee thread-safety. Multiple continuations can execute concurrently if the
+ // user installed a dispatch interceptor and the dispatch is retried.
+ //
return task.ContinueWith((Task<R> t) =>
{
var result = t.GetAwaiter().GetResult();
- var os = startWriteParams();
+ var os = startWriteParams(cached);
write(os, result);
endWriteParams(os);
return Task.FromResult(os);
@@ -290,15 +300,24 @@ namespace IceInternal
{
if(task == null)
{
- _os = writeEmptyParams();
- return null;
+ //
+ // Write response if no task is provided
+ //
+ return setResult(writeEmptyParams());
}
else
{
+ var cached = getAndClearCachedOutputStream(); // If an output stream is cached, re-use it
+
+ //
+ // NOTE: it's important that the continuation doesn't mutate the Incoming state to
+ // guarantee thread-safety. Multiple continuations can execute concurrently if the
+ // user installed a dispatch interceptor and the dispatch is retried.
+ //
return task.ContinueWith((Task t) =>
{
t.GetAwaiter().GetResult();
- return Task.FromResult(writeEmptyParams());
+ return Task.FromResult(writeEmptyParams(cached));
}, TaskContinuationOptions.ExecuteSynchronously).Unwrap();
}
}
@@ -307,11 +326,15 @@ namespace IceInternal
{
if(task == null)
{
- _os = default(T).getOutputStream(_current);
- return null; // Response is cached in the Incoming to not have to create unecessary Task
+ return setResult(default(T).getOutputStream(_current));
}
else
{
+ //
+ // NOTE: it's important that the continuation doesn't mutate the Incoming state to
+ // guarantee thread-safety. Multiple continuations can execute concurrently if the
+ // user installed a dispatch interceptor and the dispatch is retried.
+ //
return task.ContinueWith((Task<T> t) =>
{
return Task.FromResult(t.GetAwaiter().GetResult().getOutputStream(_current));
@@ -432,6 +455,20 @@ namespace IceInternal
_format = format;
}
+ public Ice.OutputStream getAndClearCachedOutputStream()
+ {
+ if(_response)
+ {
+ var cached = _os;
+ _os = null;
+ return cached;
+ }
+ else
+ {
+ return null; // Don't consume unnecessarily the output stream if we are dispatching a oneway request
+ }
+ }
+
static public Ice.OutputStream createResponseOutputStream(Ice.Current current)
{
var os = new Ice.OutputStream(current.adapter.getCommunicator(), Ice.Util.currentProtocolEncoding);
@@ -441,17 +478,18 @@ namespace IceInternal
return os;
}
- public Ice.OutputStream startWriteParams()
+ private Ice.OutputStream startWriteParams(Ice.OutputStream os)
{
if(!_response)
{
throw new Ice.MarshalException("can't marshal out parameters for oneway dispatch");
}
- // If there's an output stream set, re-use it for the response
- var os = _os == null ? new Ice.OutputStream(_instance, Ice.Util.currentProtocolEncoding) : _os;
+ if(os == null) // Create the output stream if none is provided
+ {
+ os = new Ice.OutputStream(_instance, Ice.Util.currentProtocolEncoding);
+ }
Debug.Assert(os.pos() == 0);
- _os = null;
os.writeBlob(Protocol.replyHdr);
os.writeInt(_current.requestId);
os.writeByte(ReplyStatus.replyOK);
@@ -459,6 +497,11 @@ namespace IceInternal
return os;
}
+ public Ice.OutputStream startWriteParams()
+ {
+ return startWriteParams(getAndClearCachedOutputStream());
+ }
+
public void endWriteParams(Ice.OutputStream os)
{
if(_response)
@@ -467,14 +510,15 @@ namespace IceInternal
}
}
- public Ice.OutputStream writeEmptyParams()
+ private Ice.OutputStream writeEmptyParams(Ice.OutputStream os)
{
if(_response)
{
- // If there's an output stream set, re-use it for the response
- var os = _os == null ? new Ice.OutputStream(_instance, Ice.Util.currentProtocolEncoding) : _os;
+ if(os == null) // Create the output stream if none is provided
+ {
+ os = new Ice.OutputStream(_instance, Ice.Util.currentProtocolEncoding);
+ }
Debug.Assert(os.pos() == 0);
- _os = null;
os.writeBlob(Protocol.replyHdr);
os.writeInt(_current.requestId);
os.writeByte(ReplyStatus.replyOK);
@@ -487,7 +531,12 @@ namespace IceInternal
}
}
- public Ice.OutputStream writeParamEncaps(byte[] v, bool ok)
+ public Ice.OutputStream writeEmptyParams()
+ {
+ return writeEmptyParams(getAndClearCachedOutputStream());
+ }
+
+ public Ice.OutputStream writeParamEncaps(Ice.OutputStream os, byte[] v, bool ok)
{
if(!ok && _observer != null)
{
@@ -496,10 +545,11 @@ namespace IceInternal
if(_response)
{
- // If there's an output stream set, re-use it for the response
- var os = _os == null ? new Ice.OutputStream(_instance, Ice.Util.currentProtocolEncoding) : _os;
+ if(os == null) // Create the output stream if none is provided
+ {
+ os = new Ice.OutputStream(_instance, Ice.Util.currentProtocolEncoding);
+ }
Debug.Assert(os.pos() == 0);
- _os = null;
os.writeBlob(Protocol.replyHdr);
os.writeInt(_current.requestId);
os.writeByte(ok ? ReplyStatus.replyOK : ReplyStatus.replyUserException);