diff options
Diffstat (limited to 'csharp/src/Ice/ConnectionI.cs')
-rw-r--r-- | csharp/src/Ice/ConnectionI.cs | 186 |
1 files changed, 165 insertions, 21 deletions
diff --git a/csharp/src/Ice/ConnectionI.cs b/csharp/src/Ice/ConnectionI.cs index 8943a4d48bc..c851279b0db 100644 --- a/csharp/src/Ice/ConnectionI.cs +++ b/csharp/src/Ice/ConnectionI.cs @@ -181,29 +181,31 @@ namespace Ice } } - public void close(bool force) + public void close(ConnectionClose mode) { lock(this) { - if(force) + if(mode == ConnectionClose.CloseForcefully) { - setState(StateClosed, new ForcedCloseConnectionException()); + setState(StateClosed, new ConnectionManuallyClosedException(false)); + } + else if(mode == ConnectionClose.CloseGracefully) + { + setState(StateClosing, new ConnectionManuallyClosedException(true)); } else { + Debug.Assert(mode == ConnectionClose.CloseGracefullyAndWait); + // - // If we do a graceful shutdown, then we wait until all - // outstanding requests have been completed. Otherwise, - // the CloseConnectionException will cause all outstanding - // requests to be retried, regardless of whether the - // server has processed them or not. + // Wait until all outstanding requests have been completed. // while(_asyncRequests.Count != 0) { Monitor.Wait(this); } - setState(StateClosing, new CloseConnectionException()); + setState(StateClosing, new ConnectionManuallyClosedException(true)); } } } @@ -330,13 +332,13 @@ namespace Ice // We send a heartbeat if there was no activity in the last // (timeout / 4) period. Sending a heartbeat sooner than // really needed is safer to ensure that the receiver will - // receive in time the heartbeat. Sending the heartbeat if + // receive the heartbeat in time. Sending the heartbeat if // there was no activity in the last (timeout / 2) period // isn't enough since monitor() is called only every (timeout // / 2) period. // // Note that this doesn't imply that we are sending 4 heartbeats - // per timeout period because the monitor() method is sill only + // per timeout period because the monitor() method is still only // called every (timeout / 2) period. // if(acm.heartbeat == ACMHeartbeat.HeartbeatAlways || @@ -345,7 +347,7 @@ namespace Ice { if(acm.heartbeat != ACMHeartbeat.HeartbeatOnInvocation || _dispatchCount > 0) { - heartbeat(); + sendHeartbeatNow(); } } @@ -583,6 +585,149 @@ namespace Ice } } + public void heartbeat() + { + heartbeatAsync().Wait(); + } + + private class HeartbeatCompletionCallback : AsyncResultCompletionCallback + { + public HeartbeatCompletionCallback(Ice.Connection connection, + Ice.Communicator communicator, + Instance instance, + object cookie, + Ice.AsyncCallback callback) + : base(communicator, instance, "heartbeat", cookie, callback) + { + _connection = connection; + } + + public override Ice.Connection getConnection() + { + return _connection; + } + + protected override Ice.AsyncCallback getCompletedCallback() + { + return (Ice.AsyncResult result) => + { + try + { + result.throwLocalException(); + } + catch(Ice.Exception ex) + { + if(exceptionCallback_ != null) + { + exceptionCallback_.Invoke(ex); + } + } + }; + } + + private Ice.Connection _connection; + } + + private class HeartbeatTaskCompletionCallback : TaskCompletionCallback<object> + { + public HeartbeatTaskCompletionCallback(System.IProgress<bool> progress, + CancellationToken cancellationToken) : + base(progress, cancellationToken) + { + } + + public override bool handleResponse(bool ok, OutgoingAsyncBase og) + { + SetResult(null); + return false; + } + } + + private class HeartbeatAsync : OutgoingAsyncBase + { + public HeartbeatAsync(Ice.ConnectionI connection, + Instance instance, + OutgoingAsyncCompletionCallback completionCallback) : + base(instance, completionCallback) + { + _connection = connection; + } + + public void invoke() + { + try + { + os_.writeBlob(IceInternal.Protocol.magic); + Ice.Util.currentProtocol.write__(os_); + Ice.Util.currentProtocolEncoding.write__(os_); + os_.writeByte(IceInternal.Protocol.validateConnectionMsg); + os_.writeByte((byte)0); + os_.writeInt(IceInternal.Protocol.headerSize); // Message size. + + int status = _connection.sendAsyncRequest(this, false, false, 0); + + if((status & AsyncStatusSent) != 0) + { + sentSynchronously_ = true; + if((status & AsyncStatusInvokeSentCallback) != 0) + { + invokeSent(); + } + } + } + catch(RetryException ex) + { + try + { + throw ex.get(); + } + catch(Ice.LocalException ee) + { + if(exception(ee)) + { + invokeExceptionAsync(); + } + } + } + catch(Ice.Exception ex) + { + if(exception(ex)) + { + invokeExceptionAsync(); + } + } + } + + private readonly Ice.ConnectionI _connection; + } + + public Task heartbeatAsync(IProgress<bool> progress = null, CancellationToken cancel = new CancellationToken()) + { + var completed = new HeartbeatTaskCompletionCallback(progress, cancel); + var outgoing = new HeartbeatAsync(this, _instance, completed); + outgoing.invoke(); + return completed.Task; + } + + public AsyncResult begin_heartbeat(AsyncCallback cb = null, object cookie = null) + { + var result = new HeartbeatCompletionCallback(this, _communicator, _instance, cookie, cb); + var outgoing = new HeartbeatAsync(this, _instance, result); + outgoing.invoke(); + return result; + } + + public void end_heartbeat(AsyncResult r) + { + if(r != null && r.getConnection() != this) + { + const string msg = "Connection for call to end_heartbeat does not match connection that was used " + + "to call corresponding begin_heartbeat method"; + throw new ArgumentException(msg); + } + AsyncResultI.check(r, "heartbeat").wait(); + } + public void setACM(Optional<int> timeout, Optional<ACMClose> close, Optional<ACMHeartbeat> heartbeat) { lock(this) @@ -1415,7 +1560,7 @@ namespace Ice // Trace the cause of unexpected connection closures // if(!(_exception is CloseConnectionException || - _exception is ForcedCloseConnectionException || + _exception is ConnectionManuallyClosedException || _exception is ConnectionTimeoutException || _exception is CommunicatorDestroyedException || _exception is ObjectAdapterDeactivatedException)) @@ -1723,7 +1868,7 @@ namespace Ice // Don't warn about certain expected exceptions. // if(!(_exception is CloseConnectionException || - _exception is ForcedCloseConnectionException || + _exception is ConnectionManuallyClosedException || _exception is ConnectionTimeoutException || _exception is CommunicatorDestroyedException || _exception is ObjectAdapterDeactivatedException || @@ -1902,7 +2047,7 @@ namespace Ice if(_observer != null && state == StateClosed && _exception != null) { if(!(_exception is CloseConnectionException || - _exception is ForcedCloseConnectionException || + _exception is ConnectionManuallyClosedException || _exception is ConnectionTimeoutException || _exception is CommunicatorDestroyedException || _exception is ObjectAdapterDeactivatedException || @@ -1931,8 +2076,7 @@ namespace Ice private void initiateShutdown() { - Debug.Assert(_state == StateClosing); - Debug.Assert(_dispatchCount == 0); + Debug.Assert(_state == StateClosing && _dispatchCount == 0); if(_shutdownInitiated) { @@ -1958,7 +2102,7 @@ namespace Ice setState(StateClosingPending); // - // Notify the the transceiver of the graceful connection closure. + // Notify the transceiver of the graceful connection closure. // int op = _transceiver.closing(true, _exception); if(op != 0) @@ -1970,7 +2114,7 @@ namespace Ice } } - private void heartbeat() + private void sendHeartbeatNow() { Debug.Assert(_state == StateActive); @@ -2456,7 +2600,7 @@ namespace Ice setState(StateClosingPending, new CloseConnectionException()); // - // Notify the the transceiver of the graceful connection closure. + // Notify the transceiver of the graceful connection closure. // int op = _transceiver.closing(false, _exception); if(op != 0) @@ -2540,7 +2684,7 @@ namespace Ice { info.outAsync = null; } - Monitor.PulseAll(this); // Notify threads blocked in close(false) + Monitor.PulseAll(this); // Notify threads blocked in close() } break; } |