diff options
Diffstat (limited to 'csharp/src/Ice/ConnectionI.cs')
-rw-r--r-- | csharp/src/Ice/ConnectionI.cs | 293 |
1 files changed, 163 insertions, 130 deletions
diff --git a/csharp/src/Ice/ConnectionI.cs b/csharp/src/Ice/ConnectionI.cs index ba5668eac74..252909df781 100644 --- a/csharp/src/Ice/ConnectionI.cs +++ b/csharp/src/Ice/ConnectionI.cs @@ -10,15 +10,17 @@ namespace Ice { using System; - using System.Collections; using System.Collections.Generic; using System.Diagnostics; using System.Text; using System.Threading; - using Ice.Instrumentation; + using System.Threading.Tasks; + using System.Linq; - public sealed class ConnectionI : - IceInternal.EventHandler, IceInternal.ResponseHandler, IceInternal.CancellationHandler, Connection + using Instrumentation; + using IceInternal; + + public sealed class ConnectionI : IceInternal.EventHandler, ResponseHandler, CancellationHandler, Connection { public interface StartCallback { @@ -26,7 +28,7 @@ namespace Ice void connectionStartFailed(ConnectionI connection, LocalException ex); } - private class TimeoutCallback : IceInternal.TimerTask + private class TimeoutCallback : TimerTask { public TimeoutCallback(ConnectionI connection) { @@ -56,7 +58,7 @@ namespace Ice throw _exception; } - if(!initialize(IceInternal.SocketOperation.None) || !validate(IceInternal.SocketOperation.None)) + if(!initialize(SocketOperation.None) || !validate(SocketOperation.None)) { _startCallback = callback; return; @@ -93,7 +95,7 @@ namespace Ice throw _exception; } - if(!initialize(IceInternal.SocketOperation.None) || !validate(IceInternal.SocketOperation.None)) + if(!initialize(SocketOperation.None) || !validate(SocketOperation.None)) { // // Wait for the connection to be validated. @@ -135,7 +137,7 @@ namespace Ice if(_acmLastActivity > -1) { - _acmLastActivity = IceInternal.Time.currentMonotonicTimeMillis(); + _acmLastActivity = Time.currentMonotonicTimeMillis(); } setState(StateActive); } @@ -315,7 +317,7 @@ namespace Ice } } - public void monitor(long now, IceInternal.ACMConfig acm) + public void monitor(long now, ACMConfig acm) { lock(this) { @@ -347,7 +349,7 @@ namespace Ice } } - if(_readStream.size() > IceInternal.Protocol.headerSize || !_writeStream.isEmpty()) + if(_readStream.size() > Protocol.headerSize || !_writeStream.isEmpty()) { // // If writing or reading, nothing to do, the connection @@ -382,21 +384,21 @@ namespace Ice } } - public bool sendAsyncRequest(IceInternal.OutgoingAsyncBase og, bool compress, bool response, - int batchRequestNum, out Ice.AsyncCallback sentCallback) + public int sendAsyncRequest(OutgoingAsyncBase og, bool compress, bool response, + int batchRequestNum) { OutputStream os = og.getOs(); lock(this) { + // + // If the exception is closed before we even have a chance + // to send our request, we always try to send the request + // again. + // if(_exception != null) { - // - // If the connection is closed before we even have a chance - // to send our request, we always try to send the request - // again. - // - throw new IceInternal.RetryException(_exception); + throw new RetryException(_exception); } Debug.Assert(_state > StateNotValidated); @@ -413,7 +415,6 @@ namespace Ice // This will throw if the request is canceled. // og.cancelable(this); - int requestId = 0; if(response) { @@ -430,25 +431,24 @@ namespace Ice // // Fill in the request ID. // - os.pos(IceInternal.Protocol.headerSize); + os.pos(Protocol.headerSize); os.writeInt(requestId); } else if(batchRequestNum > 0) { - os.pos(IceInternal.Protocol.headerSize); + os.pos(Protocol.headerSize); os.writeInt(batchRequestNum); } og.attachRemoteObserver(initConnectionInfo(), _endpoint, requestId); - bool sent; + int status = OutgoingAsyncBase.AsyncStatusQueued; try { - OutgoingMessage msg = new OutgoingMessage(og, os, compress, requestId); - sent = sendMessage(msg); - sentCallback = msg.sentCallback; + OutgoingMessage message = new OutgoingMessage(og, os, compress, requestId); + status = sendMessage(message); } - catch(LocalException ex) + catch(Ice.LocalException ex) { setState(StateClosed, ex); Debug.Assert(_exception != null); @@ -462,51 +462,88 @@ namespace Ice // _asyncRequests[requestId] = og; } - return sent; + return status; } } - public IceInternal.BatchRequestQueue getBatchRequestQueue() + public BatchRequestQueue getBatchRequestQueue() { return _batchRequestQueue; } public void flushBatchRequests() { - end_flushBatchRequests(begin_flushBatchRequests()); + flushBatchRequestsAsync().Wait(); } - public AsyncResult begin_flushBatchRequests() + private class ConnectionFlushBatchCompletionCallback : AsyncResultCompletionCallback { - return begin_flushBatchRequestsInternal(null, null); + public ConnectionFlushBatchCompletionCallback(Ice.Connection connection, + Ice.Communicator communicator, + Instance instance, + string op, + object cookie, + Ice.AsyncCallback callback) + : base(communicator, instance, op, cookie, callback) + { + _connection = connection; + } + + public override Ice.Connection getConnection() + { + return _connection; + } + + protected override Ice.AsyncCallback getCompletedCallback() + { + return (Ice.AsyncResult result) => + { + try + { + result.throwLocalException(); + } + catch(Ice.Exception ex) + { + exceptionCallback_?.Invoke(ex); + } + }; + } + + private Ice.Connection _connection; } - public AsyncResult begin_flushBatchRequests(AsyncCallback cb, object cookie) + public Task flushBatchRequestsAsync(IProgress<bool> progress = null, + CancellationToken cancel = new CancellationToken()) { - return begin_flushBatchRequestsInternal(cb, cookie); + var completed = new FlushBatchTaskCompletionCallback(progress, cancel); + var outgoing = new ConnectionFlushBatchAsync(this, _instance, completed); + outgoing.invoke(__flushBatchRequests_name); + return completed.Task; } - public void end_flushBatchRequests(AsyncResult r) + public AsyncResult begin_flushBatchRequests(AsyncCallback cb = null, object cookie = null) { - IceInternal.ConnectionFlushBatch outAsync = - IceInternal.ConnectionFlushBatch.check(r, this, __flushBatchRequests_name); - outAsync.wait(); + var result = new ConnectionFlushBatchCompletionCallback(this, _communicator, _instance, + __flushBatchRequests_name, cookie, cb); + var outgoing = new ConnectionFlushBatchAsync(this, _instance, result); + outgoing.invoke(__flushBatchRequests_name); + return result; } - private const string __flushBatchRequests_name = "flushBatchRequests"; - - private AsyncResult begin_flushBatchRequestsInternal(AsyncCallback cb, object cookie) + public void end_flushBatchRequests(AsyncResult r) { - IceInternal.ConnectionFlushBatch result = - new IceInternal.ConnectionFlushBatch(this, _communicator, _instance, __flushBatchRequests_name, cookie); - if(cb != null) + if(r != null && r.getConnection() != this) { - result.whenCompletedWithAsyncCallback(cb); + const string msg = "Connection for call to end_" + __flushBatchRequests_name + + " does not match connection that was used to call corresponding begin_" + + __flushBatchRequests_name + " method"; + throw new ArgumentException(msg); } - result.invoke(); - return result; + AsyncResultI.check(r, __flushBatchRequests_name).wait(); } + private const string __flushBatchRequests_name = "flushBatchRequests"; + public void setCloseCallback(CloseCallback callback) { lock(this) @@ -595,40 +632,40 @@ namespace Ice return; // The request has already been or will be shortly notified of the failure. } - LinkedListNode<OutgoingMessage> p; - for(p = _sendStreams.First; p != null; p = p.Next) + + OutgoingMessage o = _sendStreams.FirstOrDefault(m => m.outAsync == outAsync); + if(o != null) { - OutgoingMessage o = p.Value; - if(o.outAsync == outAsync) + if(o.requestId > 0) { - if(o.requestId > 0) - { - _asyncRequests.Remove(o.requestId); - } + _asyncRequests.Remove(o.requestId); + } - if(ex is Ice.ConnectionTimeoutException) + if(ex is Ice.ConnectionTimeoutException) + { + setState(StateClosed, ex); + } + else + { + // + // If the request is being sent, don't remove it from the send streams, + // it will be removed once the sending is finished. + // + if(o == _sendStreams.First.Value) { - setState(StateClosed, ex); + o.canceled(); } else { - // - // If the request is being sent, don't remove it from the send streams, - // it will be removed once the sending is finished. - // o.canceled(); - if(o != _sendStreams.First.Value) - { - _sendStreams.Remove(p); - } - Ice.AsyncCallback cb = outAsync.completed(ex); - if(cb != null) - { - outAsync.invokeCompletedAsync(cb); - } + _sendStreams.Remove(o); + } + if(outAsync.exception(ex)) + { + outAsync.invokeExceptionAsync(); } - return; } + return; } if(outAsync is IceInternal.OutgoingAsync) @@ -644,10 +681,9 @@ namespace Ice else { _asyncRequests.Remove(kvp.Key); - Ice.AsyncCallback cb = outAsync.completed(ex); - if(cb != null) + if(outAsync.exception(ex)) { - outAsync.invokeCompletedAsync(cb); + outAsync.invokeExceptionAsync(); } } return; @@ -680,7 +716,7 @@ namespace Ice throw _exception; } - sendMessage(new OutgoingMessage(os, compressFlag != 0, true)); + sendMessage(new OutgoingMessage(os, compressFlag > 0, true)); if(_state == StateClosing && _dispatchCount == 0) { @@ -1226,17 +1262,16 @@ namespace Ice { foreach(OutgoingMessage m in sentCBs) { - if(m.sentCallback != null) + if(m.invokeSent) { - m.outAsync.invokeSent(m.sentCallback); + m.outAsync.invokeSent(); } if(m.receivedReply) { IceInternal.OutgoingAsync outAsync = (IceInternal.OutgoingAsync)m.outAsync; - Ice.AsyncCallback cb = outAsync.completed(); - if(cb != null) + if(outAsync.response()) { - outAsync.invokeCompleted(cb); + outAsync.invokeResponse(); } } } @@ -1249,7 +1284,7 @@ namespace Ice // if(info.outAsync != null) { - info.outAsync.invokeCompleted(info.completedCallback); + info.outAsync.invokeResponse(); ++dispatchedCount; } @@ -1333,7 +1368,7 @@ namespace Ice // unecessary thread creation, especially if this is called on shutdown). // if(_startCallback == null && _sendStreams.Count == 0 && _asyncRequests.Count == 0 && - _closeCallback == null && _heartbeatCallback == null) + _closeCallback == null && _heartbeatCallback == null) { finish(); return; @@ -1415,40 +1450,38 @@ namespace Ice // if(message.isSent || message.receivedReply) { - if(message.sent() && message.sentCallback != null) + if(message.sent() && message.invokeSent) { - message.outAsync.invokeSent(message.sentCallback); + message.outAsync.invokeSent(); } if(message.receivedReply) { IceInternal.OutgoingAsync outAsync = (IceInternal.OutgoingAsync)message.outAsync; - Ice.AsyncCallback cb = outAsync.completed(); - if(cb != null) + if(outAsync.response()) { - outAsync.invokeCompleted(cb); + outAsync.invokeResponse(); } } _sendStreams.RemoveFirst(); } } - foreach(OutgoingMessage m in _sendStreams) + foreach (OutgoingMessage o in _sendStreams) { - m.completed(_exception); - if(m.requestId > 0) // Make sure finished isn't called twice. + o.completed(_exception); + if(o.requestId > 0) // Make sure finished isn't called twice. { - _asyncRequests.Remove(m.requestId); + _asyncRequests.Remove(o.requestId); } } - _sendStreams.Clear(); + _sendStreams.Clear(); // Must be cleared before _requests because of Outgoing* references in OutgoingMessage } foreach(IceInternal.OutgoingAsyncBase o in _asyncRequests.Values) { - Ice.AsyncCallback cb = o.completed(_exception); - if(cb != null) + if(o.exception(_exception)) { - o.invokeCompleted(cb); + o.invokeException(); } } _asyncRequests.Clear(); @@ -1919,7 +1952,8 @@ namespace Ice os.writeByte(_compressionSupported ? (byte)1 : (byte)0); os.writeInt(IceInternal.Protocol.headerSize); // Message size. - if(sendMessage(new OutgoingMessage(os, false, false))) + if((sendMessage(new OutgoingMessage(os, false, false)) & + IceInternal.OutgoingAsyncBase.AsyncStatusSent) != 0) { setState(StateClosingPending); @@ -1951,8 +1985,7 @@ namespace Ice os.writeInt(IceInternal.Protocol.headerSize); // Message size. try { - OutgoingMessage message = new OutgoingMessage(os, false, false); - sendMessage(message); + sendMessage(new OutgoingMessage(os, false, false)); } catch(Ice.LocalException ex) { @@ -2238,7 +2271,7 @@ namespace Ice return IceInternal.SocketOperation.None; } - private bool sendMessage(OutgoingMessage message) + private int sendMessage(OutgoingMessage message) { Debug.Assert(_state < StateClosed); @@ -2246,7 +2279,7 @@ namespace Ice { message.adopt(); _sendStreams.AddLast(message); - return false; + return IceInternal.OutgoingAsyncBase.AsyncStatusQueued; } // @@ -2287,13 +2320,17 @@ namespace Ice observerFinishWrite(message.stream.getBuffer()); } - message.sent(); + int status = IceInternal.OutgoingAsyncBase.AsyncStatusSent; + if(message.sent()) + { + status = status | IceInternal.OutgoingAsyncBase.AsyncStatusInvokeSentCallback; + } if(_acmLastActivity > -1) { _acmLastActivity = IceInternal.Time.currentMonotonicTimeMillis(); } - return true; + return status; } message.adopt(); @@ -2302,7 +2339,7 @@ namespace Ice _sendStreams.AddLast(message); scheduleTimeout(op); _threadPool.register(this, op); - return false; + return IceInternal.OutgoingAsyncBase.AsyncStatusQueued; } private OutputStream doCompress(OutputStream uncompressed, bool compress) @@ -2365,10 +2402,9 @@ namespace Ice public int invokeNum; public int requestId; public byte compress; - public IceInternal.ServantManager servantManager; + public ServantManager servantManager; public ObjectAdapter adapter; - public IceInternal.OutgoingAsyncBase outAsync; - public Ice.AsyncCallback completedCallback; + public OutgoingAsyncBase outAsync; public HeartbeatCallback heartbeatCallback; public int messageDispatchCount; } @@ -2492,16 +2528,15 @@ namespace Ice break; } - case IceInternal.Protocol.replyMsg: + case Protocol.replyMsg: { IceInternal.TraceUtil.traceRecv(info.stream, _logger, _traceLevels); info.requestId = info.stream.readInt(); - IceInternal.OutgoingAsyncBase outAsync = null; - if(_asyncRequests.TryGetValue(info.requestId, out outAsync)) + if(_asyncRequests.TryGetValue(info.requestId, out info.outAsync)) { _asyncRequests.Remove(info.requestId); - outAsync.getIs().swap(info.stream); + info.outAsync.getIs().swap(info.stream); // // If we just received the reply for a request which isn't acknowledge as @@ -2509,20 +2544,19 @@ namespace Ice // will be processed once the write callback is invoked for the message. // OutgoingMessage message = _sendStreams.Count > 0 ? _sendStreams.First.Value : null; - if(message != null && message.outAsync == outAsync) + if(message != null && message.outAsync == info.outAsync) { message.receivedReply = true; } + else if(info.outAsync.response()) + { + ++info.messageDispatchCount; + } else { - info.completedCallback = outAsync.completed(); - if(info.completedCallback != null) - { - info.outAsync = outAsync; - ++info.messageDispatchCount; - } + info.outAsync = null; } - System.Threading.Monitor.PulseAll(this); // Notify threads blocked in close(false) + Monitor.PulseAll(this); // Notify threads blocked in close(false) } break; } @@ -2877,23 +2911,20 @@ namespace Ice this.stream = stream; this.compress = compress; this._adopt = adopt; - this.isSent = false; - this.requestId = 0; } internal OutgoingMessage(IceInternal.OutgoingAsyncBase outAsync, OutputStream stream, bool compress, int requestId) { + this.outAsync = outAsync; this.stream = stream; this.compress = compress; - this.outAsync = outAsync; this.requestId = requestId; - this.isSent = false; } internal void canceled() { - Debug.Assert(outAsync != null); + Debug.Assert(outAsync != null); // Only requests can timeout. outAsync = null; } @@ -2910,34 +2941,36 @@ namespace Ice internal bool sent() { + stream = null; if(outAsync != null) { - sentCallback = outAsync.sent(); + invokeSent = outAsync.sent(); + return invokeSent ||receivedReply; } - return sentCallback != null || receivedReply; + return false; } internal void completed(LocalException ex) { if(outAsync != null) { - Ice.AsyncCallback cb = outAsync.completed(ex); - if(cb != null) + if(outAsync.exception(ex)) { - outAsync.invokeCompleted(cb); + outAsync.invokeException(); } } + stream = null; } - internal OutputStream stream; + internal Ice.OutputStream stream; internal IceInternal.OutgoingAsyncBase outAsync; - internal bool receivedReply; internal bool compress; internal int requestId; internal bool _adopt; internal bool prepared; internal bool isSent; - internal Ice.AsyncCallback sentCallback = null; + internal bool invokeSent; + internal bool receivedReply; } private Communicator _communicator; |