summaryrefslogtreecommitdiff
path: root/csharp/src/Ice/OutgoingAsync.cs
diff options
context:
space:
mode:
Diffstat (limited to 'csharp/src/Ice/OutgoingAsync.cs')
-rw-r--r--csharp/src/Ice/OutgoingAsync.cs1576
1 files changed, 973 insertions, 603 deletions
diff --git a/csharp/src/Ice/OutgoingAsync.cs b/csharp/src/Ice/OutgoingAsync.cs
index 0f376ede850..d9160e9239a 100644
--- a/csharp/src/Ice/OutgoingAsync.cs
+++ b/csharp/src/Ice/OutgoingAsync.cs
@@ -7,29 +7,180 @@
//
// **********************************************************************
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Threading;
+using System.Threading.Tasks;
+
namespace IceInternal
{
- using System;
- using System.Collections.Generic;
- using System.Diagnostics;
- using System.Threading;
+ public interface OutgoingAsyncCompletionCallback
+ {
+ void init(OutgoingAsyncBase og);
+
+ bool handleSent(bool done, bool alreadySent);
+ bool handleException(Ice.Exception ex);
+ bool handleResponse(bool ok, OutgoingAsyncBase og);
- public class OutgoingAsyncBase : AsyncResultI
+ void handleInvokeSent(bool sentSynchronously, OutgoingAsyncBase og);
+ void handleInvokeException(Ice.Exception ex, OutgoingAsyncBase og);
+ void handleInvokeResponse(bool ok, OutgoingAsyncBase og);
+ }
+
+ public abstract class OutgoingAsyncBase
{
- public virtual Ice.AsyncCallback sent()
+ public virtual bool sent()
+ {
+ return sentImpl(true);
+ }
+
+ public virtual bool exception(Ice.Exception ex)
+ {
+ return exceptionImpl(ex);
+ }
+
+ public virtual bool response()
{
- return sent(true);
+ Debug.Assert(false); // Must be overriden by request that can handle responses
+ return false;
}
- public virtual Ice.AsyncCallback completed(Ice.Exception ex)
+ public void invokeSentAsync()
{
- return finished(ex);
+ //
+ // This is called when it's not safe to call the sent callback
+ // synchronously from this thread. Instead the exception callback
+ // is called asynchronously from the client thread pool.
+ //
+ try
+ {
+ instance_.clientThreadPool().dispatch(this.invokeSent, cachedConnection_);
+ }
+ catch(Ice.CommunicatorDestroyedException)
+ {
+ }
+ }
+
+ public void invokeExceptionAsync()
+ {
+ //
+ // CommunicatorDestroyedCompleted is the only exception that can propagate directly
+ // from this method.
+ //
+ instance_.clientThreadPool().dispatch(this.invokeException, cachedConnection_);
+ }
+
+ public void invokeResponseAsync()
+ {
+ //
+ // CommunicatorDestroyedCompleted is the only exception that can propagate directly
+ // from this method.
+ //
+ instance_.clientThreadPool().dispatch(this.invokeResponse, cachedConnection_);
+ }
+
+ public void invokeSent()
+ {
+ try
+ {
+ _completionCallback.handleInvokeSent(sentSynchronously_, this);
+ }
+ catch(System.Exception ex)
+ {
+ warning(ex);
+ }
+
+ if(observer_ != null && _doneInSent)
+ {
+ observer_.detach();
+ observer_ = null;
+ }
+ }
+ public void invokeException()
+ {
+ try
+ {
+ try
+ {
+ throw _ex;
+ }
+ catch(Ice.Exception ex)
+ {
+ _completionCallback.handleInvokeException(ex, this);
+ }
+ }
+ catch(System.Exception ex)
+ {
+ warning(ex);
+ }
+
+ if(observer_ != null)
+ {
+ observer_.detach();
+ observer_ = null;
+ }
}
- public virtual Ice.AsyncCallback completed()
+ public void invokeResponse()
{
- Debug.Assert(false); // Must be implemented by classes that handle responses
- return null;
+ if(_ex != null)
+ {
+ invokeException();
+ return;
+ }
+
+ try
+ {
+ try
+ {
+ _completionCallback.handleInvokeResponse((state_ & StateOK) != 0, this);
+ }
+ catch(Ice.Exception ex)
+ {
+ if(_completionCallback.handleException(ex))
+ {
+ _completionCallback.handleInvokeException(ex, this);
+ }
+ }
+ catch(System.AggregateException ex)
+ {
+ throw ex.InnerException;
+ }
+ }
+ catch(System.Exception ex)
+ {
+ warning(ex);
+ }
+
+ if(observer_ != null)
+ {
+ observer_.detach();
+ observer_ = null;
+ }
+ }
+
+ public virtual void cancelable(IceInternal.CancellationHandler handler)
+ {
+ lock(this)
+ {
+ if(_cancellationException != null)
+ {
+ try
+ {
+ throw _cancellationException;
+ }
+ catch(Ice.LocalException)
+ {
+ _cancellationException = null;
+ throw;
+ }
+ }
+ _cancellationHandler = handler;
+ }
+ }
+ public void cancel()
+ {
+ cancel(new Ice.InvocationCanceledException());
}
public void attachRemoteObserver(Ice.ConnectionInfo info, Ice.Endpoint endpt, int requestId)
@@ -63,50 +214,192 @@ namespace IceInternal
return os_;
}
- public virtual Ice.InputStream getIs()
+ public Ice.InputStream getIs()
{
- return null; // Must be implemented by classes that handle responses
+ return is_;
}
- protected OutgoingAsyncBase(Ice.Communicator com, Instance instance, string op, object cookie) :
- base(com, instance, op, cookie)
+ public virtual void cacheMessageBuffers()
{
- os_ = new Ice.OutputStream(instance, Ice.Util.currentProtocolEncoding);
}
- protected OutgoingAsyncBase(Ice.Communicator com, Instance instance, string op, object cookie,
- Ice.OutputStream os) :
- base(com, instance, op, cookie)
+ public virtual void throwUserException()
{
- os_ = os;
}
- protected new Ice.AsyncCallback sent(bool done)
+ protected OutgoingAsyncBase(Instance instance, OutgoingAsyncCompletionCallback completionCallback,
+ Ice.OutputStream os = null, Ice.InputStream iss = null)
{
- if(done)
+ instance_ = instance;
+ sentSynchronously_ = false;
+ _doneInSent = false;
+ state_ = 0;
+ os_ = os ?? new Ice.OutputStream(instance, Ice.Util.currentProtocolEncoding);
+ is_ = iss ?? new Ice.InputStream(instance, Ice.Util.currentProtocolEncoding);
+ _completionCallback = completionCallback;
+ if(_completionCallback != null)
+ {
+ _completionCallback.init(this);
+ }
+ }
+
+ protected virtual bool sentImpl(bool done)
+ {
+ lock(this)
{
+ bool alreadySent = (state_ & StateSent) > 0;
+ state_ |= StateSent;
+ if(done)
+ {
+ _doneInSent = true;
+ if(childObserver_ != null)
+ {
+ childObserver_.detach();
+ childObserver_ = null;
+ }
+ _cancellationHandler = null;
+
+ //
+ // For oneway requests after the data has been sent
+ // the buffers can be reused unless this is a
+ // collocated invocation. For collocated invocations
+ // the buffer won't be reused because it has already
+ // been marked as cached in invokeCollocated.
+ //
+ cacheMessageBuffers();
+ }
+
+ bool invoke = _completionCallback.handleSent(done, alreadySent);
+ if(!invoke && _doneInSent && observer_ != null)
+ {
+ observer_.detach();
+ observer_ = null;
+ }
+ return invoke;
+ }
+ }
+
+ protected virtual bool exceptionImpl(Ice.Exception ex)
+ {
+ lock(this)
+ {
+ _ex = ex;
if(childObserver_ != null)
{
+ childObserver_.failed(ex.ice_id());
childObserver_.detach();
childObserver_ = null;
}
+ _cancellationHandler = null;
+
+ if(observer_ != null)
+ {
+ observer_.failed(ex.ice_id());
+ }
+ bool invoke = _completionCallback.handleException(ex);
+ if(!invoke && observer_ != null)
+ {
+ observer_.detach();
+ observer_ = null;
+ }
+ return invoke;
}
- return base.sent(done);
}
+ protected virtual bool responseImpl(bool ok)
+ {
+ lock(this)
+ {
+ if(ok)
+ {
+ state_ |= StateOK;
+ }
+
+ _cancellationHandler = null;
- protected new Ice.AsyncCallback finished(Ice.Exception ex)
+ bool invoke;
+ try
+ {
+ invoke = _completionCallback.handleResponse(ok, this);
+ }
+ catch(Ice.Exception ex)
+ {
+ _ex = ex;
+ invoke = _completionCallback.handleException(ex);
+ }
+ if(!invoke && observer_ != null)
+ {
+ observer_.detach();
+ observer_ = null;
+ }
+ return invoke;
+ }
+ }
+
+ protected void cancel(Ice.LocalException ex)
{
- if(childObserver_ != null)
+ CancellationHandler handler;
{
- childObserver_.failed(ex.ice_id());
- childObserver_.detach();
- childObserver_ = null;
+ lock(this)
+ {
+ _cancellationException = ex;
+ if(_cancellationHandler == null)
+ {
+ return;
+ }
+ handler = _cancellationHandler;
+ }
}
- return base.finished(ex);
+ handler.asyncRequestCanceled(this, ex);
}
- protected Ice.OutputStream os_;
+ void warning(System.Exception ex)
+ {
+ if(instance_.initializationData().properties.getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0)
+ {
+ instance_.initializationData().logger.warning("exception raised by AMI callback:\n" + ex);
+ }
+ }
+
+ //
+ // This virtual method is necessary for the communicator flush
+ // batch requests implementation.
+ //
+ virtual protected Ice.Instrumentation.InvocationObserver getObserver()
+ {
+ return observer_;
+ }
+
+ public bool sentSynchronously()
+ {
+ return sentSynchronously_;
+ }
+
+ protected Instance instance_;
+ protected Ice.Connection cachedConnection_;
+ protected bool sentSynchronously_;
+ protected int state_;
+
+ protected Ice.Instrumentation.InvocationObserver observer_;
protected Ice.Instrumentation.ChildInvocationObserver childObserver_;
+
+ protected Ice.OutputStream os_;
+ protected Ice.InputStream is_;
+
+ private bool _doneInSent;
+ private Ice.Exception _ex;
+ private Ice.LocalException _cancellationException;
+ private CancellationHandler _cancellationHandler;
+ private OutgoingAsyncCompletionCallback _completionCallback;
+
+ protected const int StateOK = 0x1;
+ protected const int StateDone = 0x2;
+ protected const int StateSent = 0x4;
+ protected const int StateEndCalled = 0x8;
+ protected const int StateCachedBuffers = 0x10;
+
+ public const int AsyncStatusQueued = 0;
+ public const int AsyncStatusSent = 1;
+ public const int AsyncStatusInvokeSentCallback = 2;
}
//
@@ -117,21 +410,10 @@ namespace IceInternal
//
public abstract class ProxyOutgoingAsyncBase : OutgoingAsyncBase, TimerTask
{
- public static ProxyOutgoingAsyncBase check(Ice.AsyncResult r, Ice.ObjectPrx prx, string operation)
- {
- return ProxyOutgoingAsyncBase.check<ProxyOutgoingAsyncBase>(r, prx, operation);
- }
-
- public abstract bool invokeRemote(Ice.ConnectionI con, bool compress, bool resp, out Ice.AsyncCallback cb);
-
- public abstract bool invokeCollocated(CollocatedRequestHandler handler, out Ice.AsyncCallback cb);
+ public abstract int invokeRemote(Ice.ConnectionI connection, bool compress, bool response);
+ public abstract int invokeCollocated(CollocatedRequestHandler handler);
- public override Ice.ObjectPrx getProxy()
- {
- return proxy_;
- }
-
- public override Ice.AsyncCallback completed(Ice.Exception exc)
+ public override bool exception(Ice.Exception exc)
{
if(childObserver_ != null)
{
@@ -157,13 +439,26 @@ namespace IceInternal
// the retry interval is 0. This method can be called with the
// connection locked so we can't just retry here.
//
- instance_.retryQueue().add(this, handleException(exc));
- return null;
+ instance_.retryQueue().add(this, proxy_.handleException__(exc, handler_, mode_, _sent, ref _cnt));
+ return false;
}
catch(Ice.Exception ex)
{
- return finished(ex); // No retries, we're done
+ return exceptionImpl(ex); // No retries, we're done
+ }
+ }
+
+ public override void cancelable(CancellationHandler handler)
+ {
+ if(proxy_.reference__().getInvocationTimeout() == -2 && cachedConnection_ != null)
+ {
+ int timeout = cachedConnection_.timeout();
+ if(timeout > 0)
+ {
+ instance_.timer().schedule(this, timeout);
+ }
}
+ base.cancelable(handler);
}
public void retryException(Ice.Exception ex)
@@ -181,44 +476,28 @@ namespace IceInternal
}
catch(Ice.Exception exc)
{
- Ice.AsyncCallback cb = completed(exc);
- if(cb != null)
+ if(exception(exc))
{
- invokeCompletedAsync(cb);
+ invokeExceptionAsync();
}
}
}
- public override void cancelable(CancellationHandler handler)
- {
- if(proxy_.reference__().getInvocationTimeout() == -2 && cachedConnection_ != null)
- {
- int timeout = cachedConnection_.timeout();
- if(timeout > 0)
- {
- instance_.timer().schedule(this, timeout);
- }
- }
- base.cancelable(handler);
- }
-
public void retry()
{
invokeImpl(false);
}
-
- public virtual void abort(Ice.Exception ex)
+ public void abort(Ice.Exception ex)
{
Debug.Assert(childObserver_ == null);
- Ice.AsyncCallback cb = finished(ex);
- if(cb != null)
+ if(exceptionImpl(ex))
{
- invokeCompletedAsync(cb);
+ invokeExceptionAsync();
}
else if(ex is Ice.CommunicatorDestroyedException)
{
//
- // If it's a communicator destroyed exception, don't swallow
+ // If it's a communicator destroyed exception, swallow
// it but instead notify the user thread. Even if no callback
// was provided.
//
@@ -226,29 +505,11 @@ namespace IceInternal
}
}
- public void runTimerTask()
- {
- if(proxy_.reference__().getInvocationTimeout() == -2)
- {
- cancel(new Ice.ConnectionTimeoutException());
- }
- else
- {
- cancel(new Ice.InvocationTimeoutException());
- }
- }
-
- protected ProxyOutgoingAsyncBase(Ice.ObjectPrxHelperBase prx, string op, object cookie) :
- base(prx.ice_getCommunicator(), prx.reference__().getInstance(), op, cookie)
- {
- proxy_ = prx;
- mode_ = Ice.OperationMode.Normal;
- _cnt = 0;
- _sent = false;
- }
-
- protected ProxyOutgoingAsyncBase(Ice.ObjectPrxHelperBase prx, string op, object cookie, Ice.OutputStream os) :
- base(prx.ice_getCommunicator(), prx.reference__().getInstance(), op, cookie, os)
+ protected ProxyOutgoingAsyncBase(Ice.ObjectPrxHelperBase prx,
+ OutgoingAsyncCompletionCallback completionCallback,
+ Ice.OutputStream os = null,
+ Ice.InputStream iss = null) :
+ base(prx.reference__().getInstance(), completionCallback, os, iss)
{
proxy_ = prx;
mode_ = Ice.OperationMode.Normal;
@@ -256,17 +517,6 @@ namespace IceInternal
_sent = false;
}
- protected static T check<T>(Ice.AsyncResult r, Ice.ObjectPrx prx, string operation)
- {
- if(r != null && r.getProxy() != prx)
- {
- throw new System.ArgumentException("Proxy for call to end_" + operation +
- " does not match proxy that was used to call corresponding begin_" +
- operation + " method");
- }
- return check<T>(r, operation);
- }
-
protected void invokeImpl(bool userThread)
{
try
@@ -279,12 +529,9 @@ namespace IceInternal
instance_.timer().schedule(this, invocationTimeout);
}
}
- else // If not called from the user thread, it's called from the retry queue
+ else if(observer_ != null)
{
- if(observer_ != null)
- {
- observer_.retried();
- }
+ observer_.retried();
}
while(true)
@@ -293,22 +540,22 @@ namespace IceInternal
{
_sent = false;
handler_ = proxy_.getRequestHandler__();
- Ice.AsyncCallback sentCallback;
- if(handler_.sendAsyncRequest(this, out sentCallback))
+ int status = handler_.sendAsyncRequest(this);
+ if((status & AsyncStatusSent) != 0)
{
if(userThread)
{
sentSynchronously_ = true;
- if(sentCallback != null)
+ if((status & AsyncStatusInvokeSentCallback) != 0)
{
- invokeSent(sentCallback); // Call from the user thread.
+ invokeSent(); // Call the sent callback from the user thread.
}
}
else
{
- if(sentCallback != null)
+ if((status & AsyncStatusInvokeSentCallback) != 0)
{
- invokeSentAsync(sentCallback); // Call from a client thread pool thread.
+ invokeSentAsync(); // Call the sent callback from a client thread pool thread.
}
}
}
@@ -326,7 +573,7 @@ namespace IceInternal
childObserver_.detach();
childObserver_ = null;
}
- int interval = handleException(ex);
+ int interval = proxy_.handleException__(ex, handler_, mode_, _sent, ref _cnt);
if(interval > 0)
{
instance_.retryQueue().add(this, interval);
@@ -349,15 +596,13 @@ namespace IceInternal
{
throw;
}
- Ice.AsyncCallback cb = finished(ex); // No retries, we're done
- if(cb != null)
+ else if(exceptionImpl(ex)) // No retries, we're done
{
- invokeCompletedAsync(cb);
+ invokeExceptionAsync();
}
}
}
-
- protected new Ice.AsyncCallback sent(bool done)
+ protected override bool sentImpl(bool done)
{
_sent = true;
if(done)
@@ -367,33 +612,39 @@ namespace IceInternal
instance_.timer().cancel(this);
}
}
- return base.sent(done);
+ return base.sentImpl(done);
}
-
- protected new Ice.AsyncCallback finished(Ice.Exception ex)
+ protected override bool exceptionImpl(Ice.Exception ex)
{
if(proxy_.reference__().getInvocationTimeout() != -1)
{
instance_.timer().cancel(this);
}
- return base.finished(ex);
+ return base.exceptionImpl(ex);
}
- protected new Ice.AsyncCallback finished(bool ok)
+ protected override bool responseImpl(bool ok)
{
if(proxy_.reference__().getInvocationTimeout() != -1)
{
instance_.timer().cancel(this);
}
- return base.finished(ok);
+ return base.responseImpl(ok);
}
- protected virtual int handleException(Ice.Exception exc)
+ public void runTimerTask()
{
- return proxy_.handleException__(exc, handler_, mode_, _sent, ref _cnt);
+ if(proxy_.reference__().getInvocationTimeout() == -2)
+ {
+ cancel(new Ice.ConnectionTimeoutException());
+ }
+ else
+ {
+ cancel(new Ice.InvocationTimeoutException());
+ }
}
- protected Ice.ObjectPrxHelperBase proxy_;
+ protected readonly Ice.ObjectPrxHelperBase proxy_;
protected RequestHandler handler_;
protected Ice.OperationMode mode_;
@@ -401,41 +652,28 @@ namespace IceInternal
private bool _sent;
}
+ //
+ // Class for handling Slice operation invocations
+ //
public class OutgoingAsync : ProxyOutgoingAsyncBase
{
- public new static OutgoingAsync check(Ice.AsyncResult r, Ice.ObjectPrx prx, string operation)
+ public OutgoingAsync(Ice.ObjectPrxHelperBase prx, OutgoingAsyncCompletionCallback completionCallback,
+ Ice.OutputStream os = null, Ice.InputStream iss = null) :
+ base(prx, completionCallback, os, iss)
{
- return ProxyOutgoingAsyncBase.check<OutgoingAsync>(r, prx, operation);
+ encoding_ = Protocol.getCompatibleEncoding(proxy_.reference__().getEncoding());
+ synchronous_ = false;
}
- public OutgoingAsync(Ice.ObjectPrx prx, string operation, object cookie) :
- base((Ice.ObjectPrxHelperBase)prx, operation, cookie)
- {
- _encoding = Protocol.getCompatibleEncoding(proxy_.reference__().getEncoding());
- _is = null;
- }
-
- public OutgoingAsync(Ice.ObjectPrx prx, string operation, object cookie, Ice.InputStream istr,
- Ice.OutputStream ostr) :
- base((Ice.ObjectPrxHelperBase)prx, operation, cookie, ostr)
- {
- _encoding = Protocol.getCompatibleEncoding(proxy_.reference__().getEncoding());
- _is = istr;
- }
-
- public void prepare(string operation, Ice.OperationMode mode, Dictionary<string, string> ctx,
- bool explicitCtx, bool synchronous)
+ public void prepare(string operation, Ice.OperationMode mode, Dictionary<string, string> context,
+ bool synchronous)
{
Protocol.checkSupportedProtocol(Protocol.getCompatibleProtocol(proxy_.reference__().getProtocol()));
mode_ = mode;
- _synchronous = synchronous;
+ synchronous_ = synchronous;
- if(explicitCtx && ctx == null)
- {
- ctx = _emptyContext;
- }
- observer_ = ObserverHelper.get(proxy_, operation, ctx);
+ observer_ = ObserverHelper.get(proxy_, operation, context);
switch(proxy_.reference__().getMode())
{
@@ -477,12 +715,12 @@ namespace IceInternal
os_.writeByte((byte)mode);
- if(ctx != null)
+ if(context != null)
{
//
// Explicit context
//
- Ice.ContextHelper.write(os_, ctx);
+ Ice.ContextHelper.write(os_, context);
}
else
{
@@ -502,74 +740,23 @@ namespace IceInternal
}
}
}
-
- public override Ice.AsyncCallback sent()
+ public override bool sent()
{
- return sent(!proxy_.ice_isTwoway()); // done = true if not a two-way proxy (no response expected)
- }
-
- public override bool invokeRemote(Ice.ConnectionI con, bool compress, bool resp, out Ice.AsyncCallback sentCB)
- {
- cachedConnection_ = con;
- return con.sendAsyncRequest(this, compress, resp, 0, out sentCB);
- }
-
- public override bool invokeCollocated(CollocatedRequestHandler handler, out Ice.AsyncCallback sentCB)
- {
- // The stream cannot be cached if the proxy is not a twoway or there is an invocation timeout set.
- if(!proxy_.ice_isTwoway() || proxy_.reference__().getInvocationTimeout() != -1)
- {
- // Disable caching by marking the streams as cached!
- state_ |= StateCachedBuffers;
- }
- return handler.invokeAsyncRequest(this, 0, _synchronous, out sentCB);
+ return base.sentImpl(!proxy_.ice_isTwoway()); // done = true if it's not a two-way proxy
}
- public override void abort(Ice.Exception ex)
+ public override bool response()
{
- Reference.Mode mode = proxy_.reference__().getMode();
- if(mode == Reference.Mode.ModeBatchOneway || mode == Reference.Mode.ModeBatchDatagram)
- {
- proxy_.getBatchRequestQueue__().abortBatchRequest(os_);
- }
-
- base.abort(ex);
- }
-
- public void invoke()
- {
- Reference.Mode mode = proxy_.reference__().getMode();
- if(mode == Reference.Mode.ModeBatchOneway || mode == Reference.Mode.ModeBatchDatagram)
- {
- sentSynchronously_ = true;
- proxy_.getBatchRequestQueue__().finishBatchRequest(os_, proxy_, getOperation());
- finished(true);
- return; // Don't call sent/completed callback for batch AMI requests
- }
-
- //
- // NOTE: invokeImpl doesn't throw so this can be called from the
- // try block with the catch block calling abort() in case of an
- // exception.
- //
- invokeImpl(true); // userThread = true
- }
-
- override public Ice.AsyncCallback completed()
- {
- Debug.Assert(_is != null); // _is has been initialized prior to this call
-
//
// NOTE: this method is called from ConnectionI.parseMessage
// with the connection locked. Therefore, it must not invoke
// any user callbacks.
//
-
Debug.Assert(proxy_.ice_isTwoway()); // Can only be called for twoways.
if(childObserver_ != null)
{
- childObserver_.reply(_is.size() - Protocol.headerSize - 4);
+ childObserver_.reply(is_.size() - Protocol.headerSize - 4);
childObserver_.detach();
childObserver_ = null;
}
@@ -577,208 +764,229 @@ namespace IceInternal
byte replyStatus;
try
{
- replyStatus = _is.readByte();
+ replyStatus = is_.readByte();
switch(replyStatus)
{
- case ReplyStatus.replyOK:
- {
- break;
- }
-
- case ReplyStatus.replyUserException:
- {
- if(observer_ != null)
+ case ReplyStatus.replyOK:
{
- observer_.userException();
+ break;
}
- break;
- }
-
- case ReplyStatus.replyObjectNotExist:
- case ReplyStatus.replyFacetNotExist:
- case ReplyStatus.replyOperationNotExist:
- {
- Ice.Identity id = new Ice.Identity();
- id.read__(_is);
-
- //
- // For compatibility with the old FacetPath.
- //
- string[] facetPath = _is.readStringSeq();
- string facet;
- if(facetPath.Length > 0)
+ case ReplyStatus.replyUserException:
{
- if(facetPath.Length > 1)
+ if(observer_ != null)
{
- throw new Ice.MarshalException();
+ observer_.userException();
}
- facet = facetPath[0];
- }
- else
- {
- facet = "";
- }
-
- string operation = _is.readString();
-
- Ice.RequestFailedException ex = null;
- switch(replyStatus)
- {
- case ReplyStatus.replyObjectNotExist:
- {
- ex = new Ice.ObjectNotExistException();
break;
}
+ case ReplyStatus.replyObjectNotExist:
case ReplyStatus.replyFacetNotExist:
- {
- ex = new Ice.FacetNotExistException();
- break;
- }
-
case ReplyStatus.replyOperationNotExist:
{
- ex = new Ice.OperationNotExistException();
- break;
- }
+ Ice.Identity ident = new Ice.Identity();
+ ident.read__(is_);
+
+ //
+ // For compatibility with the old FacetPath.
+ //
+ string[] facetPath = is_.readStringSeq();
+ ;
+ string facet;
+ if(facetPath.Length > 0)
+ {
+ if(facetPath.Length > 1)
+ {
+ throw new Ice.MarshalException();
+ }
+ facet = facetPath[0];
+ }
+ else
+ {
+ facet = "";
+ }
- default:
- {
- Debug.Assert(false);
- break;
- }
- }
+ string operation = is_.readString();
- ex.id = id;
- ex.facet = facet;
- ex.operation = operation;
- throw ex;
- }
+ Ice.RequestFailedException ex = null;
+ switch(replyStatus)
+ {
+ case ReplyStatus.replyObjectNotExist:
+ {
+ ex = new Ice.ObjectNotExistException();
+ break;
+ }
- case ReplyStatus.replyUnknownException:
- case ReplyStatus.replyUnknownLocalException:
- case ReplyStatus.replyUnknownUserException:
- {
- string unknown = _is.readString();
+ case ReplyStatus.replyFacetNotExist:
+ {
+ ex = new Ice.FacetNotExistException();
+ break;
+ }
- Ice.UnknownException ex = null;
- switch(replyStatus)
- {
- case ReplyStatus.replyUnknownException:
- {
- ex = new Ice.UnknownException();
- break;
- }
+ case ReplyStatus.replyOperationNotExist:
+ {
+ ex = new Ice.OperationNotExistException();
+ break;
+ }
- case ReplyStatus.replyUnknownLocalException:
- {
- ex = new Ice.UnknownLocalException();
- break;
+ default:
+ {
+ Debug.Assert(false);
+ break;
+ }
+ }
+
+ ex.id = ident;
+ ex.facet = facet;
+ ex.operation = operation;
+ throw ex;
}
+ case ReplyStatus.replyUnknownException:
+ case ReplyStatus.replyUnknownLocalException:
case ReplyStatus.replyUnknownUserException:
{
- ex = new Ice.UnknownUserException();
- break;
+ string unknown = is_.readString();
+
+ Ice.UnknownException ex = null;
+ switch(replyStatus)
+ {
+ case ReplyStatus.replyUnknownException:
+ {
+ ex = new Ice.UnknownException();
+ break;
+ }
+
+ case ReplyStatus.replyUnknownLocalException:
+ {
+ ex = new Ice.UnknownLocalException();
+ break;
+ }
+
+ case ReplyStatus.replyUnknownUserException:
+ {
+ ex = new Ice.UnknownUserException();
+ break;
+ }
+
+ default:
+ {
+ Debug.Assert(false);
+ break;
+ }
+ }
+
+ ex.unknown = unknown;
+ throw ex;
}
default:
{
- Debug.Assert(false);
- break;
- }
+ throw new Ice.UnknownReplyStatusException();
}
-
- ex.unknown = unknown;
- throw ex;
}
- default:
- {
- throw new Ice.UnknownReplyStatusException();
- }
- }
-
- return finished(replyStatus == ReplyStatus.replyOK);
+ return responseImpl(replyStatus == ReplyStatus.replyOK);
}
catch(Ice.Exception ex)
{
- return completed(ex);
+ return exception(ex);
}
}
- public Ice.OutputStream startWriteParams(Ice.FormatType format)
- {
- os_.startEncapsulation(_encoding, format);
- return os_;
- }
-
- public void endWriteParams()
+ public override int invokeRemote(Ice.ConnectionI connection, bool compress, bool response)
{
- os_.endEncapsulation();
+ cachedConnection_ = connection;
+ return connection.sendAsyncRequest(this, compress, response, 0);
}
- public void writeEmptyParams()
+ public override int invokeCollocated(CollocatedRequestHandler handler)
{
- os_.writeEmptyEncapsulation(_encoding);
- }
-
- public void writeParamEncaps(byte[] encaps)
- {
- if(encaps == null || encaps.Length == 0)
- {
- os_.writeEmptyEncapsulation(_encoding);
- }
- else
+ // The stream cannot be cached if the proxy is not a twoway or there is an invocation timeout set.
+ if(!proxy_.ice_isTwoway() || proxy_.reference__().getInvocationTimeout() != -1)
{
- os_.writeEncapsulation(encaps);
+ // Disable caching by marking the streams as cached!
+ state_ |= StateCachedBuffers;
}
+ return handler.invokeAsyncRequest(this, 0, synchronous_);
}
- public Ice.InputStream startReadParams()
+ public new void abort(Ice.Exception ex)
{
- _is.startEncapsulation();
- return _is;
- }
+ Reference.Mode mode = proxy_.reference__().getMode();
+ if(mode == Reference.Mode.ModeBatchOneway || mode == Reference.Mode.ModeBatchDatagram)
+ {
+ //
+ // If we didn't finish a batch oneway or datagram request, we
+ // must notify the connection about that we give up ownership
+ // of the batch stream.
+ //
+ proxy_.getBatchRequestQueue__().abortBatchRequest(os_);
+ }
- public void endReadParams()
- {
- _is.endEncapsulation();
+ base.abort(ex);
}
- public void readEmptyParams()
+ public void invoke(string operation)
{
- _is.skipEmptyEncapsulation();
- }
+ Reference.Mode mode = proxy_.reference__().getMode();
+ if(mode == Reference.Mode.ModeBatchOneway || mode == Reference.Mode.ModeBatchDatagram)
+ {
+ sentSynchronously_ = true;
+ proxy_.getBatchRequestQueue__().finishBatchRequest(os_, proxy_, operation);
+ responseImpl(true);
+ return; // Don't call sent/completed callback for batch AMI requests
+ }
- public byte[] readParamEncaps()
- {
- Ice.EncodingVersion encoding;
- return _is.readEncapsulation(out encoding);
+ //
+ // NOTE: invokeImpl doesn't throw so this can be called from the
+ // try block with the catch block calling abort() in case of an
+ // exception.
+ //
+ invokeImpl(true); // userThread = true
}
- override public Ice.InputStream getIs()
+ public void invoke(string operation,
+ Ice.OperationMode mode,
+ Ice.FormatType format,
+ Dictionary<string, string> context,
+ bool synchronous,
+ System.Action<Ice.OutputStream> write)
{
- // _is can already be initialized if the invocation is retried
- if(_is == null)
+ try
+ {
+ prepare(operation, mode, context, synchronous);
+ if(write != null)
+ {
+ os_.startEncapsulation(encoding_, format);
+ write(os_);
+ os_.endEncapsulation();
+ }
+ else
+ {
+ os_.writeEmptyEncapsulation(encoding_);
+ }
+ invoke(operation);
+ }
+ catch(Ice.Exception ex)
{
- _is = new Ice.InputStream(instance_, Ice.Util.currentProtocolEncoding);
+ abort(ex);
}
- return _is;
}
- public void throwUserException()
+ public override void throwUserException()
{
try
{
- _is.startEncapsulation();
- _is.throwException(null);
+ is_.startEncapsulation();
+ is_.throwException();
}
- catch(Ice.UserException)
+ catch(Ice.UserException ex)
{
- _is.endEncapsulation();
- throw;
+ is_.endEncapsulation();
+ userException_?.Invoke(ex);
+ throw new Ice.UnknownUserException(ex.ice_id());
}
}
@@ -795,140 +1003,274 @@ namespace IceInternal
state_ |= StateCachedBuffers;
}
- if(_is != null)
+ if(is_ != null)
{
- _is.reset();
+ is_.reset();
}
os_.reset();
- proxy_.cacheMessageBuffers(_is, os_);
+ proxy_.cacheMessageBuffers(is_, os_);
- _is = null;
+ is_ = null;
os_ = null;
}
}
- private Ice.EncodingVersion _encoding;
- private Ice.InputStream _is;
-
- //
- // If true this AMI request is being used for a generated synchronous invocation.
- //
- private bool _synchronous;
-
- private static Dictionary<string, string> _emptyContext = new Dictionary<string, string>();
+ protected readonly Ice.EncodingVersion encoding_;
+ protected System.Action<Ice.UserException> userException_;
+ protected bool synchronous_;
}
- public class CommunicatorFlushBatch : IceInternal.AsyncResultI
+ public class OutgoingAsyncT<T> : OutgoingAsync
{
- public static CommunicatorFlushBatch check(Ice.AsyncResult r, Ice.Communicator com, string operation)
+ public OutgoingAsyncT(Ice.ObjectPrxHelperBase prx,
+ OutgoingAsyncCompletionCallback completionCallback,
+ Ice.OutputStream os = null,
+ Ice.InputStream iss = null) :
+ base(prx, completionCallback, os, iss)
{
- if(r != null && r.getCommunicator() != com)
- {
- throw new System.ArgumentException("Communicator for call to end_" + operation +
- " does not match communicator that was used to call " +
- "corresponding begin_" + operation + " method");
- }
- return AsyncResultI.check<CommunicatorFlushBatch>(r, operation);
}
- public CommunicatorFlushBatch(Ice.Communicator communicator, Instance instance, string op, object cookie) :
- base(communicator, instance, op, cookie)
+ public void invoke(string operation,
+ Ice.OperationMode mode,
+ Ice.FormatType format,
+ Dictionary<string, string> context,
+ bool synchronous,
+ System.Action<Ice.OutputStream> write = null,
+ System.Action<Ice.UserException> userException = null,
+ System.Func<Ice.InputStream, T> read = null)
{
+ read_ = read;
+ userException_ = userException;
+ base.invoke(operation, mode, format, context, synchronous, write);
+ }
- observer_ = ObserverHelper.get(instance, op);
+ public T result__(bool ok)
+ {
+ try
+ {
+ if(ok)
+ {
+ if(read_ == null)
+ {
+ if(is_ == null || is_.isEmpty())
+ {
+ //
+ // If there's no response (oneway, batch-oneway proxies), we just set the promise
+ // on completion without reading anything from the input stream. This is required for
+ // batch invocations.
+ //
+ }
+ else
+ {
+ is_.skipEmptyEncapsulation();
+ }
+ return default(T);
+ }
+ else
+ {
+ is_.startEncapsulation();
+ T r = read_(is_);
+ is_.endEncapsulation();
+ return r;
+ }
+ }
+ else
+ {
+ throwUserException();
+ return default(T); // make compiler happy
+ }
+ }
+ finally
+ {
+ cacheMessageBuffers();
+ }
+ }
- //
- // _useCount is initialized to 1 to prevent premature callbacks.
- // The caller must invoke ready() after all flush requests have
- // been initiated.
- //
- _useCount = 1;
+ protected System.Func<Ice.InputStream, T> read_;
+ }
+
+ //
+ // Class for handling the proxy's begin_ice_flushBatchRequest request.
+ //
+ class ProxyFlushBatchAsync : ProxyOutgoingAsyncBase
+ {
+ public ProxyFlushBatchAsync(Ice.ObjectPrxHelperBase prx, OutgoingAsyncCompletionCallback completionCallback) :
+ base(prx, completionCallback)
+ {
}
- public void flushConnection(Ice.ConnectionI con)
+ public override int invokeRemote(Ice.ConnectionI connection, bool compress, bool response)
{
- lock(this)
+ if(_batchRequestNum == 0)
{
- ++_useCount;
+ if(sent())
+ {
+ return AsyncStatusSent | AsyncStatusInvokeSentCallback;
+ }
+ else
+ {
+ return AsyncStatusSent;
+ }
}
+ cachedConnection_ = connection;
+ return connection.sendAsyncRequest(this, compress, false, _batchRequestNum);
+ }
- try
+ public override int invokeCollocated(CollocatedRequestHandler handler)
+ {
+ if(_batchRequestNum == 0)
{
- Ice.AsyncCallback sentCB = null;
- FlushBatch flush = new FlushBatch(this);
- int batchRequestNum = con.getBatchRequestQueue().swap(flush.getOs());
- if(batchRequestNum == 0)
+ if(sent())
{
- flush.sent();
+ return AsyncStatusSent | AsyncStatusInvokeSentCallback;
}
else
{
- con.sendAsyncRequest(flush, false, false, batchRequestNum, out sentCB);
+ return AsyncStatusSent;
}
- Debug.Assert(sentCB == null);
}
- catch(Ice.LocalException)
+ return handler.invokeAsyncRequest(this, _batchRequestNum, false);
+ }
+
+ public void invoke(string operation)
+ {
+ Protocol.checkSupportedProtocol(Protocol.getCompatibleProtocol(proxy_.reference__().getProtocol()));
+ observer_ = ObserverHelper.get(proxy_, operation, null);
+ _batchRequestNum = proxy_.getBatchRequestQueue__().swap(os_);
+ invokeImpl(true); // userThread = true
+ }
+
+ private int _batchRequestNum;
+ }
+
+ //
+ // Class for handling the proxy's begin_ice_getConnection request.
+ //
+ class ProxyGetConnection : ProxyOutgoingAsyncBase
+ {
+ public ProxyGetConnection(Ice.ObjectPrxHelperBase prx, OutgoingAsyncCompletionCallback completionCallback) :
+ base(prx, completionCallback)
+ {
+ }
+
+ public override int invokeRemote(Ice.ConnectionI connection, bool compress, bool response)
+ {
+ cachedConnection_ = connection;
+ if(responseImpl(true))
{
- doCheck(false);
- throw;
+ invokeResponseAsync();
}
+ return AsyncStatusSent;
}
- public void ready()
+ public override int invokeCollocated(CollocatedRequestHandler handler)
{
- doCheck(true);
+ if(responseImpl(true))
+ {
+ invokeResponseAsync();
+ }
+ return AsyncStatusSent;
}
- private void doCheck(bool userThread)
+ public void invoke(string operation)
{
- lock(this)
+ observer_ = ObserverHelper.get(proxy_, operation, null);
+ invokeImpl(true); // userThread = true
+ }
+ }
+
+ class ConnectionFlushBatchAsync : OutgoingAsyncBase
+ {
+ public ConnectionFlushBatchAsync(Ice.ConnectionI connection,
+ Instance instance,
+ OutgoingAsyncCompletionCallback completionCallback) :
+ base(instance, completionCallback)
+ {
+ _connection = connection;
+ }
+
+ public void invoke(string operation)
+ {
+ observer_ = ObserverHelper.get(instance_, operation);
+ try
{
- Debug.Assert(_useCount > 0);
- if(--_useCount > 0)
+ int status;
+ int batchRequestNum = _connection.getBatchRequestQueue().swap(os_);
+ if(batchRequestNum == 0)
{
- return;
+ status = AsyncStatusSent;
+ if(sent())
+ {
+ status = status | AsyncStatusInvokeSentCallback;
+ }
+ }
+ else
+ {
+ status = _connection.sendAsyncRequest(this, false, false, batchRequestNum);
}
- }
- Ice.AsyncCallback sentCB = sent(true);
- if(userThread)
+ if((status & AsyncStatusSent) != 0)
+ {
+ sentSynchronously_ = true;
+ if((status & AsyncStatusInvokeSentCallback) != 0)
+ {
+ invokeSent();
+ }
+ }
+ }
+ catch(RetryException ex)
{
- sentSynchronously_ = true;
- if(sentCB != null)
+ try
+ {
+ throw ex.get();
+ }
+ catch(Ice.LocalException ee)
{
- invokeSent(sentCB);
+ if(exception(ee))
+ {
+ invokeExceptionAsync();
+ }
}
}
- else
+ catch(Ice.Exception ex)
{
- if(sentCB != null)
+ if(exception(ex))
{
- invokeSentAsync(sentCB);
+ invokeExceptionAsync();
}
}
}
+ private readonly Ice.ConnectionI _connection;
+ };
+
+ public class CommunicatorFlushBatchAsync : OutgoingAsyncBase
+ {
class FlushBatch : OutgoingAsyncBase
{
- public FlushBatch(CommunicatorFlushBatch outAsync) :
- base(outAsync.getCommunicator(), outAsync.instance_, outAsync.getOperation(), null)
+ public FlushBatch(CommunicatorFlushBatchAsync outAsync,
+ Instance instance,
+ Ice.Instrumentation.InvocationObserver observer) : base(instance, null)
{
_outAsync = outAsync;
+ _observer = observer;
}
- public override Ice.AsyncCallback sent()
+ public override bool
+ sent()
{
if(childObserver_ != null)
{
childObserver_.detach();
childObserver_ = null;
}
- _outAsync.doCheck(false);
- return null;
+ _outAsync.check(false);
+ return false;
}
- public override Ice.AsyncCallback completed(Ice.Exception ex)
+ public override bool
+ exception(Ice.Exception ex)
{
if(childObserver_ != null)
{
@@ -936,233 +1278,289 @@ namespace IceInternal
childObserver_.detach();
childObserver_ = null;
}
- _outAsync.doCheck(false);
- return null;
+ _outAsync.check(false);
+ return false;
}
- protected override Ice.Instrumentation.InvocationObserver getObserver()
+ protected override Ice.Instrumentation.InvocationObserver
+ getObserver()
{
- return _outAsync.getObserver();
+ return _observer;
}
- private CommunicatorFlushBatch _outAsync;
- }
- private int _useCount;
- }
-
+ private CommunicatorFlushBatchAsync _outAsync;
+ private Ice.Instrumentation.InvocationObserver _observer;
+ };
- public class ConnectionFlushBatch : OutgoingAsyncBase
- {
- public static ConnectionFlushBatch check(Ice.AsyncResult r, Ice.Connection con, string operation)
+ public CommunicatorFlushBatchAsync(Instance instance, OutgoingAsyncCompletionCallback callback) :
+ base(instance, callback)
{
- if(r != null && r.getConnection() != con)
- {
- throw new System.ArgumentException("Connection for call to end_" + operation +
- " does not match connection that was used to call " +
- "corresponding begin_" + operation + " method");
- }
- return AsyncResultI.check<ConnectionFlushBatch>(r, operation);
- }
-
- public ConnectionFlushBatch(Ice.ConnectionI con, Ice.Communicator communicator, Instance instance, string op,
- object cookie) :
- base(communicator, instance, op, cookie)
- {
- _connection = con;
+ //
+ // _useCount is initialized to 1 to prevent premature callbacks.
+ // The caller must invoke ready() after all flush requests have
+ // been initiated.
+ //
+ _useCount = 1;
}
- public override Ice.Connection getConnection()
+ public void flushConnection(Ice.ConnectionI con)
{
- return _connection;
- }
+ lock(this)
+ {
+ ++_useCount;
+ }
- public void invoke()
- {
try
{
- int batchRequestNum = _connection.getBatchRequestQueue().swap(os_);
-
- bool isSent = false;
- Ice.AsyncCallback sentCB;
+ var flushBatch = new FlushBatch(this, instance_, _observer);
+ int batchRequestNum = con.getBatchRequestQueue().swap(flushBatch.getOs());
if(batchRequestNum == 0)
{
- isSent = true;
- sentCB = sent();
+ flushBatch.sent();
}
else
{
- isSent = _connection.sendAsyncRequest(this, false, false, batchRequestNum, out sentCB);
+ con.sendAsyncRequest(flushBatch, false, false, batchRequestNum);
}
+ }
+ catch(Ice.LocalException)
+ {
+ check(false);
+ throw;
+ }
+ }
- if(isSent)
- {
- sentSynchronously_ = true;
- if(sentCB != null)
- {
- invokeSent(sentCB);
- }
- }
+ public void invoke(string operation)
+ {
+ _observer = ObserverHelper.get(instance_, operation);
+ if(_observer != null)
+ {
+ _observer.attach();
}
- catch(RetryException ex)
+ instance_.outgoingConnectionFactory().flushAsyncBatchRequests(this);
+ instance_.objectAdapterFactory().flushAsyncBatchRequests(this);
+ check(true);
+ }
+
+ public void check(bool userThread)
+ {
+ lock(this)
{
- Ice.AsyncCallback cb = completed(ex.get());
- if(cb != null)
+ Debug.Assert(_useCount > 0);
+ if(--_useCount > 0)
{
- invokeCompletedAsync(cb);
+ return;
}
}
- catch(Ice.Exception ex)
+
+ if(sentImpl(true))
{
- Ice.AsyncCallback cb = completed(ex);
- if(cb != null)
+ if(userThread)
{
- invokeCompletedAsync(cb);
+ sentSynchronously_ = true;
+ invokeSent();
+ }
+ else
+ {
+ invokeSentAsync();
}
}
}
- private Ice.ConnectionI _connection;
- }
+ private int _useCount;
+ private Ice.Instrumentation.InvocationObserver _observer;
+ };
- public class ProxyFlushBatch : ProxyOutgoingAsyncBase
+ public abstract class TaskCompletionCallback<T> : TaskCompletionSource<T>, OutgoingAsyncCompletionCallback
{
- public new static ProxyFlushBatch check(Ice.AsyncResult r, Ice.ObjectPrx prx, string operation)
+ public TaskCompletionCallback(System.IProgress<bool> progress, CancellationToken cancellationToken)
{
- return ProxyOutgoingAsyncBase.check<ProxyFlushBatch>(r, prx, operation);
+ _progress = progress;
+ _cancellationToken = cancellationToken;
}
- public ProxyFlushBatch(Ice.ObjectPrxHelperBase prx, string operation, object cookie) :
- base(prx, operation, cookie)
+ public void init(OutgoingAsyncBase outgoing)
{
- observer_ = ObserverHelper.get(prx, operation);
- _batchRequestNum = prx.getBatchRequestQueue__().swap(os_);
+ if(_cancellationToken.CanBeCanceled)
+ {
+ _cancellationToken.Register(outgoing.cancel);
+ }
}
- public override bool invokeRemote(Ice.ConnectionI con, bool compress, bool resp, out Ice.AsyncCallback sentCB)
+ public virtual bool handleSent(bool done, bool alreadySent)
{
- if(_batchRequestNum == 0)
+ if(done)
{
- sentCB = sent();
- return true;
+ SetResult(default(T));
}
- cachedConnection_ = con;
- return con.sendAsyncRequest(this, compress, false, _batchRequestNum, out sentCB);
+ return _progress != null && !alreadySent; // Invoke the sent callback only if not already invoked.
}
- public override bool invokeCollocated(CollocatedRequestHandler handler, out Ice.AsyncCallback sentCB)
+ public bool handleException(Ice.Exception ex)
{
- if(_batchRequestNum == 0)
- {
- sentCB = sent();
- return true;
- }
- return handler.invokeAsyncRequest(this, _batchRequestNum, false, out sentCB);
+ SetException(ex);
+ return false;
}
- public void invoke()
+ public abstract bool handleResponse(bool ok, OutgoingAsyncBase og);
+
+ public void handleInvokeSent(bool sentSynchronously, OutgoingAsyncBase og)
{
- Protocol.checkSupportedProtocol(Protocol.getCompatibleProtocol(proxy_.reference__().getProtocol()));
- invokeImpl(true); // userThread = true
+ _progress.Report(sentSynchronously);
}
- private int _batchRequestNum;
+ public void handleInvokeException(Ice.Exception ex, OutgoingAsyncBase og)
+ {
+ Debug.Assert(false);
+ }
+
+ public void handleInvokeResponse(bool ok, OutgoingAsyncBase og)
+ {
+ Debug.Assert(false);
+ }
+
+ private readonly CancellationToken _cancellationToken;
+ private readonly System.IProgress<bool> _progress;
}
- public class ProxyGetConnection : ProxyOutgoingAsyncBase, Ice.AsyncResult<Ice.Callback_Object_ice_getConnection>
+ public class OperationTaskCompletionCallback<T> : TaskCompletionCallback<T>
{
- public new static ProxyGetConnection check(Ice.AsyncResult r, Ice.ObjectPrx prx, string operation)
+ public OperationTaskCompletionCallback(System.IProgress<bool> progress, CancellationToken cancellationToken) :
+ base(progress, cancellationToken)
{
- return ProxyOutgoingAsyncBase.check<ProxyGetConnection>(r, prx, operation);
}
- public ProxyGetConnection(Ice.ObjectPrxHelperBase prx, string operation,
- ProxyTwowayCallback<Ice.Callback_Object_ice_getConnection> cb, object cookie) :
- base(prx, operation, cookie)
+ public override bool handleResponse(bool ok, OutgoingAsyncBase og)
{
- observer_ = ObserverHelper.get(prx, operation);
- _completed = cb;
+ SetResult(((OutgoingAsyncT<T>)og).result__(ok));
+ return false;
}
+ }
- public override bool invokeRemote(Ice.ConnectionI con, bool compress, bool resp, out Ice.AsyncCallback sentCB)
+ public class FlushBatchTaskCompletionCallback : TaskCompletionCallback<object>
+ {
+ public FlushBatchTaskCompletionCallback(System.IProgress<bool> progress, CancellationToken cancellationToken) :
+ base(progress, cancellationToken)
{
- sentCB = null;
- cachedConnection_ = con;
- Ice.AsyncCallback cb = finished(true);
- if(cb != null)
- {
- invokeCompletedAsync(cb);
- }
- return true;
}
- public override bool invokeCollocated(CollocatedRequestHandler handler, out Ice.AsyncCallback sentCB)
+ public override bool handleResponse(bool ok, OutgoingAsyncBase og)
{
- sentCB = null;
- Ice.AsyncCallback cb = finished(true);
- if(cb != null)
- {
- invokeCompletedAsync(cb);
- }
- return true;
+ SetResult(null);
+ return false;
}
+ }
- public void invoke()
+ abstract public class AsyncResultCompletionCallback : AsyncResultI, OutgoingAsyncCompletionCallback
+ {
+ public AsyncResultCompletionCallback(Ice.Communicator com, Instance instance, string op, object cookie,
+ Ice.AsyncCallback cb) :
+ base(com, instance, op, cookie, cb)
{
- invokeImpl(true); // userThread = true
}
- new public Ice.AsyncResult<Ice.Callback_Object_ice_getConnection> whenCompleted(Ice.ExceptionCallback excb)
+ public void init(OutgoingAsyncBase outgoing)
{
- base.whenCompleted(excb);
- return this;
+ outgoing_ = outgoing;
}
- virtual public Ice.AsyncResult<Ice.Callback_Object_ice_getConnection>
- whenCompleted(Ice.Callback_Object_ice_getConnection cb, Ice.ExceptionCallback excb)
+ public bool handleSent(bool done, bool alreadySent)
{
- if(cb == null && excb == null)
+ lock(this)
{
- throw new System.ArgumentException("callback is null");
+ state_ |= StateSent;
+ if(done)
+ {
+ state_ |= StateDone | StateOK;
+ }
+ if(waitHandle_ != null)
+ {
+ waitHandle_.Set();
+ }
+ Monitor.PulseAll(this);
+
+ //
+ // Invoke the sent callback only if not already invoked.
+ //
+ return !alreadySent && sentCallback_ != null;
}
+ }
+
+ public bool handleException(Ice.Exception ex)
+ {
lock(this)
{
- if(_responseCallback != null || exceptionCallback_ != null)
+ state_ |= StateDone;
+ exception_ = ex;
+ if(waitHandle_ != null)
{
- throw new System.ArgumentException("callback already set");
+ waitHandle_.Set();
}
- _responseCallback = cb;
- exceptionCallback_ = excb;
+ Monitor.PulseAll(this);
+ return completedCallback_ != null;
}
- setCompletedCallback(getCompletedCallback());
- return this;
}
- new public Ice.AsyncResult<Ice.Callback_Object_ice_getConnection> whenSent(Ice.SentCallback cb)
+ public bool handleResponse(bool ok, OutgoingAsyncBase og)
{
- base.whenSent(cb);
- return this;
+ lock(this)
+ {
+ state_ |= StateDone;
+ if(ok)
+ {
+ state_ |= StateOK;
+ }
+ if(waitHandle_ != null)
+ {
+ waitHandle_.Set();
+ }
+ Monitor.PulseAll(this);
+ return completedCallback_ != null;
+ }
}
- protected override Ice.AsyncCallback getCompletedCallback()
+ public void handleInvokeSent(bool sentSynchronously, OutgoingAsyncBase og)
{
- return (Ice.AsyncResult result) => { _completed(this, _responseCallback, exceptionCallback_); };
+ sentCallback_(this);
}
- private ProxyTwowayCallback<Ice.Callback_Object_ice_getConnection> _completed;
- private Ice.Callback_Object_ice_getConnection _responseCallback = null;
+ public void handleInvokeException(Ice.Exception ex, OutgoingAsyncBase og)
+ {
+ try
+ {
+ completedCallback_(this);
+ }
+ catch(Ice.Exception e)
+ {
+ throw new System.AggregateException(e);
+ }
+ }
+
+ public void handleInvokeResponse(bool ok, OutgoingAsyncBase og)
+ {
+ try
+ {
+ completedCallback_(this);
+ }
+ catch(Ice.Exception e)
+ {
+ throw new System.AggregateException(e);
+ }
+ }
}
- public abstract class OutgoingAsync<T> : OutgoingAsync, Ice.AsyncResult<T>
+ abstract public class ProxyAsyncResultCompletionCallback<T> : AsyncResultCompletionCallback, Ice.AsyncResult<T>
{
- public OutgoingAsync(Ice.ObjectPrxHelperBase prx, string operation, object cookie) :
- base(prx, operation, cookie)
+ public ProxyAsyncResultCompletionCallback(Ice.ObjectPrxHelperBase proxy, string operation, object cookie,
+ Ice.AsyncCallback cb) :
+ base(proxy.ice_getCommunicator(), proxy.reference__().getInstance(), operation, cookie, cb)
{
+ _proxy = proxy;
}
- public OutgoingAsync(Ice.ObjectPrxHelperBase prx, string operation, object cookie, Ice.InputStream iss,
- Ice.OutputStream os) :
- base(prx, operation, cookie, iss, os)
+ public override Ice.ObjectPrx getProxy()
{
+ return _proxy;
}
new public Ice.AsyncResult<T> whenCompleted(Ice.ExceptionCallback excb)
@@ -1197,73 +1595,45 @@ namespace IceInternal
}
protected T responseCallback_;
+ private Ice.ObjectPrx _proxy;
}
- public class TwowayOutgoingAsync<T> : OutgoingAsync<T>
- {
- public TwowayOutgoingAsync(Ice.ObjectPrxHelperBase prx, string operation, ProxyTwowayCallback<T> cb,
- object cookie) :
- base(prx, operation, cookie)
- {
- Debug.Assert(cb != null);
- _completed = cb;
- }
-
- public TwowayOutgoingAsync(Ice.ObjectPrxHelperBase prx, string operation, ProxyTwowayCallback<T> cb,
- object cookie, Ice.InputStream iss, Ice.OutputStream os) :
- base(prx, operation, cookie, iss, os)
- {
- Debug.Assert(cb != null);
- _completed = cb;
- }
-
- override protected Ice.AsyncCallback getCompletedCallback()
- {
- return (Ice.AsyncResult result) => { _completed(this, responseCallback_, exceptionCallback_); };
- }
-
- private ProxyTwowayCallback<T> _completed;
- }
-
- public class OnewayOutgoingAsync<T> : OutgoingAsync<T>
+ public class OperationAsyncResultCompletionCallback<T, R> : ProxyAsyncResultCompletionCallback<T>
{
- public OnewayOutgoingAsync(Ice.ObjectPrxHelperBase prx, string operation, ProxyOnewayCallback<T> cb,
- object cookie) :
- base(prx, operation, cookie)
+ public OperationAsyncResultCompletionCallback(System.Action<T, R> completed,
+ Ice.ObjectPrxHelperBase proxy,
+ string operation,
+ object cookie,
+ Ice.AsyncCallback callback) :
+ base(proxy, operation, cookie, callback)
{
- Debug.Assert(cb != null);
- _completed = cb;
- }
-
- public OnewayOutgoingAsync(Ice.ObjectPrxHelperBase prx, string operation, ProxyOnewayCallback<T> cb,
- object cookie, Ice.InputStream iss, Ice.OutputStream os) :
- base(prx, operation, cookie, iss, os)
- {
- Debug.Assert(cb != null);
- _completed = cb;
+ _completed = completed;
}
override protected Ice.AsyncCallback getCompletedCallback()
{
- return (Ice.AsyncResult result) =>
+ return (Ice.AsyncResult r) =>
{
+ Debug.Assert(r == this);
try
{
- IceInternal.OutgoingAsync outAsync__ = (IceInternal.OutgoingAsync)result;
- ((Ice.ObjectPrxHelperBase)(outAsync__.getProxy())).end__(outAsync__, outAsync__.getOperation());
- }
- catch(Ice.Exception ex__)
- {
- if(exceptionCallback_ != null)
+ R result = ((OutgoingAsyncT<R>)outgoing_).result__(wait());
+ try
{
- exceptionCallback_(ex__);
+ _completed(responseCallback_, result);
}
- return;
+ catch(Ice.Exception ex)
+ {
+ throw new System.AggregateException(ex);
+ }
+ }
+ catch(Ice.Exception ex)
+ {
+ exceptionCallback_?.Invoke(ex);
}
- _completed(responseCallback_);
};
}
- private ProxyOnewayCallback<T> _completed;
+ private System.Action<T, R> _completed;
}
}