diff options
author | Jose <jose@zeroc.com> | 2019-07-18 23:51:08 +0200 |
---|---|---|
committer | Jose <jose@zeroc.com> | 2019-07-18 23:51:08 +0200 |
commit | fc886b010c01cccb8cca3ac4d92f1ebd7fc72295 (patch) | |
tree | 51cf00a4a955efecc9c94527aeafcb25ffbe57b9 /csharp/src | |
parent | Simplify OutputStream creation (diff) | |
parent | Fixed non-thread safe AMD dispatch, fixes #448 (#449) (diff) | |
download | ice-fc886b010c01cccb8cca3ac4d92f1ebd7fc72295.tar.bz2 ice-fc886b010c01cccb8cca3ac4d92f1ebd7fc72295.tar.xz ice-fc886b010c01cccb8cca3ac4d92f1ebd7fc72295.zip |
Merge remote-tracking branch 'origin/3.7' into swift
Diffstat (limited to 'csharp/src')
-rw-r--r-- | csharp/src/Ice/DispatchInterceptor.cs | 18 | ||||
-rw-r--r-- | csharp/src/Ice/Incoming.cs | 94 | ||||
-rw-r--r-- | csharp/src/Ice/Object.cs | 8 |
3 files changed, 94 insertions, 26 deletions
diff --git a/csharp/src/Ice/DispatchInterceptor.cs b/csharp/src/Ice/DispatchInterceptor.cs index 665b504ef3e..54e39251ac6 100644 --- a/csharp/src/Ice/DispatchInterceptor.cs +++ b/csharp/src/Ice/DispatchInterceptor.cs @@ -26,7 +26,23 @@ namespace Ice public override System.Threading.Tasks.Task<OutputStream> iceDispatch(IceInternal.Incoming inc, Current current) { - return dispatch(inc); + try + { + return dispatch(inc); + } + catch(Exception) + { + // + // If the input parameters weren't read, make sure we skip them here. It's needed to read the + // encoding version used by the client to eventually marshal the user exception. It's also needed + // if we dispatch a batch oneway request to read the next batch request. + // + if(current.encoding == null || (current.encoding.major == 0 && current.encoding.minor == 0)) + { + inc.skipReadParams(); + } + throw; + } } } } diff --git a/csharp/src/Ice/Incoming.cs b/csharp/src/Ice/Incoming.cs index 621099d773b..c467f67c349 100644 --- a/csharp/src/Ice/Incoming.cs +++ b/csharp/src/Ice/Incoming.cs @@ -268,17 +268,27 @@ namespace IceInternal { if(task == null) { - _os = startWriteParams(); - write(_os, default(R)); - endWriteParams(_os); - return null; // Response is cached in the Incoming to not have to create unecessary Task + // + // Write default constructed response if no task is provided + // + var os = startWriteParams(); + write(os, default(R)); + endWriteParams(os); + return setResult(os); } else { + var cached = getAndClearCachedOutputStream(); // If an output stream is cached, re-use it + + // + // NOTE: it's important that the continuation doesn't mutate the Incoming state to + // guarantee thread-safety. Multiple continuations can execute concurrently if the + // user installed a dispatch interceptor and the dispatch is retried. + // return task.ContinueWith((Task<R> t) => { var result = t.GetAwaiter().GetResult(); - var os = startWriteParams(); + var os = startWriteParams(cached); write(os, result); endWriteParams(os); return Task.FromResult(os); @@ -290,15 +300,24 @@ namespace IceInternal { if(task == null) { - _os = writeEmptyParams(); - return null; + // + // Write response if no task is provided + // + return setResult(writeEmptyParams()); } else { + var cached = getAndClearCachedOutputStream(); // If an output stream is cached, re-use it + + // + // NOTE: it's important that the continuation doesn't mutate the Incoming state to + // guarantee thread-safety. Multiple continuations can execute concurrently if the + // user installed a dispatch interceptor and the dispatch is retried. + // return task.ContinueWith((Task t) => { t.GetAwaiter().GetResult(); - return Task.FromResult(writeEmptyParams()); + return Task.FromResult(writeEmptyParams(cached)); }, TaskContinuationOptions.ExecuteSynchronously).Unwrap(); } } @@ -307,11 +326,15 @@ namespace IceInternal { if(task == null) { - _os = default(T).getOutputStream(_current); - return null; // Response is cached in the Incoming to not have to create unecessary Task + return setResult(default(T).getOutputStream(_current)); } else { + // + // NOTE: it's important that the continuation doesn't mutate the Incoming state to + // guarantee thread-safety. Multiple continuations can execute concurrently if the + // user installed a dispatch interceptor and the dispatch is retried. + // return task.ContinueWith((Task<T> t) => { return Task.FromResult(t.GetAwaiter().GetResult().getOutputStream(_current)); @@ -432,6 +455,20 @@ namespace IceInternal _format = format; } + public Ice.OutputStream getAndClearCachedOutputStream() + { + if(_response) + { + var cached = _os; + _os = null; + return cached; + } + else + { + return null; // Don't consume unnecessarily the output stream if we are dispatching a oneway request + } + } + static public Ice.OutputStream createResponseOutputStream(Ice.Current current) { var os = new Ice.OutputStream(current.adapter.getCommunicator(), Ice.Util.currentProtocolEncoding); @@ -441,17 +478,18 @@ namespace IceInternal return os; } - public Ice.OutputStream startWriteParams() + private Ice.OutputStream startWriteParams(Ice.OutputStream os) { if(!_response) { throw new Ice.MarshalException("can't marshal out parameters for oneway dispatch"); } - // If there's an output stream set, re-use it for the response - var os = _os == null ? new Ice.OutputStream(_instance, Ice.Util.currentProtocolEncoding) : _os; + if(os == null) // Create the output stream if none is provided + { + os = new Ice.OutputStream(_instance, Ice.Util.currentProtocolEncoding); + } Debug.Assert(os.pos() == 0); - _os = null; os.writeBlob(Protocol.replyHdr); os.writeInt(_current.requestId); os.writeByte(ReplyStatus.replyOK); @@ -459,6 +497,11 @@ namespace IceInternal return os; } + public Ice.OutputStream startWriteParams() + { + return startWriteParams(getAndClearCachedOutputStream()); + } + public void endWriteParams(Ice.OutputStream os) { if(_response) @@ -467,14 +510,15 @@ namespace IceInternal } } - public Ice.OutputStream writeEmptyParams() + private Ice.OutputStream writeEmptyParams(Ice.OutputStream os) { if(_response) { - // If there's an output stream set, re-use it for the response - var os = _os == null ? new Ice.OutputStream(_instance, Ice.Util.currentProtocolEncoding) : _os; + if(os == null) // Create the output stream if none is provided + { + os = new Ice.OutputStream(_instance, Ice.Util.currentProtocolEncoding); + } Debug.Assert(os.pos() == 0); - _os = null; os.writeBlob(Protocol.replyHdr); os.writeInt(_current.requestId); os.writeByte(ReplyStatus.replyOK); @@ -487,7 +531,12 @@ namespace IceInternal } } - public Ice.OutputStream writeParamEncaps(byte[] v, bool ok) + public Ice.OutputStream writeEmptyParams() + { + return writeEmptyParams(getAndClearCachedOutputStream()); + } + + public Ice.OutputStream writeParamEncaps(Ice.OutputStream os, byte[] v, bool ok) { if(!ok && _observer != null) { @@ -496,10 +545,11 @@ namespace IceInternal if(_response) { - // If there's an output stream set, re-use it for the response - var os = _os == null ? new Ice.OutputStream(_instance, Ice.Util.currentProtocolEncoding) : _os; + if(os == null) // Create the output stream if none is provided + { + os = new Ice.OutputStream(_instance, Ice.Util.currentProtocolEncoding); + } Debug.Assert(os.pos() == 0); - _os = null; os.writeBlob(Protocol.replyHdr); os.writeInt(_current.requestId); os.writeByte(ok ? ReplyStatus.replyOK : ReplyStatus.replyUserException); diff --git a/csharp/src/Ice/Object.cs b/csharp/src/Ice/Object.cs index dd9913ba5f3..78f776e671b 100644 --- a/csharp/src/Ice/Object.cs +++ b/csharp/src/Ice/Object.cs @@ -310,7 +310,7 @@ namespace Ice byte[] inEncaps = inS.readParamEncaps(); byte[] outEncaps; bool ok = ice_invoke(inEncaps, out outEncaps, current); - inS.setResult(inS.writeParamEncaps(outEncaps, ok)); + inS.setResult(inS.writeParamEncaps(inS.getAndClearCachedOutputStream(), outEncaps, ok)); return null; } } @@ -323,10 +323,12 @@ namespace Ice public override Task<Ice.OutputStream> iceDispatch(IceInternal.Incoming inS, Current current) { byte[] inEncaps = inS.readParamEncaps(); - return ice_invokeAsync(inEncaps, current).ContinueWith((Task<Object_Ice_invokeResult> t) => + var task = ice_invokeAsync(inEncaps, current); + var cached = inS.getAndClearCachedOutputStream(); + return task.ContinueWith((Task<Object_Ice_invokeResult> t) => { var ret = t.GetAwaiter().GetResult(); - return Task.FromResult(inS.writeParamEncaps(ret.outEncaps, ret.returnValue)); + return Task.FromResult(inS.writeParamEncaps(cached, ret.outEncaps, ret.returnValue)); }).Unwrap(); } } |