diff options
Diffstat (limited to 'csharp/src/Ice/AsyncResult.cs')
-rw-r--r-- | csharp/src/Ice/AsyncResult.cs | 358 |
1 files changed, 75 insertions, 283 deletions
diff --git a/csharp/src/Ice/AsyncResult.cs b/csharp/src/Ice/AsyncResult.cs index 5fad90abb05..95729efe018 100644 --- a/csharp/src/Ice/AsyncResult.cs +++ b/csharp/src/Ice/AsyncResult.cs @@ -26,8 +26,8 @@ namespace Ice /// /// <summary> - /// Callback for the successful completion of an operation - /// that returns no data. + /// Callback to inform when a call has been passed to the local + /// transport. /// </summary> /// public delegate void SentCallback(bool sentSynchronously); @@ -54,6 +54,7 @@ namespace Ice ObjectPrx getProxy(); bool isCompleted_(); + void waitForCompleted(); bool isSent(); @@ -67,7 +68,6 @@ namespace Ice AsyncResult whenSent(Ice.AsyncCallback cb); AsyncResult whenSent(Ice.SentCallback cb); - AsyncResult whenCompleted(Ice.ExceptionCallback excb); } @@ -82,18 +82,15 @@ namespace Ice namespace IceInternal { - using System.Collections.Generic; using System.Diagnostics; using System.Threading; - public delegate void ProxyTwowayCallback<T>(Ice.AsyncResult result, T cb, Ice.ExceptionCallback excb); - public delegate void ProxyOnewayCallback<T>(T cb); - - public class AsyncResultI : Ice.AsyncResult + abstract public class AsyncResultI : Ice.AsyncResult { public virtual void cancel() { - cancel(new Ice.InvocationCanceledException()); + Debug.Assert(outgoing_ != null); + outgoing_.cancel(); } public Ice.Communicator getCommunicator() @@ -125,7 +122,7 @@ namespace IceInternal { while((state_ & StateDone) == 0) { - System.Threading.Monitor.Wait(this); + Monitor.Wait(this); } } } @@ -142,9 +139,9 @@ namespace IceInternal { lock(this) { - while((state_ & StateSent) == 0 && _exception == null) + while((state_ & StateSent) == 0 && exception_ == null) { - System.Threading.Monitor.Wait(this); + Monitor.Wait(this); } } } @@ -153,22 +150,22 @@ namespace IceInternal { lock(this) { - if(_exception != null) + if(exception_ != null) { - throw _exception; + throw exception_; } } } public bool sentSynchronously() { - return sentSynchronously_; // No lock needed + Debug.Assert(outgoing_ != null); + return outgoing_.sentSynchronously(); // No lock needed } // // Implementation of System.IAsyncResult properties // - public bool IsCompleted { get @@ -181,11 +178,12 @@ namespace IceInternal { get { + Debug.Assert(outgoing_ != null); if(getProxy() != null && getProxy().ice_isTwoway()) { return false; } - return sentSynchronously_; + return outgoing_.sentSynchronously(); } } @@ -203,19 +201,27 @@ namespace IceInternal { lock(this) { - if(_waitHandle == null) + if(waitHandle_ == null) { - _waitHandle = new EventWaitHandle(false, EventResetMode.ManualReset); + waitHandle_ = new EventWaitHandle(false, EventResetMode.ManualReset); } if((state_ & StateDone) != 0) { - _waitHandle.Set(); + waitHandle_.Set(); } - return _waitHandle; + return waitHandle_; } } } + public OutgoingAsyncBase OutgoingAsync + { + get + { + return outgoing_; + } + } + public Ice.AsyncResult whenSent(Ice.AsyncCallback cb) { lock(this) @@ -224,22 +230,22 @@ namespace IceInternal { throw new System.ArgumentException("callback is null"); } - if(_sentCallback != null) + if(sentCallback_ != null) { throw new System.ArgumentException("sent callback already set"); } - _sentCallback = cb; + sentCallback_ = cb; if((state_ & StateSent) == 0) { return this; } } - if(sentSynchronously_) + if(outgoing_.sentSynchronously()) { try { - _sentCallback(this); + sentCallback_(this); } catch(System.Exception ex) { @@ -252,7 +258,7 @@ namespace IceInternal { try { - _sentCallback(this); + sentCallback_(this); } catch(System.Exception ex) { @@ -271,21 +277,21 @@ namespace IceInternal { throw new System.ArgumentException("callback is null"); } - if(_sentCallback != null) + if(sentCallback_ != null) { throw new System.ArgumentException("sent callback already set"); } - _sentCallback = (Ice.AsyncResult result) => - { - cb(result.sentSynchronously()); - }; + sentCallback_ = (Ice.AsyncResult r) => + { + cb(r.sentSynchronously()); + }; if((state_ & StateSent) == 0) { return this; } } - if(sentSynchronously_) + if(outgoing_.sentSynchronously()) { try { @@ -313,12 +319,6 @@ namespace IceInternal return this; } - public Ice.AsyncResult whenCompletedWithAsyncCallback(Ice.AsyncCallback cb) - { - setCompletedCallback(cb); - return this; - } - public Ice.AsyncResult whenCompleted(Ice.ExceptionCallback cb) { if(cb == null) @@ -342,103 +342,6 @@ namespace IceInternal return _operation; } - public void invokeSent(Ice.AsyncCallback cb) - { - Debug.Assert(cb != null); - try - { - cb(this); - } - catch(System.Exception ex) - { - warning(ex); - } - - if(observer_ != null) - { - Ice.ObjectPrx proxy = getProxy(); - if(proxy == null || !proxy.ice_isTwoway()) - { - observer_.detach(); - observer_ = null; - } - } - } - - public void invokeSentAsync(Ice.AsyncCallback cb) - { - // - // This is called when it's not safe to call the exception callback synchronously - // from this thread. Instead the exception callback is called asynchronously from - // the client thread pool. - // - Debug.Assert(cb != null); - try - { - instance_.clientThreadPool().dispatch(() => - { - invokeSent(cb); - }, cachedConnection_); - } - catch(Ice.CommunicatorDestroyedException) - { - } - } - - public void invokeCompleted(Ice.AsyncCallback cb) - { - Debug.Assert(cb != null); - try - { - cb(this); - } - catch(System.Exception ex) - { - warning(ex); - } - - if(observer_ != null) - { - observer_.detach(); - observer_ = null; - } - } - - public void invokeCompletedAsync(Ice.AsyncCallback cb) - { - // - // This is called when it's not safe to call the exception callback synchronously - // from this thread. Instead the exception callback is called asynchronously from - // the client thread pool. - // - Debug.Assert(cb != null); - - // CommunicatorDestroyedException is the only exception that can propagate directly. - instance_.clientThreadPool().dispatch(() => - { - invokeCompleted(cb); - }, cachedConnection_); - } - - public virtual void cancelable(CancellationHandler handler) - { - lock(this) - { - if(_cancellationException != null) - { - try - { - throw _cancellationException; - } - finally - { - _cancellationException = null; - } - } - _cancellationHandler = handler; - } - } - public bool wait() { lock(this) @@ -450,121 +353,30 @@ namespace IceInternal state_ |= StateEndCalled; while((state_ & StateDone) == 0) { - System.Threading.Monitor.Wait(this); + Monitor.Wait(this); } - if(_exception != null) + if(exception_ != null) { - throw _exception; + throw exception_; } return (state_ & StateOK) != 0; } } - public virtual void cacheMessageBuffers() - { - } - - protected AsyncResultI(Ice.Communicator communicator, Instance instance, string op, object cookie) + protected AsyncResultI(Ice.Communicator communicator, + Instance instance, + string op, + object cookie, + Ice.AsyncCallback cb) { instance_ = instance; - sentSynchronously_ = false; state_ = 0; _communicator = communicator; _operation = op; - _exception = null; + exception_ = null; _cookie = cookie; - } - - protected Ice.AsyncCallback sent(bool done) - { - lock(this) - { - Debug.Assert(_exception == null); - - bool alreadySent = (state_ & StateSent) != 0; - state_ |= StateSent; - if(done) - { - state_ |= StateDone | StateOK; - _cancellationHandler = null; - if(observer_ != null && _sentCallback == null) - { - observer_.detach(); - observer_ = null; - } - - // - // For oneway requests after the data has been sent - // the buffers can be reused unless this is a - // collocated invocation. For collocated invocations - // the buffer won't be reused because it has already - // been marked as cached in invokeCollocated. - // - cacheMessageBuffers(); - } - if(_waitHandle != null) - { - _waitHandle.Set(); - } - System.Threading.Monitor.PulseAll(this); - return !alreadySent ? _sentCallback : null; - } - } - - protected Ice.AsyncCallback finished(bool ok) - { - lock(this) - { - state_ |= StateDone; - if(ok) - { - state_ |= StateOK; - } - _cancellationHandler = null; - if(_completedCallback == null) - { - if(observer_ != null) - { - observer_.detach(); - observer_ = null; - } - } - if(_waitHandle != null) - { - _waitHandle.Set(); - } - System.Threading.Monitor.PulseAll(this); - return _completedCallback; - } - } - - protected Ice.AsyncCallback finished(Ice.Exception ex) - { - lock(this) - { - state_ |= StateDone; - _exception = ex; - _cancellationHandler = null; - if(observer_ != null) - { - observer_.failed(ex.ice_id()); - } - if(_completedCallback == null) - { - if(observer_ != null) - { - observer_.detach(); - observer_ = null; - } - } - if(_waitHandle != null) - { - _waitHandle.Set(); - } - System.Threading.Monitor.PulseAll(this); - return _completedCallback; - } + completedCallback_ = cb; } protected void setCompletedCallback(Ice.AsyncCallback cb) @@ -575,16 +387,16 @@ namespace IceInternal { throw new System.ArgumentException("callback is null"); } - if(_completedCallback != null) + if(completedCallback_ != null) { throw new System.ArgumentException("callback already set"); } - _completedCallback = cb; + completedCallback_ = cb; if((state_ & StateDone) == 0) { return; } - else if((getProxy() == null || !getProxy().ice_isTwoway()) && _exception == null) + else if((getProxy() == null || !getProxy().ice_isTwoway()) && exception_ == null) { return; } @@ -594,7 +406,14 @@ namespace IceInternal { try { - cb(this); + try + { + cb(this); + } + catch(System.AggregateException ex) + { + throw ex.InnerException; + } } catch(System.Exception ex) { @@ -603,44 +422,21 @@ namespace IceInternal }, cachedConnection_); } - protected virtual Ice.AsyncCallback getCompletedCallback() - { - return (Ice.AsyncResult result) => - { - Debug.Assert(exceptionCallback_ != null); - try - { - ((AsyncResultI)result).wait(); - } - catch(Ice.Exception ex) - { - exceptionCallback_(ex); - return; - } - }; - } + abstract protected Ice.AsyncCallback getCompletedCallback(); - protected void cancel(Ice.LocalException ex) + public static AsyncResultI check(Ice.AsyncResult r, Ice.ObjectPrx prx, string operation) { - CancellationHandler handler; - lock(this) + if(r != null && r.getProxy() != prx) { - _cancellationException = ex; - if(_cancellationHandler == null) - { - return; - } - handler = _cancellationHandler; + throw new System.ArgumentException("Proxy for call to end_" + operation + + " does not match proxy that was used to call corresponding begin_" + + operation + " method"); } - handler.asyncRequestCanceled((OutgoingAsyncBase)this, ex); + return check(r, operation); } - protected virtual Ice.Instrumentation.InvocationObserver getObserver() - { - return observer_; - } - protected static T check<T>(Ice.AsyncResult r, string operation) + public static AsyncResultI check(Ice.AsyncResult r, string operation) { if(r == null) { @@ -651,11 +447,11 @@ namespace IceInternal throw new System.ArgumentException("Incorrect operation for end_" + operation + " method: " + r.getOperation()); } - if(!(r is T)) + if(!(r is AsyncResultI)) { throw new System.ArgumentException("Incorrect AsyncResult object for end_" + operation + " method"); } - return (T)r; + return (AsyncResultI)r; } protected void warning(System.Exception ex) @@ -666,29 +462,25 @@ namespace IceInternal } } - protected IceInternal.Instance instance_; + protected Instance instance_; protected Ice.Instrumentation.InvocationObserver observer_; protected Ice.Connection cachedConnection_; - protected bool sentSynchronously_; private readonly Ice.Communicator _communicator; private readonly string _operation; private readonly object _cookie; - private Ice.Exception _exception; - private EventWaitHandle _waitHandle; - - private CancellationHandler _cancellationHandler; - private Ice.LocalException _cancellationException; + protected Ice.Exception exception_; + protected EventWaitHandle waitHandle_; - private Ice.AsyncCallback _completedCallback; - private Ice.AsyncCallback _sentCallback; + protected Ice.AsyncCallback completedCallback_; + protected Ice.AsyncCallback sentCallback_; protected Ice.ExceptionCallback exceptionCallback_; protected const int StateOK = 0x1; protected const int StateDone = 0x2; protected const int StateSent = 0x4; protected const int StateEndCalled = 0x8; - protected const int StateCachedBuffers = 0x10; protected int state_; + protected OutgoingAsyncBase outgoing_; } } |