diff options
Diffstat (limited to 'csharp/src/Ice/Incoming.cs')
-rw-r--r-- | csharp/src/Ice/Incoming.cs | 94 |
1 files changed, 72 insertions, 22 deletions
diff --git a/csharp/src/Ice/Incoming.cs b/csharp/src/Ice/Incoming.cs index 621099d773b..c467f67c349 100644 --- a/csharp/src/Ice/Incoming.cs +++ b/csharp/src/Ice/Incoming.cs @@ -268,17 +268,27 @@ namespace IceInternal { if(task == null) { - _os = startWriteParams(); - write(_os, default(R)); - endWriteParams(_os); - return null; // Response is cached in the Incoming to not have to create unecessary Task + // + // Write default constructed response if no task is provided + // + var os = startWriteParams(); + write(os, default(R)); + endWriteParams(os); + return setResult(os); } else { + var cached = getAndClearCachedOutputStream(); // If an output stream is cached, re-use it + + // + // NOTE: it's important that the continuation doesn't mutate the Incoming state to + // guarantee thread-safety. Multiple continuations can execute concurrently if the + // user installed a dispatch interceptor and the dispatch is retried. + // return task.ContinueWith((Task<R> t) => { var result = t.GetAwaiter().GetResult(); - var os = startWriteParams(); + var os = startWriteParams(cached); write(os, result); endWriteParams(os); return Task.FromResult(os); @@ -290,15 +300,24 @@ namespace IceInternal { if(task == null) { - _os = writeEmptyParams(); - return null; + // + // Write response if no task is provided + // + return setResult(writeEmptyParams()); } else { + var cached = getAndClearCachedOutputStream(); // If an output stream is cached, re-use it + + // + // NOTE: it's important that the continuation doesn't mutate the Incoming state to + // guarantee thread-safety. Multiple continuations can execute concurrently if the + // user installed a dispatch interceptor and the dispatch is retried. + // return task.ContinueWith((Task t) => { t.GetAwaiter().GetResult(); - return Task.FromResult(writeEmptyParams()); + return Task.FromResult(writeEmptyParams(cached)); }, TaskContinuationOptions.ExecuteSynchronously).Unwrap(); } } @@ -307,11 +326,15 @@ namespace IceInternal { if(task == null) { - _os = default(T).getOutputStream(_current); - return null; // Response is cached in the Incoming to not have to create unecessary Task + return setResult(default(T).getOutputStream(_current)); } else { + // + // NOTE: it's important that the continuation doesn't mutate the Incoming state to + // guarantee thread-safety. Multiple continuations can execute concurrently if the + // user installed a dispatch interceptor and the dispatch is retried. + // return task.ContinueWith((Task<T> t) => { return Task.FromResult(t.GetAwaiter().GetResult().getOutputStream(_current)); @@ -432,6 +455,20 @@ namespace IceInternal _format = format; } + public Ice.OutputStream getAndClearCachedOutputStream() + { + if(_response) + { + var cached = _os; + _os = null; + return cached; + } + else + { + return null; // Don't consume unnecessarily the output stream if we are dispatching a oneway request + } + } + static public Ice.OutputStream createResponseOutputStream(Ice.Current current) { var os = new Ice.OutputStream(current.adapter.getCommunicator(), Ice.Util.currentProtocolEncoding); @@ -441,17 +478,18 @@ namespace IceInternal return os; } - public Ice.OutputStream startWriteParams() + private Ice.OutputStream startWriteParams(Ice.OutputStream os) { if(!_response) { throw new Ice.MarshalException("can't marshal out parameters for oneway dispatch"); } - // If there's an output stream set, re-use it for the response - var os = _os == null ? new Ice.OutputStream(_instance, Ice.Util.currentProtocolEncoding) : _os; + if(os == null) // Create the output stream if none is provided + { + os = new Ice.OutputStream(_instance, Ice.Util.currentProtocolEncoding); + } Debug.Assert(os.pos() == 0); - _os = null; os.writeBlob(Protocol.replyHdr); os.writeInt(_current.requestId); os.writeByte(ReplyStatus.replyOK); @@ -459,6 +497,11 @@ namespace IceInternal return os; } + public Ice.OutputStream startWriteParams() + { + return startWriteParams(getAndClearCachedOutputStream()); + } + public void endWriteParams(Ice.OutputStream os) { if(_response) @@ -467,14 +510,15 @@ namespace IceInternal } } - public Ice.OutputStream writeEmptyParams() + private Ice.OutputStream writeEmptyParams(Ice.OutputStream os) { if(_response) { - // If there's an output stream set, re-use it for the response - var os = _os == null ? new Ice.OutputStream(_instance, Ice.Util.currentProtocolEncoding) : _os; + if(os == null) // Create the output stream if none is provided + { + os = new Ice.OutputStream(_instance, Ice.Util.currentProtocolEncoding); + } Debug.Assert(os.pos() == 0); - _os = null; os.writeBlob(Protocol.replyHdr); os.writeInt(_current.requestId); os.writeByte(ReplyStatus.replyOK); @@ -487,7 +531,12 @@ namespace IceInternal } } - public Ice.OutputStream writeParamEncaps(byte[] v, bool ok) + public Ice.OutputStream writeEmptyParams() + { + return writeEmptyParams(getAndClearCachedOutputStream()); + } + + public Ice.OutputStream writeParamEncaps(Ice.OutputStream os, byte[] v, bool ok) { if(!ok && _observer != null) { @@ -496,10 +545,11 @@ namespace IceInternal if(_response) { - // If there's an output stream set, re-use it for the response - var os = _os == null ? new Ice.OutputStream(_instance, Ice.Util.currentProtocolEncoding) : _os; + if(os == null) // Create the output stream if none is provided + { + os = new Ice.OutputStream(_instance, Ice.Util.currentProtocolEncoding); + } Debug.Assert(os.pos() == 0); - _os = null; os.writeBlob(Protocol.replyHdr); os.writeInt(_current.requestId); os.writeByte(ok ? ReplyStatus.replyOK : ReplyStatus.replyUserException); |