summaryrefslogtreecommitdiff
path: root/csharp/src
diff options
context:
space:
mode:
Diffstat (limited to 'csharp/src')
-rw-r--r--csharp/src/Ice/DispatchInterceptor.cs18
-rw-r--r--csharp/src/Ice/Incoming.cs94
-rw-r--r--csharp/src/Ice/Object.cs8
3 files changed, 94 insertions, 26 deletions
diff --git a/csharp/src/Ice/DispatchInterceptor.cs b/csharp/src/Ice/DispatchInterceptor.cs
index 665b504ef3e..54e39251ac6 100644
--- a/csharp/src/Ice/DispatchInterceptor.cs
+++ b/csharp/src/Ice/DispatchInterceptor.cs
@@ -26,7 +26,23 @@ namespace Ice
public override System.Threading.Tasks.Task<OutputStream>
iceDispatch(IceInternal.Incoming inc, Current current)
{
- return dispatch(inc);
+ try
+ {
+ return dispatch(inc);
+ }
+ catch(Exception)
+ {
+ //
+ // If the input parameters weren't read, make sure we skip them here. It's needed to read the
+ // encoding version used by the client to eventually marshal the user exception. It's also needed
+ // if we dispatch a batch oneway request to read the next batch request.
+ //
+ if(current.encoding == null || (current.encoding.major == 0 && current.encoding.minor == 0))
+ {
+ inc.skipReadParams();
+ }
+ throw;
+ }
}
}
}
diff --git a/csharp/src/Ice/Incoming.cs b/csharp/src/Ice/Incoming.cs
index 621099d773b..c467f67c349 100644
--- a/csharp/src/Ice/Incoming.cs
+++ b/csharp/src/Ice/Incoming.cs
@@ -268,17 +268,27 @@ namespace IceInternal
{
if(task == null)
{
- _os = startWriteParams();
- write(_os, default(R));
- endWriteParams(_os);
- return null; // Response is cached in the Incoming to not have to create unecessary Task
+ //
+ // Write default constructed response if no task is provided
+ //
+ var os = startWriteParams();
+ write(os, default(R));
+ endWriteParams(os);
+ return setResult(os);
}
else
{
+ var cached = getAndClearCachedOutputStream(); // If an output stream is cached, re-use it
+
+ //
+ // NOTE: it's important that the continuation doesn't mutate the Incoming state to
+ // guarantee thread-safety. Multiple continuations can execute concurrently if the
+ // user installed a dispatch interceptor and the dispatch is retried.
+ //
return task.ContinueWith((Task<R> t) =>
{
var result = t.GetAwaiter().GetResult();
- var os = startWriteParams();
+ var os = startWriteParams(cached);
write(os, result);
endWriteParams(os);
return Task.FromResult(os);
@@ -290,15 +300,24 @@ namespace IceInternal
{
if(task == null)
{
- _os = writeEmptyParams();
- return null;
+ //
+ // Write response if no task is provided
+ //
+ return setResult(writeEmptyParams());
}
else
{
+ var cached = getAndClearCachedOutputStream(); // If an output stream is cached, re-use it
+
+ //
+ // NOTE: it's important that the continuation doesn't mutate the Incoming state to
+ // guarantee thread-safety. Multiple continuations can execute concurrently if the
+ // user installed a dispatch interceptor and the dispatch is retried.
+ //
return task.ContinueWith((Task t) =>
{
t.GetAwaiter().GetResult();
- return Task.FromResult(writeEmptyParams());
+ return Task.FromResult(writeEmptyParams(cached));
}, TaskContinuationOptions.ExecuteSynchronously).Unwrap();
}
}
@@ -307,11 +326,15 @@ namespace IceInternal
{
if(task == null)
{
- _os = default(T).getOutputStream(_current);
- return null; // Response is cached in the Incoming to not have to create unecessary Task
+ return setResult(default(T).getOutputStream(_current));
}
else
{
+ //
+ // NOTE: it's important that the continuation doesn't mutate the Incoming state to
+ // guarantee thread-safety. Multiple continuations can execute concurrently if the
+ // user installed a dispatch interceptor and the dispatch is retried.
+ //
return task.ContinueWith((Task<T> t) =>
{
return Task.FromResult(t.GetAwaiter().GetResult().getOutputStream(_current));
@@ -432,6 +455,20 @@ namespace IceInternal
_format = format;
}
+ public Ice.OutputStream getAndClearCachedOutputStream()
+ {
+ if(_response)
+ {
+ var cached = _os;
+ _os = null;
+ return cached;
+ }
+ else
+ {
+ return null; // Don't consume unnecessarily the output stream if we are dispatching a oneway request
+ }
+ }
+
static public Ice.OutputStream createResponseOutputStream(Ice.Current current)
{
var os = new Ice.OutputStream(current.adapter.getCommunicator(), Ice.Util.currentProtocolEncoding);
@@ -441,17 +478,18 @@ namespace IceInternal
return os;
}
- public Ice.OutputStream startWriteParams()
+ private Ice.OutputStream startWriteParams(Ice.OutputStream os)
{
if(!_response)
{
throw new Ice.MarshalException("can't marshal out parameters for oneway dispatch");
}
- // If there's an output stream set, re-use it for the response
- var os = _os == null ? new Ice.OutputStream(_instance, Ice.Util.currentProtocolEncoding) : _os;
+ if(os == null) // Create the output stream if none is provided
+ {
+ os = new Ice.OutputStream(_instance, Ice.Util.currentProtocolEncoding);
+ }
Debug.Assert(os.pos() == 0);
- _os = null;
os.writeBlob(Protocol.replyHdr);
os.writeInt(_current.requestId);
os.writeByte(ReplyStatus.replyOK);
@@ -459,6 +497,11 @@ namespace IceInternal
return os;
}
+ public Ice.OutputStream startWriteParams()
+ {
+ return startWriteParams(getAndClearCachedOutputStream());
+ }
+
public void endWriteParams(Ice.OutputStream os)
{
if(_response)
@@ -467,14 +510,15 @@ namespace IceInternal
}
}
- public Ice.OutputStream writeEmptyParams()
+ private Ice.OutputStream writeEmptyParams(Ice.OutputStream os)
{
if(_response)
{
- // If there's an output stream set, re-use it for the response
- var os = _os == null ? new Ice.OutputStream(_instance, Ice.Util.currentProtocolEncoding) : _os;
+ if(os == null) // Create the output stream if none is provided
+ {
+ os = new Ice.OutputStream(_instance, Ice.Util.currentProtocolEncoding);
+ }
Debug.Assert(os.pos() == 0);
- _os = null;
os.writeBlob(Protocol.replyHdr);
os.writeInt(_current.requestId);
os.writeByte(ReplyStatus.replyOK);
@@ -487,7 +531,12 @@ namespace IceInternal
}
}
- public Ice.OutputStream writeParamEncaps(byte[] v, bool ok)
+ public Ice.OutputStream writeEmptyParams()
+ {
+ return writeEmptyParams(getAndClearCachedOutputStream());
+ }
+
+ public Ice.OutputStream writeParamEncaps(Ice.OutputStream os, byte[] v, bool ok)
{
if(!ok && _observer != null)
{
@@ -496,10 +545,11 @@ namespace IceInternal
if(_response)
{
- // If there's an output stream set, re-use it for the response
- var os = _os == null ? new Ice.OutputStream(_instance, Ice.Util.currentProtocolEncoding) : _os;
+ if(os == null) // Create the output stream if none is provided
+ {
+ os = new Ice.OutputStream(_instance, Ice.Util.currentProtocolEncoding);
+ }
Debug.Assert(os.pos() == 0);
- _os = null;
os.writeBlob(Protocol.replyHdr);
os.writeInt(_current.requestId);
os.writeByte(ok ? ReplyStatus.replyOK : ReplyStatus.replyUserException);
diff --git a/csharp/src/Ice/Object.cs b/csharp/src/Ice/Object.cs
index dd9913ba5f3..78f776e671b 100644
--- a/csharp/src/Ice/Object.cs
+++ b/csharp/src/Ice/Object.cs
@@ -310,7 +310,7 @@ namespace Ice
byte[] inEncaps = inS.readParamEncaps();
byte[] outEncaps;
bool ok = ice_invoke(inEncaps, out outEncaps, current);
- inS.setResult(inS.writeParamEncaps(outEncaps, ok));
+ inS.setResult(inS.writeParamEncaps(inS.getAndClearCachedOutputStream(), outEncaps, ok));
return null;
}
}
@@ -323,10 +323,12 @@ namespace Ice
public override Task<Ice.OutputStream> iceDispatch(IceInternal.Incoming inS, Current current)
{
byte[] inEncaps = inS.readParamEncaps();
- return ice_invokeAsync(inEncaps, current).ContinueWith((Task<Object_Ice_invokeResult> t) =>
+ var task = ice_invokeAsync(inEncaps, current);
+ var cached = inS.getAndClearCachedOutputStream();
+ return task.ContinueWith((Task<Object_Ice_invokeResult> t) =>
{
var ret = t.GetAwaiter().GetResult();
- return Task.FromResult(inS.writeParamEncaps(ret.outEncaps, ret.returnValue));
+ return Task.FromResult(inS.writeParamEncaps(cached, ret.outEncaps, ret.returnValue));
}).Unwrap();
}
}