summaryrefslogtreecommitdiff
path: root/cs/src/Ice/OutgoingAsync.cs
diff options
context:
space:
mode:
Diffstat (limited to 'cs/src/Ice/OutgoingAsync.cs')
-rw-r--r--cs/src/Ice/OutgoingAsync.cs189
1 files changed, 143 insertions, 46 deletions
diff --git a/cs/src/Ice/OutgoingAsync.cs b/cs/src/Ice/OutgoingAsync.cs
index 5a7e9a11f8b..0f84936de22 100644
--- a/cs/src/Ice/OutgoingAsync.cs
+++ b/cs/src/Ice/OutgoingAsync.cs
@@ -104,7 +104,7 @@ namespace IceInternal
bool send(Ice.ConnectionI connection, bool compress, bool response, out Ice.AsyncCallback sentCallback);
bool invokeCollocated(CollocatedRequestHandler handler, out Ice.AsyncCallback sentCallback);
-
+
//
// Called by the connection when the message is confirmed sent. The connection is locked
// when this is called so this method can call the sent callback. Instead, this method
@@ -500,8 +500,8 @@ namespace IceInternal
{
if(observer_ != null)
{
- childObserver_ = observer_.getCollocatedObserver(adapter,
- requestId,
+ childObserver_ = observer_.getCollocatedObserver(adapter,
+ requestId,
os_.size() - IceInternal.Protocol.headerSize - 4);
if(childObserver_ != null)
{
@@ -734,10 +734,10 @@ namespace IceInternal
lock(monitor_)
{
- handler = timeoutRequestHandler_;
+ handler = timeoutRequestHandler_;
timeoutRequestHandler_ = null;
}
-
+
if(handler != null)
{
handler.asyncRequestCanceled((OutgoingAsyncMessageCallback)this, new Ice.InvocationTimeoutException());
@@ -807,9 +807,9 @@ namespace IceInternal
public void prepare(string operation, Ice.OperationMode mode, Dictionary<string, string> context,
bool explicitContext, bool synchronous)
{
- _handler = null;
+ handler_ = null;
_sent = false;
- _cnt = 0;
+ cnt_ = 0;
_mode = mode;
sentSynchronously_ = false;
synchronous_ = synchronous;
@@ -840,14 +840,14 @@ namespace IceInternal
{
try
{
- _handler = proxy_.getRequestHandler__();
- _handler.prepareBatchRequest(os_);
+ handler_ = proxy_.getRequestHandler__();
+ handler_.prepareBatchRequest(os_);
break;
}
catch(RetryException)
{
// Clear request handler and retry.
- proxy_.setRequestHandler__(_handler, null);
+ proxy_.setRequestHandler__(handler_, null);
}
catch(Ice.LocalException ex)
{
@@ -856,8 +856,8 @@ namespace IceInternal
observer_.failed(ex.ice_name());
}
// Clear request handler
- proxy_.setRequestHandler__(_handler, null);
- _handler = null;
+ proxy_.setRequestHandler__(handler_, null);
+ handler_ = null;
throw ex;
}
}
@@ -918,14 +918,14 @@ namespace IceInternal
return proxy_;
}
- public bool send(Ice.ConnectionI connection, bool compress, bool response, out Ice.AsyncCallback sentCB)
+ public virtual bool send(Ice.ConnectionI connection, bool compress, bool response, out Ice.AsyncCallback sentCB)
{
// Store away the connection for passing to the dispatcher.
cachedConnection_ = connection;
return connection.sendAsyncRequest(this, compress, response, out sentCB);
}
- public bool invokeCollocated(CollocatedRequestHandler handler, out Ice.AsyncCallback sentCallback)
+ public virtual bool invokeCollocated(CollocatedRequestHandler handler, out Ice.AsyncCallback sentCallback)
{
// The BasicStream cannot be cached if the proxy is
// not a twoway or there is an invocation timeout set.
@@ -937,15 +937,15 @@ namespace IceInternal
handler.invokeAsyncRequest(this, synchronous_, out sentCallback);
return false;
}
-
- public Ice.AsyncCallback sent()
+
+ public virtual Ice.AsyncCallback sent()
{
lock(monitor_)
{
bool alreadySent = (state_ & StateSent) != 0;
state_ |= StateSent;
_sent = true;
-
+
Debug.Assert((state_ & StateDone) == 0);
if(!proxy_.ice_isTwoway())
{
@@ -977,17 +977,17 @@ namespace IceInternal
}
}
- public new void invokeSent(Ice.AsyncCallback cb)
+ public virtual new void invokeSent(Ice.AsyncCallback cb)
{
base.invokeSent(cb);
}
- public new void invokeCompleted(Ice.AsyncCallback cb)
+ public virtual new void invokeCompleted(Ice.AsyncCallback cb)
{
base.invokeCompleted(cb);
}
- public void finished(Ice.Exception exc)
+ public virtual void finished(Ice.Exception exc)
{
lock(monitor_)
{
@@ -1019,11 +1019,11 @@ namespace IceInternal
}
}
- public void
+ public void
dispatchInvocationCancel(Ice.LocalException ex, ThreadPool threadPool, Ice.Connection connection)
{
OutgoingAsync self = this;
- threadPool.dispatch(() =>
+ threadPool.dispatch(() =>
{
self.finished(ex);
}, connection);
@@ -1053,7 +1053,7 @@ namespace IceInternal
instance_.timer().cancel(this);
timeoutRequestHandler_ = null;
}
-
+
replyStatus = is_.readByte();
switch(replyStatus)
@@ -1221,7 +1221,7 @@ namespace IceInternal
waitHandle_.Set();
}
System.Threading.Monitor.PulseAll(monitor_);
-
+
if(completedCallback_ == null)
{
if(observer_ != null)
@@ -1243,7 +1243,7 @@ namespace IceInternal
if(mode == Reference.Mode.ModeBatchOneway || mode == Reference.Mode.ModeBatchDatagram)
{
state_ |= StateDone | StateOK;
- _handler.finishBatchRequest(os_);
+ handler_.finishBatchRequest(os_);
if(observer_ != null)
{
observer_.detach();
@@ -1257,9 +1257,9 @@ namespace IceInternal
try
{
_sent = false;
- _handler = proxy_.getRequestHandler__();
+ handler_ = proxy_.getRequestHandler__();
Ice.AsyncCallback sentCallback;
- bool sent = _handler.sendAsyncRequest(this, out sentCallback);
+ bool sent = handler_.sendAsyncRequest(this, out sentCallback);
if(sent)
{
if(synchronous) // Only set sentSynchronously_ If called synchronously by the user thread.
@@ -1279,11 +1279,11 @@ namespace IceInternal
{
if((state_ & StateDone) == 0)
{
- int invocationTimeout = _handler.getReference().getInvocationTimeout();
+ int invocationTimeout = handler_.getReference().getInvocationTimeout();
if(invocationTimeout > 0)
{
instance_.timer().schedule(this, invocationTimeout);
- timeoutRequestHandler_ = _handler;
+ timeoutRequestHandler_ = handler_;
}
}
}
@@ -1292,7 +1292,7 @@ namespace IceInternal
catch(RetryException)
{
- proxy_.setRequestHandler__(_handler, null); // Clear request handler and retry.
+ proxy_.setRequestHandler__(handler_, null); // Clear request handler and retry.
continue;
}
catch(Ice.Exception ex)
@@ -1365,7 +1365,7 @@ namespace IceInternal
}
}
- public void
+ public void
runTimerTask()
{
runTimerTask__();
@@ -1389,14 +1389,14 @@ namespace IceInternal
is_.reset();
}
os_.reset();
-
+
proxy_.cacheMessageBuffers(is_, os_);
}
}
override public void invokeExceptionAsync(Ice.Exception ex)
{
- if((state_ & StateDone) == 0 && _handler != null)
+ if((state_ & StateDone) == 0 && handler_ != null)
{
//
// If we didn't finish a batch oneway or datagram request, we
@@ -1406,7 +1406,7 @@ namespace IceInternal
Reference.Mode mode = proxy_.reference__().getMode();
if(mode == Reference.Mode.ModeBatchOneway || mode == Reference.Mode.ModeBatchDatagram)
{
- _handler.abortBatchRequest();
+ handler_.abortBatchRequest();
}
}
base.invokeExceptionAsync(ex);
@@ -1416,7 +1416,7 @@ namespace IceInternal
{
try
{
- int interval = proxy_.handleException__(exc, _handler, _mode, _sent, ref _cnt);
+ int interval = proxy_.handleException__(exc, handler_, _mode, _sent, ref cnt_);
if(observer_ != null)
{
observer_.retried(); // Invocation is being retried.
@@ -1458,10 +1458,10 @@ namespace IceInternal
}
protected Ice.ObjectPrxHelperBase proxy_;
+ protected RequestHandler handler_;
+ protected int cnt_;
- private RequestHandler _handler;
private Ice.EncodingVersion _encoding;
- private int _cnt;
private Ice.OperationMode _mode;
private bool _sent;
@@ -1475,7 +1475,7 @@ namespace IceInternal
{
}
- public OutgoingAsync(Ice.ObjectPrxHelperBase prx, string operation, object cookie, BasicStream iss,
+ public OutgoingAsync(Ice.ObjectPrxHelperBase prx, string operation, object cookie, BasicStream iss,
BasicStream os) :
base(prx, operation, cookie, iss, os)
{
@@ -1562,7 +1562,7 @@ namespace IceInternal
public class TwowayOutgoingAsync<T> : OutgoingAsync<T>
{
- public TwowayOutgoingAsync(Ice.ObjectPrxHelperBase prx, string operation, ProxyTwowayCallback<T> cb,
+ public TwowayOutgoingAsync(Ice.ObjectPrxHelperBase prx, string operation, ProxyTwowayCallback<T> cb,
object cookie) :
base(prx, operation, cookie)
{
@@ -1570,7 +1570,7 @@ namespace IceInternal
_completed = cb;
}
- public TwowayOutgoingAsync(Ice.ObjectPrxHelperBase prx, string operation, ProxyTwowayCallback<T> cb,
+ public TwowayOutgoingAsync(Ice.ObjectPrxHelperBase prx, string operation, ProxyTwowayCallback<T> cb,
object cookie, BasicStream iss, BasicStream os) :
base(prx, operation, cookie, iss, os)
{
@@ -1634,6 +1634,103 @@ namespace IceInternal
private ProxyOnewayCallback<T> _completed;
}
+ public class GetConnectionOutgoingAsync : TwowayOutgoingAsync<Ice.Callback_Object_ice_getConnection>
+ {
+ public GetConnectionOutgoingAsync(Ice.ObjectPrxHelperBase proxy, string operation,
+ ProxyTwowayCallback<Ice.Callback_Object_ice_getConnection> cb,
+ object cookie) :
+ base(proxy, operation, cb, cookie)
+ {
+ observer_ = ObserverHelper.get(proxy, operation);
+ }
+
+ public void invoke()
+ {
+ while(true)
+ {
+ try
+ {
+ handler_ = proxy_.getRequestHandler__();
+ Ice.AsyncCallback sentCallback;
+ handler_.sendAsyncRequest(this, out sentCallback);
+ }
+ catch(RetryException)
+ {
+ proxy_.setRequestHandler__(handler_, null);
+ }
+ catch(Ice.Exception ex)
+ {
+ handleException(ex);
+ }
+ break;
+ }
+ }
+
+ public override bool send(Ice.ConnectionI connection, bool compress, bool response,
+ out Ice.AsyncCallback sentCallback)
+ {
+ sent();
+ sentCallback = null;
+ return false;
+ }
+
+ public override bool invokeCollocated(CollocatedRequestHandler handler, out Ice.AsyncCallback sentCallback)
+ {
+ sent();
+ sentCallback = null;
+ return false;
+ }
+
+ public override Ice.AsyncCallback sent()
+ {
+ lock(monitor_)
+ {
+ state_ |= StateDone;
+ System.Threading.Monitor.PulseAll(monitor_);
+ }
+ invokeCompleted(completedCallback_);
+ return null;
+ }
+
+ public override void invokeSent(Ice.AsyncCallback cb)
+ {
+ // No sent callback
+ }
+
+ public override void finished(Ice.Exception exc)
+ {
+ try
+ {
+ handleException(exc);
+ }
+ catch(Ice.Exception ex)
+ {
+ invokeExceptionAsync(ex);
+ }
+ }
+
+ private void handleException(Ice.Exception exc)
+ {
+ try
+ {
+ instance_.retryQueue().add(this, proxy_.handleException__(exc, handler_, Ice.OperationMode.Idempotent,
+ false, ref cnt_));
+ if(observer_ != null)
+ {
+ observer_.retried(); // Invocation is being retried
+ }
+ }
+ catch(Ice.Exception ex)
+ {
+ if(observer_ != null)
+ {
+ observer_.failed(ex.ice_name());
+ }
+ throw ex;
+ }
+ }
+ }
+
public class BatchOutgoingAsync : OutgoingAsyncBase, OutgoingAsyncMessageCallback, TimerTask
{
public BatchOutgoingAsync(Ice.Communicator communicator, Instance instance, string operation, object cookie) :
@@ -1647,12 +1744,12 @@ namespace IceInternal
cachedConnection_ = connection;
return connection.flushAsyncBatchRequests(this, out sentCallback);
}
-
+
public bool invokeCollocated(CollocatedRequestHandler handler, out Ice.AsyncCallback sentCallback)
{
return handler.invokeAsyncBatchRequests(this, out sentCallback);
}
-
+
virtual public Ice.AsyncCallback sent()
{
lock(monitor_)
@@ -1712,17 +1809,17 @@ namespace IceInternal
invokeException(exc);
}
- public void
+ public void
dispatchInvocationCancel(Ice.LocalException ex, ThreadPool threadPool, Ice.Connection connection)
{
BatchOutgoingAsync self = this;
- threadPool.dispatch(() =>
+ threadPool.dispatch(() =>
{
self.finished(ex);
}, connection);
}
- public void
+ public void
runTimerTask()
{
runTimerTask__();
@@ -1954,7 +2051,7 @@ namespace IceInternal
_outAsync.check(false);
}
- override public void attachRemoteObserver(Ice.ConnectionInfo info, Ice.Endpoint endpt,
+ override public void attachRemoteObserver(Ice.ConnectionInfo info, Ice.Endpoint endpt,
int requestId, int sz)
{
if(_outAsync.observer_ != null)