summaryrefslogtreecommitdiff
path: root/csharp/src/Ice/ConnectionI.cs
diff options
context:
space:
mode:
Diffstat (limited to 'csharp/src/Ice/ConnectionI.cs')
-rw-r--r--csharp/src/Ice/ConnectionI.cs186
1 files changed, 165 insertions, 21 deletions
diff --git a/csharp/src/Ice/ConnectionI.cs b/csharp/src/Ice/ConnectionI.cs
index 8943a4d48bc..c851279b0db 100644
--- a/csharp/src/Ice/ConnectionI.cs
+++ b/csharp/src/Ice/ConnectionI.cs
@@ -181,29 +181,31 @@ namespace Ice
}
}
- public void close(bool force)
+ public void close(ConnectionClose mode)
{
lock(this)
{
- if(force)
+ if(mode == ConnectionClose.CloseForcefully)
{
- setState(StateClosed, new ForcedCloseConnectionException());
+ setState(StateClosed, new ConnectionManuallyClosedException(false));
+ }
+ else if(mode == ConnectionClose.CloseGracefully)
+ {
+ setState(StateClosing, new ConnectionManuallyClosedException(true));
}
else
{
+ Debug.Assert(mode == ConnectionClose.CloseGracefullyAndWait);
+
//
- // If we do a graceful shutdown, then we wait until all
- // outstanding requests have been completed. Otherwise,
- // the CloseConnectionException will cause all outstanding
- // requests to be retried, regardless of whether the
- // server has processed them or not.
+ // Wait until all outstanding requests have been completed.
//
while(_asyncRequests.Count != 0)
{
Monitor.Wait(this);
}
- setState(StateClosing, new CloseConnectionException());
+ setState(StateClosing, new ConnectionManuallyClosedException(true));
}
}
}
@@ -330,13 +332,13 @@ namespace Ice
// We send a heartbeat if there was no activity in the last
// (timeout / 4) period. Sending a heartbeat sooner than
// really needed is safer to ensure that the receiver will
- // receive in time the heartbeat. Sending the heartbeat if
+ // receive the heartbeat in time. Sending the heartbeat if
// there was no activity in the last (timeout / 2) period
// isn't enough since monitor() is called only every (timeout
// / 2) period.
//
// Note that this doesn't imply that we are sending 4 heartbeats
- // per timeout period because the monitor() method is sill only
+ // per timeout period because the monitor() method is still only
// called every (timeout / 2) period.
//
if(acm.heartbeat == ACMHeartbeat.HeartbeatAlways ||
@@ -345,7 +347,7 @@ namespace Ice
{
if(acm.heartbeat != ACMHeartbeat.HeartbeatOnInvocation || _dispatchCount > 0)
{
- heartbeat();
+ sendHeartbeatNow();
}
}
@@ -583,6 +585,149 @@ namespace Ice
}
}
+ public void heartbeat()
+ {
+ heartbeatAsync().Wait();
+ }
+
+ private class HeartbeatCompletionCallback : AsyncResultCompletionCallback
+ {
+ public HeartbeatCompletionCallback(Ice.Connection connection,
+ Ice.Communicator communicator,
+ Instance instance,
+ object cookie,
+ Ice.AsyncCallback callback)
+ : base(communicator, instance, "heartbeat", cookie, callback)
+ {
+ _connection = connection;
+ }
+
+ public override Ice.Connection getConnection()
+ {
+ return _connection;
+ }
+
+ protected override Ice.AsyncCallback getCompletedCallback()
+ {
+ return (Ice.AsyncResult result) =>
+ {
+ try
+ {
+ result.throwLocalException();
+ }
+ catch(Ice.Exception ex)
+ {
+ if(exceptionCallback_ != null)
+ {
+ exceptionCallback_.Invoke(ex);
+ }
+ }
+ };
+ }
+
+ private Ice.Connection _connection;
+ }
+
+ private class HeartbeatTaskCompletionCallback : TaskCompletionCallback<object>
+ {
+ public HeartbeatTaskCompletionCallback(System.IProgress<bool> progress,
+ CancellationToken cancellationToken) :
+ base(progress, cancellationToken)
+ {
+ }
+
+ public override bool handleResponse(bool ok, OutgoingAsyncBase og)
+ {
+ SetResult(null);
+ return false;
+ }
+ }
+
+ private class HeartbeatAsync : OutgoingAsyncBase
+ {
+ public HeartbeatAsync(Ice.ConnectionI connection,
+ Instance instance,
+ OutgoingAsyncCompletionCallback completionCallback) :
+ base(instance, completionCallback)
+ {
+ _connection = connection;
+ }
+
+ public void invoke()
+ {
+ try
+ {
+ os_.writeBlob(IceInternal.Protocol.magic);
+ Ice.Util.currentProtocol.write__(os_);
+ Ice.Util.currentProtocolEncoding.write__(os_);
+ os_.writeByte(IceInternal.Protocol.validateConnectionMsg);
+ os_.writeByte((byte)0);
+ os_.writeInt(IceInternal.Protocol.headerSize); // Message size.
+
+ int status = _connection.sendAsyncRequest(this, false, false, 0);
+
+ if((status & AsyncStatusSent) != 0)
+ {
+ sentSynchronously_ = true;
+ if((status & AsyncStatusInvokeSentCallback) != 0)
+ {
+ invokeSent();
+ }
+ }
+ }
+ catch(RetryException ex)
+ {
+ try
+ {
+ throw ex.get();
+ }
+ catch(Ice.LocalException ee)
+ {
+ if(exception(ee))
+ {
+ invokeExceptionAsync();
+ }
+ }
+ }
+ catch(Ice.Exception ex)
+ {
+ if(exception(ex))
+ {
+ invokeExceptionAsync();
+ }
+ }
+ }
+
+ private readonly Ice.ConnectionI _connection;
+ }
+
+ public Task heartbeatAsync(IProgress<bool> progress = null, CancellationToken cancel = new CancellationToken())
+ {
+ var completed = new HeartbeatTaskCompletionCallback(progress, cancel);
+ var outgoing = new HeartbeatAsync(this, _instance, completed);
+ outgoing.invoke();
+ return completed.Task;
+ }
+
+ public AsyncResult begin_heartbeat(AsyncCallback cb = null, object cookie = null)
+ {
+ var result = new HeartbeatCompletionCallback(this, _communicator, _instance, cookie, cb);
+ var outgoing = new HeartbeatAsync(this, _instance, result);
+ outgoing.invoke();
+ return result;
+ }
+
+ public void end_heartbeat(AsyncResult r)
+ {
+ if(r != null && r.getConnection() != this)
+ {
+ const string msg = "Connection for call to end_heartbeat does not match connection that was used " +
+ "to call corresponding begin_heartbeat method";
+ throw new ArgumentException(msg);
+ }
+ AsyncResultI.check(r, "heartbeat").wait();
+ }
+
public void setACM(Optional<int> timeout, Optional<ACMClose> close, Optional<ACMHeartbeat> heartbeat)
{
lock(this)
@@ -1415,7 +1560,7 @@ namespace Ice
// Trace the cause of unexpected connection closures
//
if(!(_exception is CloseConnectionException ||
- _exception is ForcedCloseConnectionException ||
+ _exception is ConnectionManuallyClosedException ||
_exception is ConnectionTimeoutException ||
_exception is CommunicatorDestroyedException ||
_exception is ObjectAdapterDeactivatedException))
@@ -1723,7 +1868,7 @@ namespace Ice
// Don't warn about certain expected exceptions.
//
if(!(_exception is CloseConnectionException ||
- _exception is ForcedCloseConnectionException ||
+ _exception is ConnectionManuallyClosedException ||
_exception is ConnectionTimeoutException ||
_exception is CommunicatorDestroyedException ||
_exception is ObjectAdapterDeactivatedException ||
@@ -1902,7 +2047,7 @@ namespace Ice
if(_observer != null && state == StateClosed && _exception != null)
{
if(!(_exception is CloseConnectionException ||
- _exception is ForcedCloseConnectionException ||
+ _exception is ConnectionManuallyClosedException ||
_exception is ConnectionTimeoutException ||
_exception is CommunicatorDestroyedException ||
_exception is ObjectAdapterDeactivatedException ||
@@ -1931,8 +2076,7 @@ namespace Ice
private void initiateShutdown()
{
- Debug.Assert(_state == StateClosing);
- Debug.Assert(_dispatchCount == 0);
+ Debug.Assert(_state == StateClosing && _dispatchCount == 0);
if(_shutdownInitiated)
{
@@ -1958,7 +2102,7 @@ namespace Ice
setState(StateClosingPending);
//
- // Notify the the transceiver of the graceful connection closure.
+ // Notify the transceiver of the graceful connection closure.
//
int op = _transceiver.closing(true, _exception);
if(op != 0)
@@ -1970,7 +2114,7 @@ namespace Ice
}
}
- private void heartbeat()
+ private void sendHeartbeatNow()
{
Debug.Assert(_state == StateActive);
@@ -2456,7 +2600,7 @@ namespace Ice
setState(StateClosingPending, new CloseConnectionException());
//
- // Notify the the transceiver of the graceful connection closure.
+ // Notify the transceiver of the graceful connection closure.
//
int op = _transceiver.closing(false, _exception);
if(op != 0)
@@ -2540,7 +2684,7 @@ namespace Ice
{
info.outAsync = null;
}
- Monitor.PulseAll(this); // Notify threads blocked in close(false)
+ Monitor.PulseAll(this); // Notify threads blocked in close()
}
break;
}