summaryrefslogtreecommitdiff
path: root/csharp/src/Ice/ConnectionI.cs
diff options
context:
space:
mode:
Diffstat (limited to 'csharp/src/Ice/ConnectionI.cs')
-rw-r--r--csharp/src/Ice/ConnectionI.cs293
1 files changed, 163 insertions, 130 deletions
diff --git a/csharp/src/Ice/ConnectionI.cs b/csharp/src/Ice/ConnectionI.cs
index ba5668eac74..252909df781 100644
--- a/csharp/src/Ice/ConnectionI.cs
+++ b/csharp/src/Ice/ConnectionI.cs
@@ -10,15 +10,17 @@
namespace Ice
{
using System;
- using System.Collections;
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
using System.Threading;
- using Ice.Instrumentation;
+ using System.Threading.Tasks;
+ using System.Linq;
- public sealed class ConnectionI :
- IceInternal.EventHandler, IceInternal.ResponseHandler, IceInternal.CancellationHandler, Connection
+ using Instrumentation;
+ using IceInternal;
+
+ public sealed class ConnectionI : IceInternal.EventHandler, ResponseHandler, CancellationHandler, Connection
{
public interface StartCallback
{
@@ -26,7 +28,7 @@ namespace Ice
void connectionStartFailed(ConnectionI connection, LocalException ex);
}
- private class TimeoutCallback : IceInternal.TimerTask
+ private class TimeoutCallback : TimerTask
{
public TimeoutCallback(ConnectionI connection)
{
@@ -56,7 +58,7 @@ namespace Ice
throw _exception;
}
- if(!initialize(IceInternal.SocketOperation.None) || !validate(IceInternal.SocketOperation.None))
+ if(!initialize(SocketOperation.None) || !validate(SocketOperation.None))
{
_startCallback = callback;
return;
@@ -93,7 +95,7 @@ namespace Ice
throw _exception;
}
- if(!initialize(IceInternal.SocketOperation.None) || !validate(IceInternal.SocketOperation.None))
+ if(!initialize(SocketOperation.None) || !validate(SocketOperation.None))
{
//
// Wait for the connection to be validated.
@@ -135,7 +137,7 @@ namespace Ice
if(_acmLastActivity > -1)
{
- _acmLastActivity = IceInternal.Time.currentMonotonicTimeMillis();
+ _acmLastActivity = Time.currentMonotonicTimeMillis();
}
setState(StateActive);
}
@@ -315,7 +317,7 @@ namespace Ice
}
}
- public void monitor(long now, IceInternal.ACMConfig acm)
+ public void monitor(long now, ACMConfig acm)
{
lock(this)
{
@@ -347,7 +349,7 @@ namespace Ice
}
}
- if(_readStream.size() > IceInternal.Protocol.headerSize || !_writeStream.isEmpty())
+ if(_readStream.size() > Protocol.headerSize || !_writeStream.isEmpty())
{
//
// If writing or reading, nothing to do, the connection
@@ -382,21 +384,21 @@ namespace Ice
}
}
- public bool sendAsyncRequest(IceInternal.OutgoingAsyncBase og, bool compress, bool response,
- int batchRequestNum, out Ice.AsyncCallback sentCallback)
+ public int sendAsyncRequest(OutgoingAsyncBase og, bool compress, bool response,
+ int batchRequestNum)
{
OutputStream os = og.getOs();
lock(this)
{
+ //
+ // If the exception is closed before we even have a chance
+ // to send our request, we always try to send the request
+ // again.
+ //
if(_exception != null)
{
- //
- // If the connection is closed before we even have a chance
- // to send our request, we always try to send the request
- // again.
- //
- throw new IceInternal.RetryException(_exception);
+ throw new RetryException(_exception);
}
Debug.Assert(_state > StateNotValidated);
@@ -413,7 +415,6 @@ namespace Ice
// This will throw if the request is canceled.
//
og.cancelable(this);
-
int requestId = 0;
if(response)
{
@@ -430,25 +431,24 @@ namespace Ice
//
// Fill in the request ID.
//
- os.pos(IceInternal.Protocol.headerSize);
+ os.pos(Protocol.headerSize);
os.writeInt(requestId);
}
else if(batchRequestNum > 0)
{
- os.pos(IceInternal.Protocol.headerSize);
+ os.pos(Protocol.headerSize);
os.writeInt(batchRequestNum);
}
og.attachRemoteObserver(initConnectionInfo(), _endpoint, requestId);
- bool sent;
+ int status = OutgoingAsyncBase.AsyncStatusQueued;
try
{
- OutgoingMessage msg = new OutgoingMessage(og, os, compress, requestId);
- sent = sendMessage(msg);
- sentCallback = msg.sentCallback;
+ OutgoingMessage message = new OutgoingMessage(og, os, compress, requestId);
+ status = sendMessage(message);
}
- catch(LocalException ex)
+ catch(Ice.LocalException ex)
{
setState(StateClosed, ex);
Debug.Assert(_exception != null);
@@ -462,51 +462,88 @@ namespace Ice
//
_asyncRequests[requestId] = og;
}
- return sent;
+ return status;
}
}
- public IceInternal.BatchRequestQueue getBatchRequestQueue()
+ public BatchRequestQueue getBatchRequestQueue()
{
return _batchRequestQueue;
}
public void flushBatchRequests()
{
- end_flushBatchRequests(begin_flushBatchRequests());
+ flushBatchRequestsAsync().Wait();
}
- public AsyncResult begin_flushBatchRequests()
+ private class ConnectionFlushBatchCompletionCallback : AsyncResultCompletionCallback
{
- return begin_flushBatchRequestsInternal(null, null);
+ public ConnectionFlushBatchCompletionCallback(Ice.Connection connection,
+ Ice.Communicator communicator,
+ Instance instance,
+ string op,
+ object cookie,
+ Ice.AsyncCallback callback)
+ : base(communicator, instance, op, cookie, callback)
+ {
+ _connection = connection;
+ }
+
+ public override Ice.Connection getConnection()
+ {
+ return _connection;
+ }
+
+ protected override Ice.AsyncCallback getCompletedCallback()
+ {
+ return (Ice.AsyncResult result) =>
+ {
+ try
+ {
+ result.throwLocalException();
+ }
+ catch(Ice.Exception ex)
+ {
+ exceptionCallback_?.Invoke(ex);
+ }
+ };
+ }
+
+ private Ice.Connection _connection;
}
- public AsyncResult begin_flushBatchRequests(AsyncCallback cb, object cookie)
+ public Task flushBatchRequestsAsync(IProgress<bool> progress = null,
+ CancellationToken cancel = new CancellationToken())
{
- return begin_flushBatchRequestsInternal(cb, cookie);
+ var completed = new FlushBatchTaskCompletionCallback(progress, cancel);
+ var outgoing = new ConnectionFlushBatchAsync(this, _instance, completed);
+ outgoing.invoke(__flushBatchRequests_name);
+ return completed.Task;
}
- public void end_flushBatchRequests(AsyncResult r)
+ public AsyncResult begin_flushBatchRequests(AsyncCallback cb = null, object cookie = null)
{
- IceInternal.ConnectionFlushBatch outAsync =
- IceInternal.ConnectionFlushBatch.check(r, this, __flushBatchRequests_name);
- outAsync.wait();
+ var result = new ConnectionFlushBatchCompletionCallback(this, _communicator, _instance,
+ __flushBatchRequests_name, cookie, cb);
+ var outgoing = new ConnectionFlushBatchAsync(this, _instance, result);
+ outgoing.invoke(__flushBatchRequests_name);
+ return result;
}
- private const string __flushBatchRequests_name = "flushBatchRequests";
-
- private AsyncResult begin_flushBatchRequestsInternal(AsyncCallback cb, object cookie)
+ public void end_flushBatchRequests(AsyncResult r)
{
- IceInternal.ConnectionFlushBatch result =
- new IceInternal.ConnectionFlushBatch(this, _communicator, _instance, __flushBatchRequests_name, cookie);
- if(cb != null)
+ if(r != null && r.getConnection() != this)
{
- result.whenCompletedWithAsyncCallback(cb);
+ const string msg = "Connection for call to end_" + __flushBatchRequests_name +
+ " does not match connection that was used to call corresponding begin_" +
+ __flushBatchRequests_name + " method";
+ throw new ArgumentException(msg);
}
- result.invoke();
- return result;
+ AsyncResultI.check(r, __flushBatchRequests_name).wait();
}
+ private const string __flushBatchRequests_name = "flushBatchRequests";
+
public void setCloseCallback(CloseCallback callback)
{
lock(this)
@@ -595,40 +632,40 @@ namespace Ice
return; // The request has already been or will be shortly notified of the failure.
}
- LinkedListNode<OutgoingMessage> p;
- for(p = _sendStreams.First; p != null; p = p.Next)
+
+ OutgoingMessage o = _sendStreams.FirstOrDefault(m => m.outAsync == outAsync);
+ if(o != null)
{
- OutgoingMessage o = p.Value;
- if(o.outAsync == outAsync)
+ if(o.requestId > 0)
{
- if(o.requestId > 0)
- {
- _asyncRequests.Remove(o.requestId);
- }
+ _asyncRequests.Remove(o.requestId);
+ }
- if(ex is Ice.ConnectionTimeoutException)
+ if(ex is Ice.ConnectionTimeoutException)
+ {
+ setState(StateClosed, ex);
+ }
+ else
+ {
+ //
+ // If the request is being sent, don't remove it from the send streams,
+ // it will be removed once the sending is finished.
+ //
+ if(o == _sendStreams.First.Value)
{
- setState(StateClosed, ex);
+ o.canceled();
}
else
{
- //
- // If the request is being sent, don't remove it from the send streams,
- // it will be removed once the sending is finished.
- //
o.canceled();
- if(o != _sendStreams.First.Value)
- {
- _sendStreams.Remove(p);
- }
- Ice.AsyncCallback cb = outAsync.completed(ex);
- if(cb != null)
- {
- outAsync.invokeCompletedAsync(cb);
- }
+ _sendStreams.Remove(o);
+ }
+ if(outAsync.exception(ex))
+ {
+ outAsync.invokeExceptionAsync();
}
- return;
}
+ return;
}
if(outAsync is IceInternal.OutgoingAsync)
@@ -644,10 +681,9 @@ namespace Ice
else
{
_asyncRequests.Remove(kvp.Key);
- Ice.AsyncCallback cb = outAsync.completed(ex);
- if(cb != null)
+ if(outAsync.exception(ex))
{
- outAsync.invokeCompletedAsync(cb);
+ outAsync.invokeExceptionAsync();
}
}
return;
@@ -680,7 +716,7 @@ namespace Ice
throw _exception;
}
- sendMessage(new OutgoingMessage(os, compressFlag != 0, true));
+ sendMessage(new OutgoingMessage(os, compressFlag > 0, true));
if(_state == StateClosing && _dispatchCount == 0)
{
@@ -1226,17 +1262,16 @@ namespace Ice
{
foreach(OutgoingMessage m in sentCBs)
{
- if(m.sentCallback != null)
+ if(m.invokeSent)
{
- m.outAsync.invokeSent(m.sentCallback);
+ m.outAsync.invokeSent();
}
if(m.receivedReply)
{
IceInternal.OutgoingAsync outAsync = (IceInternal.OutgoingAsync)m.outAsync;
- Ice.AsyncCallback cb = outAsync.completed();
- if(cb != null)
+ if(outAsync.response())
{
- outAsync.invokeCompleted(cb);
+ outAsync.invokeResponse();
}
}
}
@@ -1249,7 +1284,7 @@ namespace Ice
//
if(info.outAsync != null)
{
- info.outAsync.invokeCompleted(info.completedCallback);
+ info.outAsync.invokeResponse();
++dispatchedCount;
}
@@ -1333,7 +1368,7 @@ namespace Ice
// unecessary thread creation, especially if this is called on shutdown).
//
if(_startCallback == null && _sendStreams.Count == 0 && _asyncRequests.Count == 0 &&
- _closeCallback == null && _heartbeatCallback == null)
+ _closeCallback == null && _heartbeatCallback == null)
{
finish();
return;
@@ -1415,40 +1450,38 @@ namespace Ice
//
if(message.isSent || message.receivedReply)
{
- if(message.sent() && message.sentCallback != null)
+ if(message.sent() && message.invokeSent)
{
- message.outAsync.invokeSent(message.sentCallback);
+ message.outAsync.invokeSent();
}
if(message.receivedReply)
{
IceInternal.OutgoingAsync outAsync = (IceInternal.OutgoingAsync)message.outAsync;
- Ice.AsyncCallback cb = outAsync.completed();
- if(cb != null)
+ if(outAsync.response())
{
- outAsync.invokeCompleted(cb);
+ outAsync.invokeResponse();
}
}
_sendStreams.RemoveFirst();
}
}
- foreach(OutgoingMessage m in _sendStreams)
+ foreach (OutgoingMessage o in _sendStreams)
{
- m.completed(_exception);
- if(m.requestId > 0) // Make sure finished isn't called twice.
+ o.completed(_exception);
+ if(o.requestId > 0) // Make sure finished isn't called twice.
{
- _asyncRequests.Remove(m.requestId);
+ _asyncRequests.Remove(o.requestId);
}
}
- _sendStreams.Clear();
+ _sendStreams.Clear(); // Must be cleared before _requests because of Outgoing* references in OutgoingMessage
}
foreach(IceInternal.OutgoingAsyncBase o in _asyncRequests.Values)
{
- Ice.AsyncCallback cb = o.completed(_exception);
- if(cb != null)
+ if(o.exception(_exception))
{
- o.invokeCompleted(cb);
+ o.invokeException();
}
}
_asyncRequests.Clear();
@@ -1919,7 +1952,8 @@ namespace Ice
os.writeByte(_compressionSupported ? (byte)1 : (byte)0);
os.writeInt(IceInternal.Protocol.headerSize); // Message size.
- if(sendMessage(new OutgoingMessage(os, false, false)))
+ if((sendMessage(new OutgoingMessage(os, false, false)) &
+ IceInternal.OutgoingAsyncBase.AsyncStatusSent) != 0)
{
setState(StateClosingPending);
@@ -1951,8 +1985,7 @@ namespace Ice
os.writeInt(IceInternal.Protocol.headerSize); // Message size.
try
{
- OutgoingMessage message = new OutgoingMessage(os, false, false);
- sendMessage(message);
+ sendMessage(new OutgoingMessage(os, false, false));
}
catch(Ice.LocalException ex)
{
@@ -2238,7 +2271,7 @@ namespace Ice
return IceInternal.SocketOperation.None;
}
- private bool sendMessage(OutgoingMessage message)
+ private int sendMessage(OutgoingMessage message)
{
Debug.Assert(_state < StateClosed);
@@ -2246,7 +2279,7 @@ namespace Ice
{
message.adopt();
_sendStreams.AddLast(message);
- return false;
+ return IceInternal.OutgoingAsyncBase.AsyncStatusQueued;
}
//
@@ -2287,13 +2320,17 @@ namespace Ice
observerFinishWrite(message.stream.getBuffer());
}
- message.sent();
+ int status = IceInternal.OutgoingAsyncBase.AsyncStatusSent;
+ if(message.sent())
+ {
+ status = status | IceInternal.OutgoingAsyncBase.AsyncStatusInvokeSentCallback;
+ }
if(_acmLastActivity > -1)
{
_acmLastActivity = IceInternal.Time.currentMonotonicTimeMillis();
}
- return true;
+ return status;
}
message.adopt();
@@ -2302,7 +2339,7 @@ namespace Ice
_sendStreams.AddLast(message);
scheduleTimeout(op);
_threadPool.register(this, op);
- return false;
+ return IceInternal.OutgoingAsyncBase.AsyncStatusQueued;
}
private OutputStream doCompress(OutputStream uncompressed, bool compress)
@@ -2365,10 +2402,9 @@ namespace Ice
public int invokeNum;
public int requestId;
public byte compress;
- public IceInternal.ServantManager servantManager;
+ public ServantManager servantManager;
public ObjectAdapter adapter;
- public IceInternal.OutgoingAsyncBase outAsync;
- public Ice.AsyncCallback completedCallback;
+ public OutgoingAsyncBase outAsync;
public HeartbeatCallback heartbeatCallback;
public int messageDispatchCount;
}
@@ -2492,16 +2528,15 @@ namespace Ice
break;
}
- case IceInternal.Protocol.replyMsg:
+ case Protocol.replyMsg:
{
IceInternal.TraceUtil.traceRecv(info.stream, _logger, _traceLevels);
info.requestId = info.stream.readInt();
- IceInternal.OutgoingAsyncBase outAsync = null;
- if(_asyncRequests.TryGetValue(info.requestId, out outAsync))
+ if(_asyncRequests.TryGetValue(info.requestId, out info.outAsync))
{
_asyncRequests.Remove(info.requestId);
- outAsync.getIs().swap(info.stream);
+ info.outAsync.getIs().swap(info.stream);
//
// If we just received the reply for a request which isn't acknowledge as
@@ -2509,20 +2544,19 @@ namespace Ice
// will be processed once the write callback is invoked for the message.
//
OutgoingMessage message = _sendStreams.Count > 0 ? _sendStreams.First.Value : null;
- if(message != null && message.outAsync == outAsync)
+ if(message != null && message.outAsync == info.outAsync)
{
message.receivedReply = true;
}
+ else if(info.outAsync.response())
+ {
+ ++info.messageDispatchCount;
+ }
else
{
- info.completedCallback = outAsync.completed();
- if(info.completedCallback != null)
- {
- info.outAsync = outAsync;
- ++info.messageDispatchCount;
- }
+ info.outAsync = null;
}
- System.Threading.Monitor.PulseAll(this); // Notify threads blocked in close(false)
+ Monitor.PulseAll(this); // Notify threads blocked in close(false)
}
break;
}
@@ -2877,23 +2911,20 @@ namespace Ice
this.stream = stream;
this.compress = compress;
this._adopt = adopt;
- this.isSent = false;
- this.requestId = 0;
}
internal OutgoingMessage(IceInternal.OutgoingAsyncBase outAsync, OutputStream stream,
bool compress, int requestId)
{
+ this.outAsync = outAsync;
this.stream = stream;
this.compress = compress;
- this.outAsync = outAsync;
this.requestId = requestId;
- this.isSent = false;
}
internal void canceled()
{
- Debug.Assert(outAsync != null);
+ Debug.Assert(outAsync != null); // Only requests can timeout.
outAsync = null;
}
@@ -2910,34 +2941,36 @@ namespace Ice
internal bool sent()
{
+ stream = null;
if(outAsync != null)
{
- sentCallback = outAsync.sent();
+ invokeSent = outAsync.sent();
+ return invokeSent ||receivedReply;
}
- return sentCallback != null || receivedReply;
+ return false;
}
internal void completed(LocalException ex)
{
if(outAsync != null)
{
- Ice.AsyncCallback cb = outAsync.completed(ex);
- if(cb != null)
+ if(outAsync.exception(ex))
{
- outAsync.invokeCompleted(cb);
+ outAsync.invokeException();
}
}
+ stream = null;
}
- internal OutputStream stream;
+ internal Ice.OutputStream stream;
internal IceInternal.OutgoingAsyncBase outAsync;
- internal bool receivedReply;
internal bool compress;
internal int requestId;
internal bool _adopt;
internal bool prepared;
internal bool isSent;
- internal Ice.AsyncCallback sentCallback = null;
+ internal bool invokeSent;
+ internal bool receivedReply;
}
private Communicator _communicator;