summaryrefslogtreecommitdiff
path: root/csharp/src/Ice/AsyncResult.cs
diff options
context:
space:
mode:
Diffstat (limited to 'csharp/src/Ice/AsyncResult.cs')
-rw-r--r--csharp/src/Ice/AsyncResult.cs358
1 files changed, 75 insertions, 283 deletions
diff --git a/csharp/src/Ice/AsyncResult.cs b/csharp/src/Ice/AsyncResult.cs
index 5fad90abb05..95729efe018 100644
--- a/csharp/src/Ice/AsyncResult.cs
+++ b/csharp/src/Ice/AsyncResult.cs
@@ -26,8 +26,8 @@ namespace Ice
///
/// <summary>
- /// Callback for the successful completion of an operation
- /// that returns no data.
+ /// Callback to inform when a call has been passed to the local
+ /// transport.
/// </summary>
///
public delegate void SentCallback(bool sentSynchronously);
@@ -54,6 +54,7 @@ namespace Ice
ObjectPrx getProxy();
bool isCompleted_();
+
void waitForCompleted();
bool isSent();
@@ -67,7 +68,6 @@ namespace Ice
AsyncResult whenSent(Ice.AsyncCallback cb);
AsyncResult whenSent(Ice.SentCallback cb);
-
AsyncResult whenCompleted(Ice.ExceptionCallback excb);
}
@@ -82,18 +82,15 @@ namespace Ice
namespace IceInternal
{
- using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
- public delegate void ProxyTwowayCallback<T>(Ice.AsyncResult result, T cb, Ice.ExceptionCallback excb);
- public delegate void ProxyOnewayCallback<T>(T cb);
-
- public class AsyncResultI : Ice.AsyncResult
+ abstract public class AsyncResultI : Ice.AsyncResult
{
public virtual void cancel()
{
- cancel(new Ice.InvocationCanceledException());
+ Debug.Assert(outgoing_ != null);
+ outgoing_.cancel();
}
public Ice.Communicator getCommunicator()
@@ -125,7 +122,7 @@ namespace IceInternal
{
while((state_ & StateDone) == 0)
{
- System.Threading.Monitor.Wait(this);
+ Monitor.Wait(this);
}
}
}
@@ -142,9 +139,9 @@ namespace IceInternal
{
lock(this)
{
- while((state_ & StateSent) == 0 && _exception == null)
+ while((state_ & StateSent) == 0 && exception_ == null)
{
- System.Threading.Monitor.Wait(this);
+ Monitor.Wait(this);
}
}
}
@@ -153,22 +150,22 @@ namespace IceInternal
{
lock(this)
{
- if(_exception != null)
+ if(exception_ != null)
{
- throw _exception;
+ throw exception_;
}
}
}
public bool sentSynchronously()
{
- return sentSynchronously_; // No lock needed
+ Debug.Assert(outgoing_ != null);
+ return outgoing_.sentSynchronously(); // No lock needed
}
//
// Implementation of System.IAsyncResult properties
//
-
public bool IsCompleted
{
get
@@ -181,11 +178,12 @@ namespace IceInternal
{
get
{
+ Debug.Assert(outgoing_ != null);
if(getProxy() != null && getProxy().ice_isTwoway())
{
return false;
}
- return sentSynchronously_;
+ return outgoing_.sentSynchronously();
}
}
@@ -203,19 +201,27 @@ namespace IceInternal
{
lock(this)
{
- if(_waitHandle == null)
+ if(waitHandle_ == null)
{
- _waitHandle = new EventWaitHandle(false, EventResetMode.ManualReset);
+ waitHandle_ = new EventWaitHandle(false, EventResetMode.ManualReset);
}
if((state_ & StateDone) != 0)
{
- _waitHandle.Set();
+ waitHandle_.Set();
}
- return _waitHandle;
+ return waitHandle_;
}
}
}
+ public OutgoingAsyncBase OutgoingAsync
+ {
+ get
+ {
+ return outgoing_;
+ }
+ }
+
public Ice.AsyncResult whenSent(Ice.AsyncCallback cb)
{
lock(this)
@@ -224,22 +230,22 @@ namespace IceInternal
{
throw new System.ArgumentException("callback is null");
}
- if(_sentCallback != null)
+ if(sentCallback_ != null)
{
throw new System.ArgumentException("sent callback already set");
}
- _sentCallback = cb;
+ sentCallback_ = cb;
if((state_ & StateSent) == 0)
{
return this;
}
}
- if(sentSynchronously_)
+ if(outgoing_.sentSynchronously())
{
try
{
- _sentCallback(this);
+ sentCallback_(this);
}
catch(System.Exception ex)
{
@@ -252,7 +258,7 @@ namespace IceInternal
{
try
{
- _sentCallback(this);
+ sentCallback_(this);
}
catch(System.Exception ex)
{
@@ -271,21 +277,21 @@ namespace IceInternal
{
throw new System.ArgumentException("callback is null");
}
- if(_sentCallback != null)
+ if(sentCallback_ != null)
{
throw new System.ArgumentException("sent callback already set");
}
- _sentCallback = (Ice.AsyncResult result) =>
- {
- cb(result.sentSynchronously());
- };
+ sentCallback_ = (Ice.AsyncResult r) =>
+ {
+ cb(r.sentSynchronously());
+ };
if((state_ & StateSent) == 0)
{
return this;
}
}
- if(sentSynchronously_)
+ if(outgoing_.sentSynchronously())
{
try
{
@@ -313,12 +319,6 @@ namespace IceInternal
return this;
}
- public Ice.AsyncResult whenCompletedWithAsyncCallback(Ice.AsyncCallback cb)
- {
- setCompletedCallback(cb);
- return this;
- }
-
public Ice.AsyncResult whenCompleted(Ice.ExceptionCallback cb)
{
if(cb == null)
@@ -342,103 +342,6 @@ namespace IceInternal
return _operation;
}
- public void invokeSent(Ice.AsyncCallback cb)
- {
- Debug.Assert(cb != null);
- try
- {
- cb(this);
- }
- catch(System.Exception ex)
- {
- warning(ex);
- }
-
- if(observer_ != null)
- {
- Ice.ObjectPrx proxy = getProxy();
- if(proxy == null || !proxy.ice_isTwoway())
- {
- observer_.detach();
- observer_ = null;
- }
- }
- }
-
- public void invokeSentAsync(Ice.AsyncCallback cb)
- {
- //
- // This is called when it's not safe to call the exception callback synchronously
- // from this thread. Instead the exception callback is called asynchronously from
- // the client thread pool.
- //
- Debug.Assert(cb != null);
- try
- {
- instance_.clientThreadPool().dispatch(() =>
- {
- invokeSent(cb);
- }, cachedConnection_);
- }
- catch(Ice.CommunicatorDestroyedException)
- {
- }
- }
-
- public void invokeCompleted(Ice.AsyncCallback cb)
- {
- Debug.Assert(cb != null);
- try
- {
- cb(this);
- }
- catch(System.Exception ex)
- {
- warning(ex);
- }
-
- if(observer_ != null)
- {
- observer_.detach();
- observer_ = null;
- }
- }
-
- public void invokeCompletedAsync(Ice.AsyncCallback cb)
- {
- //
- // This is called when it's not safe to call the exception callback synchronously
- // from this thread. Instead the exception callback is called asynchronously from
- // the client thread pool.
- //
- Debug.Assert(cb != null);
-
- // CommunicatorDestroyedException is the only exception that can propagate directly.
- instance_.clientThreadPool().dispatch(() =>
- {
- invokeCompleted(cb);
- }, cachedConnection_);
- }
-
- public virtual void cancelable(CancellationHandler handler)
- {
- lock(this)
- {
- if(_cancellationException != null)
- {
- try
- {
- throw _cancellationException;
- }
- finally
- {
- _cancellationException = null;
- }
- }
- _cancellationHandler = handler;
- }
- }
-
public bool wait()
{
lock(this)
@@ -450,121 +353,30 @@ namespace IceInternal
state_ |= StateEndCalled;
while((state_ & StateDone) == 0)
{
- System.Threading.Monitor.Wait(this);
+ Monitor.Wait(this);
}
- if(_exception != null)
+ if(exception_ != null)
{
- throw _exception;
+ throw exception_;
}
return (state_ & StateOK) != 0;
}
}
- public virtual void cacheMessageBuffers()
- {
- }
-
- protected AsyncResultI(Ice.Communicator communicator, Instance instance, string op, object cookie)
+ protected AsyncResultI(Ice.Communicator communicator,
+ Instance instance,
+ string op,
+ object cookie,
+ Ice.AsyncCallback cb)
{
instance_ = instance;
- sentSynchronously_ = false;
state_ = 0;
_communicator = communicator;
_operation = op;
- _exception = null;
+ exception_ = null;
_cookie = cookie;
- }
-
- protected Ice.AsyncCallback sent(bool done)
- {
- lock(this)
- {
- Debug.Assert(_exception == null);
-
- bool alreadySent = (state_ & StateSent) != 0;
- state_ |= StateSent;
- if(done)
- {
- state_ |= StateDone | StateOK;
- _cancellationHandler = null;
- if(observer_ != null && _sentCallback == null)
- {
- observer_.detach();
- observer_ = null;
- }
-
- //
- // For oneway requests after the data has been sent
- // the buffers can be reused unless this is a
- // collocated invocation. For collocated invocations
- // the buffer won't be reused because it has already
- // been marked as cached in invokeCollocated.
- //
- cacheMessageBuffers();
- }
- if(_waitHandle != null)
- {
- _waitHandle.Set();
- }
- System.Threading.Monitor.PulseAll(this);
- return !alreadySent ? _sentCallback : null;
- }
- }
-
- protected Ice.AsyncCallback finished(bool ok)
- {
- lock(this)
- {
- state_ |= StateDone;
- if(ok)
- {
- state_ |= StateOK;
- }
- _cancellationHandler = null;
- if(_completedCallback == null)
- {
- if(observer_ != null)
- {
- observer_.detach();
- observer_ = null;
- }
- }
- if(_waitHandle != null)
- {
- _waitHandle.Set();
- }
- System.Threading.Monitor.PulseAll(this);
- return _completedCallback;
- }
- }
-
- protected Ice.AsyncCallback finished(Ice.Exception ex)
- {
- lock(this)
- {
- state_ |= StateDone;
- _exception = ex;
- _cancellationHandler = null;
- if(observer_ != null)
- {
- observer_.failed(ex.ice_id());
- }
- if(_completedCallback == null)
- {
- if(observer_ != null)
- {
- observer_.detach();
- observer_ = null;
- }
- }
- if(_waitHandle != null)
- {
- _waitHandle.Set();
- }
- System.Threading.Monitor.PulseAll(this);
- return _completedCallback;
- }
+ completedCallback_ = cb;
}
protected void setCompletedCallback(Ice.AsyncCallback cb)
@@ -575,16 +387,16 @@ namespace IceInternal
{
throw new System.ArgumentException("callback is null");
}
- if(_completedCallback != null)
+ if(completedCallback_ != null)
{
throw new System.ArgumentException("callback already set");
}
- _completedCallback = cb;
+ completedCallback_ = cb;
if((state_ & StateDone) == 0)
{
return;
}
- else if((getProxy() == null || !getProxy().ice_isTwoway()) && _exception == null)
+ else if((getProxy() == null || !getProxy().ice_isTwoway()) && exception_ == null)
{
return;
}
@@ -594,7 +406,14 @@ namespace IceInternal
{
try
{
- cb(this);
+ try
+ {
+ cb(this);
+ }
+ catch(System.AggregateException ex)
+ {
+ throw ex.InnerException;
+ }
}
catch(System.Exception ex)
{
@@ -603,44 +422,21 @@ namespace IceInternal
}, cachedConnection_);
}
- protected virtual Ice.AsyncCallback getCompletedCallback()
- {
- return (Ice.AsyncResult result) =>
- {
- Debug.Assert(exceptionCallback_ != null);
- try
- {
- ((AsyncResultI)result).wait();
- }
- catch(Ice.Exception ex)
- {
- exceptionCallback_(ex);
- return;
- }
- };
- }
+ abstract protected Ice.AsyncCallback getCompletedCallback();
- protected void cancel(Ice.LocalException ex)
+ public static AsyncResultI check(Ice.AsyncResult r, Ice.ObjectPrx prx, string operation)
{
- CancellationHandler handler;
- lock(this)
+ if(r != null && r.getProxy() != prx)
{
- _cancellationException = ex;
- if(_cancellationHandler == null)
- {
- return;
- }
- handler = _cancellationHandler;
+ throw new System.ArgumentException("Proxy for call to end_" + operation +
+ " does not match proxy that was used to call corresponding begin_" +
+ operation + " method");
}
- handler.asyncRequestCanceled((OutgoingAsyncBase)this, ex);
+ return check(r, operation);
}
- protected virtual Ice.Instrumentation.InvocationObserver getObserver()
- {
- return observer_;
- }
- protected static T check<T>(Ice.AsyncResult r, string operation)
+ public static AsyncResultI check(Ice.AsyncResult r, string operation)
{
if(r == null)
{
@@ -651,11 +447,11 @@ namespace IceInternal
throw new System.ArgumentException("Incorrect operation for end_" + operation + " method: " +
r.getOperation());
}
- if(!(r is T))
+ if(!(r is AsyncResultI))
{
throw new System.ArgumentException("Incorrect AsyncResult object for end_" + operation + " method");
}
- return (T)r;
+ return (AsyncResultI)r;
}
protected void warning(System.Exception ex)
@@ -666,29 +462,25 @@ namespace IceInternal
}
}
- protected IceInternal.Instance instance_;
+ protected Instance instance_;
protected Ice.Instrumentation.InvocationObserver observer_;
protected Ice.Connection cachedConnection_;
- protected bool sentSynchronously_;
private readonly Ice.Communicator _communicator;
private readonly string _operation;
private readonly object _cookie;
- private Ice.Exception _exception;
- private EventWaitHandle _waitHandle;
-
- private CancellationHandler _cancellationHandler;
- private Ice.LocalException _cancellationException;
+ protected Ice.Exception exception_;
+ protected EventWaitHandle waitHandle_;
- private Ice.AsyncCallback _completedCallback;
- private Ice.AsyncCallback _sentCallback;
+ protected Ice.AsyncCallback completedCallback_;
+ protected Ice.AsyncCallback sentCallback_;
protected Ice.ExceptionCallback exceptionCallback_;
protected const int StateOK = 0x1;
protected const int StateDone = 0x2;
protected const int StateSent = 0x4;
protected const int StateEndCalled = 0x8;
- protected const int StateCachedBuffers = 0x10;
protected int state_;
+ protected OutgoingAsyncBase outgoing_;
}
}