diff options
Diffstat (limited to 'cs/src/Ice/OutgoingAsync.cs')
-rw-r--r-- | cs/src/Ice/OutgoingAsync.cs | 189 |
1 files changed, 143 insertions, 46 deletions
diff --git a/cs/src/Ice/OutgoingAsync.cs b/cs/src/Ice/OutgoingAsync.cs index 5a7e9a11f8b..0f84936de22 100644 --- a/cs/src/Ice/OutgoingAsync.cs +++ b/cs/src/Ice/OutgoingAsync.cs @@ -104,7 +104,7 @@ namespace IceInternal bool send(Ice.ConnectionI connection, bool compress, bool response, out Ice.AsyncCallback sentCallback); bool invokeCollocated(CollocatedRequestHandler handler, out Ice.AsyncCallback sentCallback); - + // // Called by the connection when the message is confirmed sent. The connection is locked // when this is called so this method can call the sent callback. Instead, this method @@ -500,8 +500,8 @@ namespace IceInternal { if(observer_ != null) { - childObserver_ = observer_.getCollocatedObserver(adapter, - requestId, + childObserver_ = observer_.getCollocatedObserver(adapter, + requestId, os_.size() - IceInternal.Protocol.headerSize - 4); if(childObserver_ != null) { @@ -734,10 +734,10 @@ namespace IceInternal lock(monitor_) { - handler = timeoutRequestHandler_; + handler = timeoutRequestHandler_; timeoutRequestHandler_ = null; } - + if(handler != null) { handler.asyncRequestCanceled((OutgoingAsyncMessageCallback)this, new Ice.InvocationTimeoutException()); @@ -807,9 +807,9 @@ namespace IceInternal public void prepare(string operation, Ice.OperationMode mode, Dictionary<string, string> context, bool explicitContext, bool synchronous) { - _handler = null; + handler_ = null; _sent = false; - _cnt = 0; + cnt_ = 0; _mode = mode; sentSynchronously_ = false; synchronous_ = synchronous; @@ -840,14 +840,14 @@ namespace IceInternal { try { - _handler = proxy_.getRequestHandler__(); - _handler.prepareBatchRequest(os_); + handler_ = proxy_.getRequestHandler__(); + handler_.prepareBatchRequest(os_); break; } catch(RetryException) { // Clear request handler and retry. - proxy_.setRequestHandler__(_handler, null); + proxy_.setRequestHandler__(handler_, null); } catch(Ice.LocalException ex) { @@ -856,8 +856,8 @@ namespace IceInternal observer_.failed(ex.ice_name()); } // Clear request handler - proxy_.setRequestHandler__(_handler, null); - _handler = null; + proxy_.setRequestHandler__(handler_, null); + handler_ = null; throw ex; } } @@ -918,14 +918,14 @@ namespace IceInternal return proxy_; } - public bool send(Ice.ConnectionI connection, bool compress, bool response, out Ice.AsyncCallback sentCB) + public virtual bool send(Ice.ConnectionI connection, bool compress, bool response, out Ice.AsyncCallback sentCB) { // Store away the connection for passing to the dispatcher. cachedConnection_ = connection; return connection.sendAsyncRequest(this, compress, response, out sentCB); } - public bool invokeCollocated(CollocatedRequestHandler handler, out Ice.AsyncCallback sentCallback) + public virtual bool invokeCollocated(CollocatedRequestHandler handler, out Ice.AsyncCallback sentCallback) { // The BasicStream cannot be cached if the proxy is // not a twoway or there is an invocation timeout set. @@ -937,15 +937,15 @@ namespace IceInternal handler.invokeAsyncRequest(this, synchronous_, out sentCallback); return false; } - - public Ice.AsyncCallback sent() + + public virtual Ice.AsyncCallback sent() { lock(monitor_) { bool alreadySent = (state_ & StateSent) != 0; state_ |= StateSent; _sent = true; - + Debug.Assert((state_ & StateDone) == 0); if(!proxy_.ice_isTwoway()) { @@ -977,17 +977,17 @@ namespace IceInternal } } - public new void invokeSent(Ice.AsyncCallback cb) + public virtual new void invokeSent(Ice.AsyncCallback cb) { base.invokeSent(cb); } - public new void invokeCompleted(Ice.AsyncCallback cb) + public virtual new void invokeCompleted(Ice.AsyncCallback cb) { base.invokeCompleted(cb); } - public void finished(Ice.Exception exc) + public virtual void finished(Ice.Exception exc) { lock(monitor_) { @@ -1019,11 +1019,11 @@ namespace IceInternal } } - public void + public void dispatchInvocationCancel(Ice.LocalException ex, ThreadPool threadPool, Ice.Connection connection) { OutgoingAsync self = this; - threadPool.dispatch(() => + threadPool.dispatch(() => { self.finished(ex); }, connection); @@ -1053,7 +1053,7 @@ namespace IceInternal instance_.timer().cancel(this); timeoutRequestHandler_ = null; } - + replyStatus = is_.readByte(); switch(replyStatus) @@ -1221,7 +1221,7 @@ namespace IceInternal waitHandle_.Set(); } System.Threading.Monitor.PulseAll(monitor_); - + if(completedCallback_ == null) { if(observer_ != null) @@ -1243,7 +1243,7 @@ namespace IceInternal if(mode == Reference.Mode.ModeBatchOneway || mode == Reference.Mode.ModeBatchDatagram) { state_ |= StateDone | StateOK; - _handler.finishBatchRequest(os_); + handler_.finishBatchRequest(os_); if(observer_ != null) { observer_.detach(); @@ -1257,9 +1257,9 @@ namespace IceInternal try { _sent = false; - _handler = proxy_.getRequestHandler__(); + handler_ = proxy_.getRequestHandler__(); Ice.AsyncCallback sentCallback; - bool sent = _handler.sendAsyncRequest(this, out sentCallback); + bool sent = handler_.sendAsyncRequest(this, out sentCallback); if(sent) { if(synchronous) // Only set sentSynchronously_ If called synchronously by the user thread. @@ -1279,11 +1279,11 @@ namespace IceInternal { if((state_ & StateDone) == 0) { - int invocationTimeout = _handler.getReference().getInvocationTimeout(); + int invocationTimeout = handler_.getReference().getInvocationTimeout(); if(invocationTimeout > 0) { instance_.timer().schedule(this, invocationTimeout); - timeoutRequestHandler_ = _handler; + timeoutRequestHandler_ = handler_; } } } @@ -1292,7 +1292,7 @@ namespace IceInternal catch(RetryException) { - proxy_.setRequestHandler__(_handler, null); // Clear request handler and retry. + proxy_.setRequestHandler__(handler_, null); // Clear request handler and retry. continue; } catch(Ice.Exception ex) @@ -1365,7 +1365,7 @@ namespace IceInternal } } - public void + public void runTimerTask() { runTimerTask__(); @@ -1389,14 +1389,14 @@ namespace IceInternal is_.reset(); } os_.reset(); - + proxy_.cacheMessageBuffers(is_, os_); } } override public void invokeExceptionAsync(Ice.Exception ex) { - if((state_ & StateDone) == 0 && _handler != null) + if((state_ & StateDone) == 0 && handler_ != null) { // // If we didn't finish a batch oneway or datagram request, we @@ -1406,7 +1406,7 @@ namespace IceInternal Reference.Mode mode = proxy_.reference__().getMode(); if(mode == Reference.Mode.ModeBatchOneway || mode == Reference.Mode.ModeBatchDatagram) { - _handler.abortBatchRequest(); + handler_.abortBatchRequest(); } } base.invokeExceptionAsync(ex); @@ -1416,7 +1416,7 @@ namespace IceInternal { try { - int interval = proxy_.handleException__(exc, _handler, _mode, _sent, ref _cnt); + int interval = proxy_.handleException__(exc, handler_, _mode, _sent, ref cnt_); if(observer_ != null) { observer_.retried(); // Invocation is being retried. @@ -1458,10 +1458,10 @@ namespace IceInternal } protected Ice.ObjectPrxHelperBase proxy_; + protected RequestHandler handler_; + protected int cnt_; - private RequestHandler _handler; private Ice.EncodingVersion _encoding; - private int _cnt; private Ice.OperationMode _mode; private bool _sent; @@ -1475,7 +1475,7 @@ namespace IceInternal { } - public OutgoingAsync(Ice.ObjectPrxHelperBase prx, string operation, object cookie, BasicStream iss, + public OutgoingAsync(Ice.ObjectPrxHelperBase prx, string operation, object cookie, BasicStream iss, BasicStream os) : base(prx, operation, cookie, iss, os) { @@ -1562,7 +1562,7 @@ namespace IceInternal public class TwowayOutgoingAsync<T> : OutgoingAsync<T> { - public TwowayOutgoingAsync(Ice.ObjectPrxHelperBase prx, string operation, ProxyTwowayCallback<T> cb, + public TwowayOutgoingAsync(Ice.ObjectPrxHelperBase prx, string operation, ProxyTwowayCallback<T> cb, object cookie) : base(prx, operation, cookie) { @@ -1570,7 +1570,7 @@ namespace IceInternal _completed = cb; } - public TwowayOutgoingAsync(Ice.ObjectPrxHelperBase prx, string operation, ProxyTwowayCallback<T> cb, + public TwowayOutgoingAsync(Ice.ObjectPrxHelperBase prx, string operation, ProxyTwowayCallback<T> cb, object cookie, BasicStream iss, BasicStream os) : base(prx, operation, cookie, iss, os) { @@ -1634,6 +1634,103 @@ namespace IceInternal private ProxyOnewayCallback<T> _completed; } + public class GetConnectionOutgoingAsync : TwowayOutgoingAsync<Ice.Callback_Object_ice_getConnection> + { + public GetConnectionOutgoingAsync(Ice.ObjectPrxHelperBase proxy, string operation, + ProxyTwowayCallback<Ice.Callback_Object_ice_getConnection> cb, + object cookie) : + base(proxy, operation, cb, cookie) + { + observer_ = ObserverHelper.get(proxy, operation); + } + + public void invoke() + { + while(true) + { + try + { + handler_ = proxy_.getRequestHandler__(); + Ice.AsyncCallback sentCallback; + handler_.sendAsyncRequest(this, out sentCallback); + } + catch(RetryException) + { + proxy_.setRequestHandler__(handler_, null); + } + catch(Ice.Exception ex) + { + handleException(ex); + } + break; + } + } + + public override bool send(Ice.ConnectionI connection, bool compress, bool response, + out Ice.AsyncCallback sentCallback) + { + sent(); + sentCallback = null; + return false; + } + + public override bool invokeCollocated(CollocatedRequestHandler handler, out Ice.AsyncCallback sentCallback) + { + sent(); + sentCallback = null; + return false; + } + + public override Ice.AsyncCallback sent() + { + lock(monitor_) + { + state_ |= StateDone; + System.Threading.Monitor.PulseAll(monitor_); + } + invokeCompleted(completedCallback_); + return null; + } + + public override void invokeSent(Ice.AsyncCallback cb) + { + // No sent callback + } + + public override void finished(Ice.Exception exc) + { + try + { + handleException(exc); + } + catch(Ice.Exception ex) + { + invokeExceptionAsync(ex); + } + } + + private void handleException(Ice.Exception exc) + { + try + { + instance_.retryQueue().add(this, proxy_.handleException__(exc, handler_, Ice.OperationMode.Idempotent, + false, ref cnt_)); + if(observer_ != null) + { + observer_.retried(); // Invocation is being retried + } + } + catch(Ice.Exception ex) + { + if(observer_ != null) + { + observer_.failed(ex.ice_name()); + } + throw ex; + } + } + } + public class BatchOutgoingAsync : OutgoingAsyncBase, OutgoingAsyncMessageCallback, TimerTask { public BatchOutgoingAsync(Ice.Communicator communicator, Instance instance, string operation, object cookie) : @@ -1647,12 +1744,12 @@ namespace IceInternal cachedConnection_ = connection; return connection.flushAsyncBatchRequests(this, out sentCallback); } - + public bool invokeCollocated(CollocatedRequestHandler handler, out Ice.AsyncCallback sentCallback) { return handler.invokeAsyncBatchRequests(this, out sentCallback); } - + virtual public Ice.AsyncCallback sent() { lock(monitor_) @@ -1712,17 +1809,17 @@ namespace IceInternal invokeException(exc); } - public void + public void dispatchInvocationCancel(Ice.LocalException ex, ThreadPool threadPool, Ice.Connection connection) { BatchOutgoingAsync self = this; - threadPool.dispatch(() => + threadPool.dispatch(() => { self.finished(ex); }, connection); } - public void + public void runTimerTask() { runTimerTask__(); @@ -1954,7 +2051,7 @@ namespace IceInternal _outAsync.check(false); } - override public void attachRemoteObserver(Ice.ConnectionInfo info, Ice.Endpoint endpt, + override public void attachRemoteObserver(Ice.ConnectionInfo info, Ice.Endpoint endpt, int requestId, int sz) { if(_outAsync.observer_ != null) |