diff options
Diffstat (limited to 'csharp/src/Ice/OutgoingAsync.cs')
-rw-r--r-- | csharp/src/Ice/OutgoingAsync.cs | 1576 |
1 files changed, 973 insertions, 603 deletions
diff --git a/csharp/src/Ice/OutgoingAsync.cs b/csharp/src/Ice/OutgoingAsync.cs index 0f376ede850..d9160e9239a 100644 --- a/csharp/src/Ice/OutgoingAsync.cs +++ b/csharp/src/Ice/OutgoingAsync.cs @@ -7,29 +7,180 @@ // // ********************************************************************** +using System.Collections.Generic; +using System.Diagnostics; +using System.Threading; +using System.Threading.Tasks; + namespace IceInternal { - using System; - using System.Collections.Generic; - using System.Diagnostics; - using System.Threading; + public interface OutgoingAsyncCompletionCallback + { + void init(OutgoingAsyncBase og); + + bool handleSent(bool done, bool alreadySent); + bool handleException(Ice.Exception ex); + bool handleResponse(bool ok, OutgoingAsyncBase og); - public class OutgoingAsyncBase : AsyncResultI + void handleInvokeSent(bool sentSynchronously, OutgoingAsyncBase og); + void handleInvokeException(Ice.Exception ex, OutgoingAsyncBase og); + void handleInvokeResponse(bool ok, OutgoingAsyncBase og); + } + + public abstract class OutgoingAsyncBase { - public virtual Ice.AsyncCallback sent() + public virtual bool sent() + { + return sentImpl(true); + } + + public virtual bool exception(Ice.Exception ex) + { + return exceptionImpl(ex); + } + + public virtual bool response() { - return sent(true); + Debug.Assert(false); // Must be overriden by request that can handle responses + return false; } - public virtual Ice.AsyncCallback completed(Ice.Exception ex) + public void invokeSentAsync() { - return finished(ex); + // + // This is called when it's not safe to call the sent callback + // synchronously from this thread. Instead the exception callback + // is called asynchronously from the client thread pool. + // + try + { + instance_.clientThreadPool().dispatch(this.invokeSent, cachedConnection_); + } + catch(Ice.CommunicatorDestroyedException) + { + } + } + + public void invokeExceptionAsync() + { + // + // CommunicatorDestroyedCompleted is the only exception that can propagate directly + // from this method. + // + instance_.clientThreadPool().dispatch(this.invokeException, cachedConnection_); + } + + public void invokeResponseAsync() + { + // + // CommunicatorDestroyedCompleted is the only exception that can propagate directly + // from this method. + // + instance_.clientThreadPool().dispatch(this.invokeResponse, cachedConnection_); + } + + public void invokeSent() + { + try + { + _completionCallback.handleInvokeSent(sentSynchronously_, this); + } + catch(System.Exception ex) + { + warning(ex); + } + + if(observer_ != null && _doneInSent) + { + observer_.detach(); + observer_ = null; + } + } + public void invokeException() + { + try + { + try + { + throw _ex; + } + catch(Ice.Exception ex) + { + _completionCallback.handleInvokeException(ex, this); + } + } + catch(System.Exception ex) + { + warning(ex); + } + + if(observer_ != null) + { + observer_.detach(); + observer_ = null; + } } - public virtual Ice.AsyncCallback completed() + public void invokeResponse() { - Debug.Assert(false); // Must be implemented by classes that handle responses - return null; + if(_ex != null) + { + invokeException(); + return; + } + + try + { + try + { + _completionCallback.handleInvokeResponse((state_ & StateOK) != 0, this); + } + catch(Ice.Exception ex) + { + if(_completionCallback.handleException(ex)) + { + _completionCallback.handleInvokeException(ex, this); + } + } + catch(System.AggregateException ex) + { + throw ex.InnerException; + } + } + catch(System.Exception ex) + { + warning(ex); + } + + if(observer_ != null) + { + observer_.detach(); + observer_ = null; + } + } + + public virtual void cancelable(IceInternal.CancellationHandler handler) + { + lock(this) + { + if(_cancellationException != null) + { + try + { + throw _cancellationException; + } + catch(Ice.LocalException) + { + _cancellationException = null; + throw; + } + } + _cancellationHandler = handler; + } + } + public void cancel() + { + cancel(new Ice.InvocationCanceledException()); } public void attachRemoteObserver(Ice.ConnectionInfo info, Ice.Endpoint endpt, int requestId) @@ -63,50 +214,192 @@ namespace IceInternal return os_; } - public virtual Ice.InputStream getIs() + public Ice.InputStream getIs() { - return null; // Must be implemented by classes that handle responses + return is_; } - protected OutgoingAsyncBase(Ice.Communicator com, Instance instance, string op, object cookie) : - base(com, instance, op, cookie) + public virtual void cacheMessageBuffers() { - os_ = new Ice.OutputStream(instance, Ice.Util.currentProtocolEncoding); } - protected OutgoingAsyncBase(Ice.Communicator com, Instance instance, string op, object cookie, - Ice.OutputStream os) : - base(com, instance, op, cookie) + public virtual void throwUserException() { - os_ = os; } - protected new Ice.AsyncCallback sent(bool done) + protected OutgoingAsyncBase(Instance instance, OutgoingAsyncCompletionCallback completionCallback, + Ice.OutputStream os = null, Ice.InputStream iss = null) { - if(done) + instance_ = instance; + sentSynchronously_ = false; + _doneInSent = false; + state_ = 0; + os_ = os ?? new Ice.OutputStream(instance, Ice.Util.currentProtocolEncoding); + is_ = iss ?? new Ice.InputStream(instance, Ice.Util.currentProtocolEncoding); + _completionCallback = completionCallback; + if(_completionCallback != null) + { + _completionCallback.init(this); + } + } + + protected virtual bool sentImpl(bool done) + { + lock(this) { + bool alreadySent = (state_ & StateSent) > 0; + state_ |= StateSent; + if(done) + { + _doneInSent = true; + if(childObserver_ != null) + { + childObserver_.detach(); + childObserver_ = null; + } + _cancellationHandler = null; + + // + // For oneway requests after the data has been sent + // the buffers can be reused unless this is a + // collocated invocation. For collocated invocations + // the buffer won't be reused because it has already + // been marked as cached in invokeCollocated. + // + cacheMessageBuffers(); + } + + bool invoke = _completionCallback.handleSent(done, alreadySent); + if(!invoke && _doneInSent && observer_ != null) + { + observer_.detach(); + observer_ = null; + } + return invoke; + } + } + + protected virtual bool exceptionImpl(Ice.Exception ex) + { + lock(this) + { + _ex = ex; if(childObserver_ != null) { + childObserver_.failed(ex.ice_id()); childObserver_.detach(); childObserver_ = null; } + _cancellationHandler = null; + + if(observer_ != null) + { + observer_.failed(ex.ice_id()); + } + bool invoke = _completionCallback.handleException(ex); + if(!invoke && observer_ != null) + { + observer_.detach(); + observer_ = null; + } + return invoke; } - return base.sent(done); } + protected virtual bool responseImpl(bool ok) + { + lock(this) + { + if(ok) + { + state_ |= StateOK; + } + + _cancellationHandler = null; - protected new Ice.AsyncCallback finished(Ice.Exception ex) + bool invoke; + try + { + invoke = _completionCallback.handleResponse(ok, this); + } + catch(Ice.Exception ex) + { + _ex = ex; + invoke = _completionCallback.handleException(ex); + } + if(!invoke && observer_ != null) + { + observer_.detach(); + observer_ = null; + } + return invoke; + } + } + + protected void cancel(Ice.LocalException ex) { - if(childObserver_ != null) + CancellationHandler handler; { - childObserver_.failed(ex.ice_id()); - childObserver_.detach(); - childObserver_ = null; + lock(this) + { + _cancellationException = ex; + if(_cancellationHandler == null) + { + return; + } + handler = _cancellationHandler; + } } - return base.finished(ex); + handler.asyncRequestCanceled(this, ex); } - protected Ice.OutputStream os_; + void warning(System.Exception ex) + { + if(instance_.initializationData().properties.getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0) + { + instance_.initializationData().logger.warning("exception raised by AMI callback:\n" + ex); + } + } + + // + // This virtual method is necessary for the communicator flush + // batch requests implementation. + // + virtual protected Ice.Instrumentation.InvocationObserver getObserver() + { + return observer_; + } + + public bool sentSynchronously() + { + return sentSynchronously_; + } + + protected Instance instance_; + protected Ice.Connection cachedConnection_; + protected bool sentSynchronously_; + protected int state_; + + protected Ice.Instrumentation.InvocationObserver observer_; protected Ice.Instrumentation.ChildInvocationObserver childObserver_; + + protected Ice.OutputStream os_; + protected Ice.InputStream is_; + + private bool _doneInSent; + private Ice.Exception _ex; + private Ice.LocalException _cancellationException; + private CancellationHandler _cancellationHandler; + private OutgoingAsyncCompletionCallback _completionCallback; + + protected const int StateOK = 0x1; + protected const int StateDone = 0x2; + protected const int StateSent = 0x4; + protected const int StateEndCalled = 0x8; + protected const int StateCachedBuffers = 0x10; + + public const int AsyncStatusQueued = 0; + public const int AsyncStatusSent = 1; + public const int AsyncStatusInvokeSentCallback = 2; } // @@ -117,21 +410,10 @@ namespace IceInternal // public abstract class ProxyOutgoingAsyncBase : OutgoingAsyncBase, TimerTask { - public static ProxyOutgoingAsyncBase check(Ice.AsyncResult r, Ice.ObjectPrx prx, string operation) - { - return ProxyOutgoingAsyncBase.check<ProxyOutgoingAsyncBase>(r, prx, operation); - } - - public abstract bool invokeRemote(Ice.ConnectionI con, bool compress, bool resp, out Ice.AsyncCallback cb); - - public abstract bool invokeCollocated(CollocatedRequestHandler handler, out Ice.AsyncCallback cb); + public abstract int invokeRemote(Ice.ConnectionI connection, bool compress, bool response); + public abstract int invokeCollocated(CollocatedRequestHandler handler); - public override Ice.ObjectPrx getProxy() - { - return proxy_; - } - - public override Ice.AsyncCallback completed(Ice.Exception exc) + public override bool exception(Ice.Exception exc) { if(childObserver_ != null) { @@ -157,13 +439,26 @@ namespace IceInternal // the retry interval is 0. This method can be called with the // connection locked so we can't just retry here. // - instance_.retryQueue().add(this, handleException(exc)); - return null; + instance_.retryQueue().add(this, proxy_.handleException__(exc, handler_, mode_, _sent, ref _cnt)); + return false; } catch(Ice.Exception ex) { - return finished(ex); // No retries, we're done + return exceptionImpl(ex); // No retries, we're done + } + } + + public override void cancelable(CancellationHandler handler) + { + if(proxy_.reference__().getInvocationTimeout() == -2 && cachedConnection_ != null) + { + int timeout = cachedConnection_.timeout(); + if(timeout > 0) + { + instance_.timer().schedule(this, timeout); + } } + base.cancelable(handler); } public void retryException(Ice.Exception ex) @@ -181,44 +476,28 @@ namespace IceInternal } catch(Ice.Exception exc) { - Ice.AsyncCallback cb = completed(exc); - if(cb != null) + if(exception(exc)) { - invokeCompletedAsync(cb); + invokeExceptionAsync(); } } } - public override void cancelable(CancellationHandler handler) - { - if(proxy_.reference__().getInvocationTimeout() == -2 && cachedConnection_ != null) - { - int timeout = cachedConnection_.timeout(); - if(timeout > 0) - { - instance_.timer().schedule(this, timeout); - } - } - base.cancelable(handler); - } - public void retry() { invokeImpl(false); } - - public virtual void abort(Ice.Exception ex) + public void abort(Ice.Exception ex) { Debug.Assert(childObserver_ == null); - Ice.AsyncCallback cb = finished(ex); - if(cb != null) + if(exceptionImpl(ex)) { - invokeCompletedAsync(cb); + invokeExceptionAsync(); } else if(ex is Ice.CommunicatorDestroyedException) { // - // If it's a communicator destroyed exception, don't swallow + // If it's a communicator destroyed exception, swallow // it but instead notify the user thread. Even if no callback // was provided. // @@ -226,29 +505,11 @@ namespace IceInternal } } - public void runTimerTask() - { - if(proxy_.reference__().getInvocationTimeout() == -2) - { - cancel(new Ice.ConnectionTimeoutException()); - } - else - { - cancel(new Ice.InvocationTimeoutException()); - } - } - - protected ProxyOutgoingAsyncBase(Ice.ObjectPrxHelperBase prx, string op, object cookie) : - base(prx.ice_getCommunicator(), prx.reference__().getInstance(), op, cookie) - { - proxy_ = prx; - mode_ = Ice.OperationMode.Normal; - _cnt = 0; - _sent = false; - } - - protected ProxyOutgoingAsyncBase(Ice.ObjectPrxHelperBase prx, string op, object cookie, Ice.OutputStream os) : - base(prx.ice_getCommunicator(), prx.reference__().getInstance(), op, cookie, os) + protected ProxyOutgoingAsyncBase(Ice.ObjectPrxHelperBase prx, + OutgoingAsyncCompletionCallback completionCallback, + Ice.OutputStream os = null, + Ice.InputStream iss = null) : + base(prx.reference__().getInstance(), completionCallback, os, iss) { proxy_ = prx; mode_ = Ice.OperationMode.Normal; @@ -256,17 +517,6 @@ namespace IceInternal _sent = false; } - protected static T check<T>(Ice.AsyncResult r, Ice.ObjectPrx prx, string operation) - { - if(r != null && r.getProxy() != prx) - { - throw new System.ArgumentException("Proxy for call to end_" + operation + - " does not match proxy that was used to call corresponding begin_" + - operation + " method"); - } - return check<T>(r, operation); - } - protected void invokeImpl(bool userThread) { try @@ -279,12 +529,9 @@ namespace IceInternal instance_.timer().schedule(this, invocationTimeout); } } - else // If not called from the user thread, it's called from the retry queue + else if(observer_ != null) { - if(observer_ != null) - { - observer_.retried(); - } + observer_.retried(); } while(true) @@ -293,22 +540,22 @@ namespace IceInternal { _sent = false; handler_ = proxy_.getRequestHandler__(); - Ice.AsyncCallback sentCallback; - if(handler_.sendAsyncRequest(this, out sentCallback)) + int status = handler_.sendAsyncRequest(this); + if((status & AsyncStatusSent) != 0) { if(userThread) { sentSynchronously_ = true; - if(sentCallback != null) + if((status & AsyncStatusInvokeSentCallback) != 0) { - invokeSent(sentCallback); // Call from the user thread. + invokeSent(); // Call the sent callback from the user thread. } } else { - if(sentCallback != null) + if((status & AsyncStatusInvokeSentCallback) != 0) { - invokeSentAsync(sentCallback); // Call from a client thread pool thread. + invokeSentAsync(); // Call the sent callback from a client thread pool thread. } } } @@ -326,7 +573,7 @@ namespace IceInternal childObserver_.detach(); childObserver_ = null; } - int interval = handleException(ex); + int interval = proxy_.handleException__(ex, handler_, mode_, _sent, ref _cnt); if(interval > 0) { instance_.retryQueue().add(this, interval); @@ -349,15 +596,13 @@ namespace IceInternal { throw; } - Ice.AsyncCallback cb = finished(ex); // No retries, we're done - if(cb != null) + else if(exceptionImpl(ex)) // No retries, we're done { - invokeCompletedAsync(cb); + invokeExceptionAsync(); } } } - - protected new Ice.AsyncCallback sent(bool done) + protected override bool sentImpl(bool done) { _sent = true; if(done) @@ -367,33 +612,39 @@ namespace IceInternal instance_.timer().cancel(this); } } - return base.sent(done); + return base.sentImpl(done); } - - protected new Ice.AsyncCallback finished(Ice.Exception ex) + protected override bool exceptionImpl(Ice.Exception ex) { if(proxy_.reference__().getInvocationTimeout() != -1) { instance_.timer().cancel(this); } - return base.finished(ex); + return base.exceptionImpl(ex); } - protected new Ice.AsyncCallback finished(bool ok) + protected override bool responseImpl(bool ok) { if(proxy_.reference__().getInvocationTimeout() != -1) { instance_.timer().cancel(this); } - return base.finished(ok); + return base.responseImpl(ok); } - protected virtual int handleException(Ice.Exception exc) + public void runTimerTask() { - return proxy_.handleException__(exc, handler_, mode_, _sent, ref _cnt); + if(proxy_.reference__().getInvocationTimeout() == -2) + { + cancel(new Ice.ConnectionTimeoutException()); + } + else + { + cancel(new Ice.InvocationTimeoutException()); + } } - protected Ice.ObjectPrxHelperBase proxy_; + protected readonly Ice.ObjectPrxHelperBase proxy_; protected RequestHandler handler_; protected Ice.OperationMode mode_; @@ -401,41 +652,28 @@ namespace IceInternal private bool _sent; } + // + // Class for handling Slice operation invocations + // public class OutgoingAsync : ProxyOutgoingAsyncBase { - public new static OutgoingAsync check(Ice.AsyncResult r, Ice.ObjectPrx prx, string operation) + public OutgoingAsync(Ice.ObjectPrxHelperBase prx, OutgoingAsyncCompletionCallback completionCallback, + Ice.OutputStream os = null, Ice.InputStream iss = null) : + base(prx, completionCallback, os, iss) { - return ProxyOutgoingAsyncBase.check<OutgoingAsync>(r, prx, operation); + encoding_ = Protocol.getCompatibleEncoding(proxy_.reference__().getEncoding()); + synchronous_ = false; } - public OutgoingAsync(Ice.ObjectPrx prx, string operation, object cookie) : - base((Ice.ObjectPrxHelperBase)prx, operation, cookie) - { - _encoding = Protocol.getCompatibleEncoding(proxy_.reference__().getEncoding()); - _is = null; - } - - public OutgoingAsync(Ice.ObjectPrx prx, string operation, object cookie, Ice.InputStream istr, - Ice.OutputStream ostr) : - base((Ice.ObjectPrxHelperBase)prx, operation, cookie, ostr) - { - _encoding = Protocol.getCompatibleEncoding(proxy_.reference__().getEncoding()); - _is = istr; - } - - public void prepare(string operation, Ice.OperationMode mode, Dictionary<string, string> ctx, - bool explicitCtx, bool synchronous) + public void prepare(string operation, Ice.OperationMode mode, Dictionary<string, string> context, + bool synchronous) { Protocol.checkSupportedProtocol(Protocol.getCompatibleProtocol(proxy_.reference__().getProtocol())); mode_ = mode; - _synchronous = synchronous; + synchronous_ = synchronous; - if(explicitCtx && ctx == null) - { - ctx = _emptyContext; - } - observer_ = ObserverHelper.get(proxy_, operation, ctx); + observer_ = ObserverHelper.get(proxy_, operation, context); switch(proxy_.reference__().getMode()) { @@ -477,12 +715,12 @@ namespace IceInternal os_.writeByte((byte)mode); - if(ctx != null) + if(context != null) { // // Explicit context // - Ice.ContextHelper.write(os_, ctx); + Ice.ContextHelper.write(os_, context); } else { @@ -502,74 +740,23 @@ namespace IceInternal } } } - - public override Ice.AsyncCallback sent() + public override bool sent() { - return sent(!proxy_.ice_isTwoway()); // done = true if not a two-way proxy (no response expected) - } - - public override bool invokeRemote(Ice.ConnectionI con, bool compress, bool resp, out Ice.AsyncCallback sentCB) - { - cachedConnection_ = con; - return con.sendAsyncRequest(this, compress, resp, 0, out sentCB); - } - - public override bool invokeCollocated(CollocatedRequestHandler handler, out Ice.AsyncCallback sentCB) - { - // The stream cannot be cached if the proxy is not a twoway or there is an invocation timeout set. - if(!proxy_.ice_isTwoway() || proxy_.reference__().getInvocationTimeout() != -1) - { - // Disable caching by marking the streams as cached! - state_ |= StateCachedBuffers; - } - return handler.invokeAsyncRequest(this, 0, _synchronous, out sentCB); + return base.sentImpl(!proxy_.ice_isTwoway()); // done = true if it's not a two-way proxy } - public override void abort(Ice.Exception ex) + public override bool response() { - Reference.Mode mode = proxy_.reference__().getMode(); - if(mode == Reference.Mode.ModeBatchOneway || mode == Reference.Mode.ModeBatchDatagram) - { - proxy_.getBatchRequestQueue__().abortBatchRequest(os_); - } - - base.abort(ex); - } - - public void invoke() - { - Reference.Mode mode = proxy_.reference__().getMode(); - if(mode == Reference.Mode.ModeBatchOneway || mode == Reference.Mode.ModeBatchDatagram) - { - sentSynchronously_ = true; - proxy_.getBatchRequestQueue__().finishBatchRequest(os_, proxy_, getOperation()); - finished(true); - return; // Don't call sent/completed callback for batch AMI requests - } - - // - // NOTE: invokeImpl doesn't throw so this can be called from the - // try block with the catch block calling abort() in case of an - // exception. - // - invokeImpl(true); // userThread = true - } - - override public Ice.AsyncCallback completed() - { - Debug.Assert(_is != null); // _is has been initialized prior to this call - // // NOTE: this method is called from ConnectionI.parseMessage // with the connection locked. Therefore, it must not invoke // any user callbacks. // - Debug.Assert(proxy_.ice_isTwoway()); // Can only be called for twoways. if(childObserver_ != null) { - childObserver_.reply(_is.size() - Protocol.headerSize - 4); + childObserver_.reply(is_.size() - Protocol.headerSize - 4); childObserver_.detach(); childObserver_ = null; } @@ -577,208 +764,229 @@ namespace IceInternal byte replyStatus; try { - replyStatus = _is.readByte(); + replyStatus = is_.readByte(); switch(replyStatus) { - case ReplyStatus.replyOK: - { - break; - } - - case ReplyStatus.replyUserException: - { - if(observer_ != null) + case ReplyStatus.replyOK: { - observer_.userException(); + break; } - break; - } - - case ReplyStatus.replyObjectNotExist: - case ReplyStatus.replyFacetNotExist: - case ReplyStatus.replyOperationNotExist: - { - Ice.Identity id = new Ice.Identity(); - id.read__(_is); - - // - // For compatibility with the old FacetPath. - // - string[] facetPath = _is.readStringSeq(); - string facet; - if(facetPath.Length > 0) + case ReplyStatus.replyUserException: { - if(facetPath.Length > 1) + if(observer_ != null) { - throw new Ice.MarshalException(); + observer_.userException(); } - facet = facetPath[0]; - } - else - { - facet = ""; - } - - string operation = _is.readString(); - - Ice.RequestFailedException ex = null; - switch(replyStatus) - { - case ReplyStatus.replyObjectNotExist: - { - ex = new Ice.ObjectNotExistException(); break; } + case ReplyStatus.replyObjectNotExist: case ReplyStatus.replyFacetNotExist: - { - ex = new Ice.FacetNotExistException(); - break; - } - case ReplyStatus.replyOperationNotExist: { - ex = new Ice.OperationNotExistException(); - break; - } + Ice.Identity ident = new Ice.Identity(); + ident.read__(is_); + + // + // For compatibility with the old FacetPath. + // + string[] facetPath = is_.readStringSeq(); + ; + string facet; + if(facetPath.Length > 0) + { + if(facetPath.Length > 1) + { + throw new Ice.MarshalException(); + } + facet = facetPath[0]; + } + else + { + facet = ""; + } - default: - { - Debug.Assert(false); - break; - } - } + string operation = is_.readString(); - ex.id = id; - ex.facet = facet; - ex.operation = operation; - throw ex; - } + Ice.RequestFailedException ex = null; + switch(replyStatus) + { + case ReplyStatus.replyObjectNotExist: + { + ex = new Ice.ObjectNotExistException(); + break; + } - case ReplyStatus.replyUnknownException: - case ReplyStatus.replyUnknownLocalException: - case ReplyStatus.replyUnknownUserException: - { - string unknown = _is.readString(); + case ReplyStatus.replyFacetNotExist: + { + ex = new Ice.FacetNotExistException(); + break; + } - Ice.UnknownException ex = null; - switch(replyStatus) - { - case ReplyStatus.replyUnknownException: - { - ex = new Ice.UnknownException(); - break; - } + case ReplyStatus.replyOperationNotExist: + { + ex = new Ice.OperationNotExistException(); + break; + } - case ReplyStatus.replyUnknownLocalException: - { - ex = new Ice.UnknownLocalException(); - break; + default: + { + Debug.Assert(false); + break; + } + } + + ex.id = ident; + ex.facet = facet; + ex.operation = operation; + throw ex; } + case ReplyStatus.replyUnknownException: + case ReplyStatus.replyUnknownLocalException: case ReplyStatus.replyUnknownUserException: { - ex = new Ice.UnknownUserException(); - break; + string unknown = is_.readString(); + + Ice.UnknownException ex = null; + switch(replyStatus) + { + case ReplyStatus.replyUnknownException: + { + ex = new Ice.UnknownException(); + break; + } + + case ReplyStatus.replyUnknownLocalException: + { + ex = new Ice.UnknownLocalException(); + break; + } + + case ReplyStatus.replyUnknownUserException: + { + ex = new Ice.UnknownUserException(); + break; + } + + default: + { + Debug.Assert(false); + break; + } + } + + ex.unknown = unknown; + throw ex; } default: { - Debug.Assert(false); - break; - } + throw new Ice.UnknownReplyStatusException(); } - - ex.unknown = unknown; - throw ex; } - default: - { - throw new Ice.UnknownReplyStatusException(); - } - } - - return finished(replyStatus == ReplyStatus.replyOK); + return responseImpl(replyStatus == ReplyStatus.replyOK); } catch(Ice.Exception ex) { - return completed(ex); + return exception(ex); } } - public Ice.OutputStream startWriteParams(Ice.FormatType format) - { - os_.startEncapsulation(_encoding, format); - return os_; - } - - public void endWriteParams() + public override int invokeRemote(Ice.ConnectionI connection, bool compress, bool response) { - os_.endEncapsulation(); + cachedConnection_ = connection; + return connection.sendAsyncRequest(this, compress, response, 0); } - public void writeEmptyParams() + public override int invokeCollocated(CollocatedRequestHandler handler) { - os_.writeEmptyEncapsulation(_encoding); - } - - public void writeParamEncaps(byte[] encaps) - { - if(encaps == null || encaps.Length == 0) - { - os_.writeEmptyEncapsulation(_encoding); - } - else + // The stream cannot be cached if the proxy is not a twoway or there is an invocation timeout set. + if(!proxy_.ice_isTwoway() || proxy_.reference__().getInvocationTimeout() != -1) { - os_.writeEncapsulation(encaps); + // Disable caching by marking the streams as cached! + state_ |= StateCachedBuffers; } + return handler.invokeAsyncRequest(this, 0, synchronous_); } - public Ice.InputStream startReadParams() + public new void abort(Ice.Exception ex) { - _is.startEncapsulation(); - return _is; - } + Reference.Mode mode = proxy_.reference__().getMode(); + if(mode == Reference.Mode.ModeBatchOneway || mode == Reference.Mode.ModeBatchDatagram) + { + // + // If we didn't finish a batch oneway or datagram request, we + // must notify the connection about that we give up ownership + // of the batch stream. + // + proxy_.getBatchRequestQueue__().abortBatchRequest(os_); + } - public void endReadParams() - { - _is.endEncapsulation(); + base.abort(ex); } - public void readEmptyParams() + public void invoke(string operation) { - _is.skipEmptyEncapsulation(); - } + Reference.Mode mode = proxy_.reference__().getMode(); + if(mode == Reference.Mode.ModeBatchOneway || mode == Reference.Mode.ModeBatchDatagram) + { + sentSynchronously_ = true; + proxy_.getBatchRequestQueue__().finishBatchRequest(os_, proxy_, operation); + responseImpl(true); + return; // Don't call sent/completed callback for batch AMI requests + } - public byte[] readParamEncaps() - { - Ice.EncodingVersion encoding; - return _is.readEncapsulation(out encoding); + // + // NOTE: invokeImpl doesn't throw so this can be called from the + // try block with the catch block calling abort() in case of an + // exception. + // + invokeImpl(true); // userThread = true } - override public Ice.InputStream getIs() + public void invoke(string operation, + Ice.OperationMode mode, + Ice.FormatType format, + Dictionary<string, string> context, + bool synchronous, + System.Action<Ice.OutputStream> write) { - // _is can already be initialized if the invocation is retried - if(_is == null) + try + { + prepare(operation, mode, context, synchronous); + if(write != null) + { + os_.startEncapsulation(encoding_, format); + write(os_); + os_.endEncapsulation(); + } + else + { + os_.writeEmptyEncapsulation(encoding_); + } + invoke(operation); + } + catch(Ice.Exception ex) { - _is = new Ice.InputStream(instance_, Ice.Util.currentProtocolEncoding); + abort(ex); } - return _is; } - public void throwUserException() + public override void throwUserException() { try { - _is.startEncapsulation(); - _is.throwException(null); + is_.startEncapsulation(); + is_.throwException(); } - catch(Ice.UserException) + catch(Ice.UserException ex) { - _is.endEncapsulation(); - throw; + is_.endEncapsulation(); + userException_?.Invoke(ex); + throw new Ice.UnknownUserException(ex.ice_id()); } } @@ -795,140 +1003,274 @@ namespace IceInternal state_ |= StateCachedBuffers; } - if(_is != null) + if(is_ != null) { - _is.reset(); + is_.reset(); } os_.reset(); - proxy_.cacheMessageBuffers(_is, os_); + proxy_.cacheMessageBuffers(is_, os_); - _is = null; + is_ = null; os_ = null; } } - private Ice.EncodingVersion _encoding; - private Ice.InputStream _is; - - // - // If true this AMI request is being used for a generated synchronous invocation. - // - private bool _synchronous; - - private static Dictionary<string, string> _emptyContext = new Dictionary<string, string>(); + protected readonly Ice.EncodingVersion encoding_; + protected System.Action<Ice.UserException> userException_; + protected bool synchronous_; } - public class CommunicatorFlushBatch : IceInternal.AsyncResultI + public class OutgoingAsyncT<T> : OutgoingAsync { - public static CommunicatorFlushBatch check(Ice.AsyncResult r, Ice.Communicator com, string operation) + public OutgoingAsyncT(Ice.ObjectPrxHelperBase prx, + OutgoingAsyncCompletionCallback completionCallback, + Ice.OutputStream os = null, + Ice.InputStream iss = null) : + base(prx, completionCallback, os, iss) { - if(r != null && r.getCommunicator() != com) - { - throw new System.ArgumentException("Communicator for call to end_" + operation + - " does not match communicator that was used to call " + - "corresponding begin_" + operation + " method"); - } - return AsyncResultI.check<CommunicatorFlushBatch>(r, operation); } - public CommunicatorFlushBatch(Ice.Communicator communicator, Instance instance, string op, object cookie) : - base(communicator, instance, op, cookie) + public void invoke(string operation, + Ice.OperationMode mode, + Ice.FormatType format, + Dictionary<string, string> context, + bool synchronous, + System.Action<Ice.OutputStream> write = null, + System.Action<Ice.UserException> userException = null, + System.Func<Ice.InputStream, T> read = null) { + read_ = read; + userException_ = userException; + base.invoke(operation, mode, format, context, synchronous, write); + } - observer_ = ObserverHelper.get(instance, op); + public T result__(bool ok) + { + try + { + if(ok) + { + if(read_ == null) + { + if(is_ == null || is_.isEmpty()) + { + // + // If there's no response (oneway, batch-oneway proxies), we just set the promise + // on completion without reading anything from the input stream. This is required for + // batch invocations. + // + } + else + { + is_.skipEmptyEncapsulation(); + } + return default(T); + } + else + { + is_.startEncapsulation(); + T r = read_(is_); + is_.endEncapsulation(); + return r; + } + } + else + { + throwUserException(); + return default(T); // make compiler happy + } + } + finally + { + cacheMessageBuffers(); + } + } - // - // _useCount is initialized to 1 to prevent premature callbacks. - // The caller must invoke ready() after all flush requests have - // been initiated. - // - _useCount = 1; + protected System.Func<Ice.InputStream, T> read_; + } + + // + // Class for handling the proxy's begin_ice_flushBatchRequest request. + // + class ProxyFlushBatchAsync : ProxyOutgoingAsyncBase + { + public ProxyFlushBatchAsync(Ice.ObjectPrxHelperBase prx, OutgoingAsyncCompletionCallback completionCallback) : + base(prx, completionCallback) + { } - public void flushConnection(Ice.ConnectionI con) + public override int invokeRemote(Ice.ConnectionI connection, bool compress, bool response) { - lock(this) + if(_batchRequestNum == 0) { - ++_useCount; + if(sent()) + { + return AsyncStatusSent | AsyncStatusInvokeSentCallback; + } + else + { + return AsyncStatusSent; + } } + cachedConnection_ = connection; + return connection.sendAsyncRequest(this, compress, false, _batchRequestNum); + } - try + public override int invokeCollocated(CollocatedRequestHandler handler) + { + if(_batchRequestNum == 0) { - Ice.AsyncCallback sentCB = null; - FlushBatch flush = new FlushBatch(this); - int batchRequestNum = con.getBatchRequestQueue().swap(flush.getOs()); - if(batchRequestNum == 0) + if(sent()) { - flush.sent(); + return AsyncStatusSent | AsyncStatusInvokeSentCallback; } else { - con.sendAsyncRequest(flush, false, false, batchRequestNum, out sentCB); + return AsyncStatusSent; } - Debug.Assert(sentCB == null); } - catch(Ice.LocalException) + return handler.invokeAsyncRequest(this, _batchRequestNum, false); + } + + public void invoke(string operation) + { + Protocol.checkSupportedProtocol(Protocol.getCompatibleProtocol(proxy_.reference__().getProtocol())); + observer_ = ObserverHelper.get(proxy_, operation, null); + _batchRequestNum = proxy_.getBatchRequestQueue__().swap(os_); + invokeImpl(true); // userThread = true + } + + private int _batchRequestNum; + } + + // + // Class for handling the proxy's begin_ice_getConnection request. + // + class ProxyGetConnection : ProxyOutgoingAsyncBase + { + public ProxyGetConnection(Ice.ObjectPrxHelperBase prx, OutgoingAsyncCompletionCallback completionCallback) : + base(prx, completionCallback) + { + } + + public override int invokeRemote(Ice.ConnectionI connection, bool compress, bool response) + { + cachedConnection_ = connection; + if(responseImpl(true)) { - doCheck(false); - throw; + invokeResponseAsync(); } + return AsyncStatusSent; } - public void ready() + public override int invokeCollocated(CollocatedRequestHandler handler) { - doCheck(true); + if(responseImpl(true)) + { + invokeResponseAsync(); + } + return AsyncStatusSent; } - private void doCheck(bool userThread) + public void invoke(string operation) { - lock(this) + observer_ = ObserverHelper.get(proxy_, operation, null); + invokeImpl(true); // userThread = true + } + } + + class ConnectionFlushBatchAsync : OutgoingAsyncBase + { + public ConnectionFlushBatchAsync(Ice.ConnectionI connection, + Instance instance, + OutgoingAsyncCompletionCallback completionCallback) : + base(instance, completionCallback) + { + _connection = connection; + } + + public void invoke(string operation) + { + observer_ = ObserverHelper.get(instance_, operation); + try { - Debug.Assert(_useCount > 0); - if(--_useCount > 0) + int status; + int batchRequestNum = _connection.getBatchRequestQueue().swap(os_); + if(batchRequestNum == 0) { - return; + status = AsyncStatusSent; + if(sent()) + { + status = status | AsyncStatusInvokeSentCallback; + } + } + else + { + status = _connection.sendAsyncRequest(this, false, false, batchRequestNum); } - } - Ice.AsyncCallback sentCB = sent(true); - if(userThread) + if((status & AsyncStatusSent) != 0) + { + sentSynchronously_ = true; + if((status & AsyncStatusInvokeSentCallback) != 0) + { + invokeSent(); + } + } + } + catch(RetryException ex) { - sentSynchronously_ = true; - if(sentCB != null) + try + { + throw ex.get(); + } + catch(Ice.LocalException ee) { - invokeSent(sentCB); + if(exception(ee)) + { + invokeExceptionAsync(); + } } } - else + catch(Ice.Exception ex) { - if(sentCB != null) + if(exception(ex)) { - invokeSentAsync(sentCB); + invokeExceptionAsync(); } } } + private readonly Ice.ConnectionI _connection; + }; + + public class CommunicatorFlushBatchAsync : OutgoingAsyncBase + { class FlushBatch : OutgoingAsyncBase { - public FlushBatch(CommunicatorFlushBatch outAsync) : - base(outAsync.getCommunicator(), outAsync.instance_, outAsync.getOperation(), null) + public FlushBatch(CommunicatorFlushBatchAsync outAsync, + Instance instance, + Ice.Instrumentation.InvocationObserver observer) : base(instance, null) { _outAsync = outAsync; + _observer = observer; } - public override Ice.AsyncCallback sent() + public override bool + sent() { if(childObserver_ != null) { childObserver_.detach(); childObserver_ = null; } - _outAsync.doCheck(false); - return null; + _outAsync.check(false); + return false; } - public override Ice.AsyncCallback completed(Ice.Exception ex) + public override bool + exception(Ice.Exception ex) { if(childObserver_ != null) { @@ -936,233 +1278,289 @@ namespace IceInternal childObserver_.detach(); childObserver_ = null; } - _outAsync.doCheck(false); - return null; + _outAsync.check(false); + return false; } - protected override Ice.Instrumentation.InvocationObserver getObserver() + protected override Ice.Instrumentation.InvocationObserver + getObserver() { - return _outAsync.getObserver(); + return _observer; } - private CommunicatorFlushBatch _outAsync; - } - private int _useCount; - } - + private CommunicatorFlushBatchAsync _outAsync; + private Ice.Instrumentation.InvocationObserver _observer; + }; - public class ConnectionFlushBatch : OutgoingAsyncBase - { - public static ConnectionFlushBatch check(Ice.AsyncResult r, Ice.Connection con, string operation) + public CommunicatorFlushBatchAsync(Instance instance, OutgoingAsyncCompletionCallback callback) : + base(instance, callback) { - if(r != null && r.getConnection() != con) - { - throw new System.ArgumentException("Connection for call to end_" + operation + - " does not match connection that was used to call " + - "corresponding begin_" + operation + " method"); - } - return AsyncResultI.check<ConnectionFlushBatch>(r, operation); - } - - public ConnectionFlushBatch(Ice.ConnectionI con, Ice.Communicator communicator, Instance instance, string op, - object cookie) : - base(communicator, instance, op, cookie) - { - _connection = con; + // + // _useCount is initialized to 1 to prevent premature callbacks. + // The caller must invoke ready() after all flush requests have + // been initiated. + // + _useCount = 1; } - public override Ice.Connection getConnection() + public void flushConnection(Ice.ConnectionI con) { - return _connection; - } + lock(this) + { + ++_useCount; + } - public void invoke() - { try { - int batchRequestNum = _connection.getBatchRequestQueue().swap(os_); - - bool isSent = false; - Ice.AsyncCallback sentCB; + var flushBatch = new FlushBatch(this, instance_, _observer); + int batchRequestNum = con.getBatchRequestQueue().swap(flushBatch.getOs()); if(batchRequestNum == 0) { - isSent = true; - sentCB = sent(); + flushBatch.sent(); } else { - isSent = _connection.sendAsyncRequest(this, false, false, batchRequestNum, out sentCB); + con.sendAsyncRequest(flushBatch, false, false, batchRequestNum); } + } + catch(Ice.LocalException) + { + check(false); + throw; + } + } - if(isSent) - { - sentSynchronously_ = true; - if(sentCB != null) - { - invokeSent(sentCB); - } - } + public void invoke(string operation) + { + _observer = ObserverHelper.get(instance_, operation); + if(_observer != null) + { + _observer.attach(); } - catch(RetryException ex) + instance_.outgoingConnectionFactory().flushAsyncBatchRequests(this); + instance_.objectAdapterFactory().flushAsyncBatchRequests(this); + check(true); + } + + public void check(bool userThread) + { + lock(this) { - Ice.AsyncCallback cb = completed(ex.get()); - if(cb != null) + Debug.Assert(_useCount > 0); + if(--_useCount > 0) { - invokeCompletedAsync(cb); + return; } } - catch(Ice.Exception ex) + + if(sentImpl(true)) { - Ice.AsyncCallback cb = completed(ex); - if(cb != null) + if(userThread) { - invokeCompletedAsync(cb); + sentSynchronously_ = true; + invokeSent(); + } + else + { + invokeSentAsync(); } } } - private Ice.ConnectionI _connection; - } + private int _useCount; + private Ice.Instrumentation.InvocationObserver _observer; + }; - public class ProxyFlushBatch : ProxyOutgoingAsyncBase + public abstract class TaskCompletionCallback<T> : TaskCompletionSource<T>, OutgoingAsyncCompletionCallback { - public new static ProxyFlushBatch check(Ice.AsyncResult r, Ice.ObjectPrx prx, string operation) + public TaskCompletionCallback(System.IProgress<bool> progress, CancellationToken cancellationToken) { - return ProxyOutgoingAsyncBase.check<ProxyFlushBatch>(r, prx, operation); + _progress = progress; + _cancellationToken = cancellationToken; } - public ProxyFlushBatch(Ice.ObjectPrxHelperBase prx, string operation, object cookie) : - base(prx, operation, cookie) + public void init(OutgoingAsyncBase outgoing) { - observer_ = ObserverHelper.get(prx, operation); - _batchRequestNum = prx.getBatchRequestQueue__().swap(os_); + if(_cancellationToken.CanBeCanceled) + { + _cancellationToken.Register(outgoing.cancel); + } } - public override bool invokeRemote(Ice.ConnectionI con, bool compress, bool resp, out Ice.AsyncCallback sentCB) + public virtual bool handleSent(bool done, bool alreadySent) { - if(_batchRequestNum == 0) + if(done) { - sentCB = sent(); - return true; + SetResult(default(T)); } - cachedConnection_ = con; - return con.sendAsyncRequest(this, compress, false, _batchRequestNum, out sentCB); + return _progress != null && !alreadySent; // Invoke the sent callback only if not already invoked. } - public override bool invokeCollocated(CollocatedRequestHandler handler, out Ice.AsyncCallback sentCB) + public bool handleException(Ice.Exception ex) { - if(_batchRequestNum == 0) - { - sentCB = sent(); - return true; - } - return handler.invokeAsyncRequest(this, _batchRequestNum, false, out sentCB); + SetException(ex); + return false; } - public void invoke() + public abstract bool handleResponse(bool ok, OutgoingAsyncBase og); + + public void handleInvokeSent(bool sentSynchronously, OutgoingAsyncBase og) { - Protocol.checkSupportedProtocol(Protocol.getCompatibleProtocol(proxy_.reference__().getProtocol())); - invokeImpl(true); // userThread = true + _progress.Report(sentSynchronously); } - private int _batchRequestNum; + public void handleInvokeException(Ice.Exception ex, OutgoingAsyncBase og) + { + Debug.Assert(false); + } + + public void handleInvokeResponse(bool ok, OutgoingAsyncBase og) + { + Debug.Assert(false); + } + + private readonly CancellationToken _cancellationToken; + private readonly System.IProgress<bool> _progress; } - public class ProxyGetConnection : ProxyOutgoingAsyncBase, Ice.AsyncResult<Ice.Callback_Object_ice_getConnection> + public class OperationTaskCompletionCallback<T> : TaskCompletionCallback<T> { - public new static ProxyGetConnection check(Ice.AsyncResult r, Ice.ObjectPrx prx, string operation) + public OperationTaskCompletionCallback(System.IProgress<bool> progress, CancellationToken cancellationToken) : + base(progress, cancellationToken) { - return ProxyOutgoingAsyncBase.check<ProxyGetConnection>(r, prx, operation); } - public ProxyGetConnection(Ice.ObjectPrxHelperBase prx, string operation, - ProxyTwowayCallback<Ice.Callback_Object_ice_getConnection> cb, object cookie) : - base(prx, operation, cookie) + public override bool handleResponse(bool ok, OutgoingAsyncBase og) { - observer_ = ObserverHelper.get(prx, operation); - _completed = cb; + SetResult(((OutgoingAsyncT<T>)og).result__(ok)); + return false; } + } - public override bool invokeRemote(Ice.ConnectionI con, bool compress, bool resp, out Ice.AsyncCallback sentCB) + public class FlushBatchTaskCompletionCallback : TaskCompletionCallback<object> + { + public FlushBatchTaskCompletionCallback(System.IProgress<bool> progress, CancellationToken cancellationToken) : + base(progress, cancellationToken) { - sentCB = null; - cachedConnection_ = con; - Ice.AsyncCallback cb = finished(true); - if(cb != null) - { - invokeCompletedAsync(cb); - } - return true; } - public override bool invokeCollocated(CollocatedRequestHandler handler, out Ice.AsyncCallback sentCB) + public override bool handleResponse(bool ok, OutgoingAsyncBase og) { - sentCB = null; - Ice.AsyncCallback cb = finished(true); - if(cb != null) - { - invokeCompletedAsync(cb); - } - return true; + SetResult(null); + return false; } + } - public void invoke() + abstract public class AsyncResultCompletionCallback : AsyncResultI, OutgoingAsyncCompletionCallback + { + public AsyncResultCompletionCallback(Ice.Communicator com, Instance instance, string op, object cookie, + Ice.AsyncCallback cb) : + base(com, instance, op, cookie, cb) { - invokeImpl(true); // userThread = true } - new public Ice.AsyncResult<Ice.Callback_Object_ice_getConnection> whenCompleted(Ice.ExceptionCallback excb) + public void init(OutgoingAsyncBase outgoing) { - base.whenCompleted(excb); - return this; + outgoing_ = outgoing; } - virtual public Ice.AsyncResult<Ice.Callback_Object_ice_getConnection> - whenCompleted(Ice.Callback_Object_ice_getConnection cb, Ice.ExceptionCallback excb) + public bool handleSent(bool done, bool alreadySent) { - if(cb == null && excb == null) + lock(this) { - throw new System.ArgumentException("callback is null"); + state_ |= StateSent; + if(done) + { + state_ |= StateDone | StateOK; + } + if(waitHandle_ != null) + { + waitHandle_.Set(); + } + Monitor.PulseAll(this); + + // + // Invoke the sent callback only if not already invoked. + // + return !alreadySent && sentCallback_ != null; } + } + + public bool handleException(Ice.Exception ex) + { lock(this) { - if(_responseCallback != null || exceptionCallback_ != null) + state_ |= StateDone; + exception_ = ex; + if(waitHandle_ != null) { - throw new System.ArgumentException("callback already set"); + waitHandle_.Set(); } - _responseCallback = cb; - exceptionCallback_ = excb; + Monitor.PulseAll(this); + return completedCallback_ != null; } - setCompletedCallback(getCompletedCallback()); - return this; } - new public Ice.AsyncResult<Ice.Callback_Object_ice_getConnection> whenSent(Ice.SentCallback cb) + public bool handleResponse(bool ok, OutgoingAsyncBase og) { - base.whenSent(cb); - return this; + lock(this) + { + state_ |= StateDone; + if(ok) + { + state_ |= StateOK; + } + if(waitHandle_ != null) + { + waitHandle_.Set(); + } + Monitor.PulseAll(this); + return completedCallback_ != null; + } } - protected override Ice.AsyncCallback getCompletedCallback() + public void handleInvokeSent(bool sentSynchronously, OutgoingAsyncBase og) { - return (Ice.AsyncResult result) => { _completed(this, _responseCallback, exceptionCallback_); }; + sentCallback_(this); } - private ProxyTwowayCallback<Ice.Callback_Object_ice_getConnection> _completed; - private Ice.Callback_Object_ice_getConnection _responseCallback = null; + public void handleInvokeException(Ice.Exception ex, OutgoingAsyncBase og) + { + try + { + completedCallback_(this); + } + catch(Ice.Exception e) + { + throw new System.AggregateException(e); + } + } + + public void handleInvokeResponse(bool ok, OutgoingAsyncBase og) + { + try + { + completedCallback_(this); + } + catch(Ice.Exception e) + { + throw new System.AggregateException(e); + } + } } - public abstract class OutgoingAsync<T> : OutgoingAsync, Ice.AsyncResult<T> + abstract public class ProxyAsyncResultCompletionCallback<T> : AsyncResultCompletionCallback, Ice.AsyncResult<T> { - public OutgoingAsync(Ice.ObjectPrxHelperBase prx, string operation, object cookie) : - base(prx, operation, cookie) + public ProxyAsyncResultCompletionCallback(Ice.ObjectPrxHelperBase proxy, string operation, object cookie, + Ice.AsyncCallback cb) : + base(proxy.ice_getCommunicator(), proxy.reference__().getInstance(), operation, cookie, cb) { + _proxy = proxy; } - public OutgoingAsync(Ice.ObjectPrxHelperBase prx, string operation, object cookie, Ice.InputStream iss, - Ice.OutputStream os) : - base(prx, operation, cookie, iss, os) + public override Ice.ObjectPrx getProxy() { + return _proxy; } new public Ice.AsyncResult<T> whenCompleted(Ice.ExceptionCallback excb) @@ -1197,73 +1595,45 @@ namespace IceInternal } protected T responseCallback_; + private Ice.ObjectPrx _proxy; } - public class TwowayOutgoingAsync<T> : OutgoingAsync<T> - { - public TwowayOutgoingAsync(Ice.ObjectPrxHelperBase prx, string operation, ProxyTwowayCallback<T> cb, - object cookie) : - base(prx, operation, cookie) - { - Debug.Assert(cb != null); - _completed = cb; - } - - public TwowayOutgoingAsync(Ice.ObjectPrxHelperBase prx, string operation, ProxyTwowayCallback<T> cb, - object cookie, Ice.InputStream iss, Ice.OutputStream os) : - base(prx, operation, cookie, iss, os) - { - Debug.Assert(cb != null); - _completed = cb; - } - - override protected Ice.AsyncCallback getCompletedCallback() - { - return (Ice.AsyncResult result) => { _completed(this, responseCallback_, exceptionCallback_); }; - } - - private ProxyTwowayCallback<T> _completed; - } - - public class OnewayOutgoingAsync<T> : OutgoingAsync<T> + public class OperationAsyncResultCompletionCallback<T, R> : ProxyAsyncResultCompletionCallback<T> { - public OnewayOutgoingAsync(Ice.ObjectPrxHelperBase prx, string operation, ProxyOnewayCallback<T> cb, - object cookie) : - base(prx, operation, cookie) + public OperationAsyncResultCompletionCallback(System.Action<T, R> completed, + Ice.ObjectPrxHelperBase proxy, + string operation, + object cookie, + Ice.AsyncCallback callback) : + base(proxy, operation, cookie, callback) { - Debug.Assert(cb != null); - _completed = cb; - } - - public OnewayOutgoingAsync(Ice.ObjectPrxHelperBase prx, string operation, ProxyOnewayCallback<T> cb, - object cookie, Ice.InputStream iss, Ice.OutputStream os) : - base(prx, operation, cookie, iss, os) - { - Debug.Assert(cb != null); - _completed = cb; + _completed = completed; } override protected Ice.AsyncCallback getCompletedCallback() { - return (Ice.AsyncResult result) => + return (Ice.AsyncResult r) => { + Debug.Assert(r == this); try { - IceInternal.OutgoingAsync outAsync__ = (IceInternal.OutgoingAsync)result; - ((Ice.ObjectPrxHelperBase)(outAsync__.getProxy())).end__(outAsync__, outAsync__.getOperation()); - } - catch(Ice.Exception ex__) - { - if(exceptionCallback_ != null) + R result = ((OutgoingAsyncT<R>)outgoing_).result__(wait()); + try { - exceptionCallback_(ex__); + _completed(responseCallback_, result); } - return; + catch(Ice.Exception ex) + { + throw new System.AggregateException(ex); + } + } + catch(Ice.Exception ex) + { + exceptionCallback_?.Invoke(ex); } - _completed(responseCallback_); }; } - private ProxyOnewayCallback<T> _completed; + private System.Action<T, R> _completed; } } |