diff options
author | Benoit Foucher <benoit@zeroc.com> | 2014-05-23 11:59:44 +0200 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2014-05-23 11:59:44 +0200 |
commit | d81701ca8182942b7936f9fd84a019b695e9c890 (patch) | |
tree | dc036c9d701fbbe1afad67782bd78572c0f61974 /java/src/Ice/ConnectionI.java | |
parent | Fixed bug ICE-5543: stringToIdentity bug with escaped escapes (diff) | |
download | ice-d81701ca8182942b7936f9fd84a019b695e9c890.tar.bz2 ice-d81701ca8182942b7936f9fd84a019b695e9c890.tar.xz ice-d81701ca8182942b7936f9fd84a019b695e9c890.zip |
Added support for invocation timeouts and ACM heartbeats
Diffstat (limited to 'java/src/Ice/ConnectionI.java')
-rw-r--r-- | java/src/Ice/ConnectionI.java | 418 |
1 files changed, 341 insertions, 77 deletions
diff --git a/java/src/Ice/ConnectionI.java b/java/src/Ice/ConnectionI.java index 7785c2d2226..c001462c960 100644 --- a/java/src/Ice/ConnectionI.java +++ b/java/src/Ice/ConnectionI.java @@ -105,9 +105,9 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne return; } - if(_acmTimeout > 0) + if(_acmLastActivity > 0) { - _acmAbsoluteTimeoutMillis = IceInternal.Time.currentMonotonicTimeMillis() + _acmTimeout * 1000; + _acmLastActivity = IceInternal.Time.currentMonotonicTimeMillis(); } setState(StateActive); @@ -274,26 +274,66 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } synchronized public void - monitor(long now) + monitor(long now, IceInternal.ACMConfig acm) { if(_state != StateActive) { return; } - // - // Active connection management for idle connections. - // - if(_acmTimeout <= 0 || - !_requests.isEmpty() || !_asyncRequests.isEmpty() || _dispatchCount > 0 || - _readStream.size() > IceInternal.Protocol.headerSize || !_writeStream.isEmpty() || !_batchStream.isEmpty()) + if(_readStream.size() > IceInternal.Protocol.headerSize || !_writeStream.isEmpty()) { + // + // If writing or reading, nothing to do, the connection + // timeout will kick-in if writes or reads don't progress. + // This check is necessary because the actitivy timer is + // only set when a message is fully read/written. + // return; } - if(now >= _acmAbsoluteTimeoutMillis) + // + // We send a heartbeat if there was no activity in the last + // (timeout / 4) period. Sending a heartbeat sooner than + // really needed is safer to ensure that the receiver will + // receive in time the heartbeat. Sending the heartbeat if + // there was no activity in the last (timeout / 2) period + // isn't enough since monitor() is called only every (timeout + // / 2) period. + // + // Note that this doesn't imply that we are sending 4 + // heartbeats per timeout period because the monitor() method + // is sill only called every (timeout / 2) period. + // + + if(acm.heartbeat == ACMHeartbeat.HeartbeatAlways || + (acm.heartbeat != ACMHeartbeat.HeartbeatOff && now >= (_acmLastActivity + acm.timeout / 4))) { - setState(StateClosing, new ConnectionTimeoutException()); + if(acm.heartbeat != ACMHeartbeat.HeartbeatOnInvocation || _dispatchCount > 0) + { + heartbeat(); + } + } + + if(acm.close != ACMClose.CloseOff && now >= (_acmLastActivity + acm.timeout)) + { + if(acm.close == ACMClose.CloseOnIdleForceful || + (acm.close != ACMClose.CloseOnIdle && (!_requests.isEmpty() || !_asyncRequests.isEmpty()))) + { + // + // Close the connection if we didn't receive a heartbeat in + // the last period. + // + setState(StateClosed, new ConnectionTimeoutException()); + } + else if(acm.close != ACMClose.CloseOnInvocation && + _dispatchCount == 0 && _batchStream.isEmpty() && _requests.isEmpty() && _asyncRequests.isEmpty()) + { + // + // The connection is idle, close it. + // + setState(StateClosing, new ConnectionTimeoutException()); + } } } @@ -701,11 +741,11 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne new IceInternal.ConnectionBatchOutgoingAsync(this, _communicator, _instance, __flushBatchRequests_name, cb); try { - result.__send(); + result.__invoke(); } catch(LocalException __ex) { - result.__exceptionAsync(__ex); + result.__invokeExceptionAsync(__ex); } return result; } @@ -738,7 +778,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne if(_batchRequestNum == 0) { - out.sent(false); + out.sent(); return true; } @@ -799,7 +839,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne if(_batchRequestNum == 0) { int status = IceInternal.AsyncStatus.Sent; - if(outAsync.__sent(this)) + if(outAsync.__sent()) { status |= IceInternal.AsyncStatus.InvokeSentCallback; } @@ -841,6 +881,142 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne return status; } + synchronized public void + setCallback(ConnectionCallback callback) + { + if(_state > StateClosing) + { + return; + } + _callback = callback; + } + + synchronized public void + setACM(Ice.IntOptional timeout, Ice.Optional<ACMClose> close, Ice.Optional<ACMHeartbeat> heartbeat) + { + if(_monitor != null) + { + if(_state == StateActive) + { + _monitor.remove(this); + } + _monitor = _monitor.acm(timeout, close, heartbeat); + if(_state == StateActive) + { + _monitor.add(this); + } + + if(_monitor.getACM().timeout <= 0) + { + _acmLastActivity = -1; // Disable the recording of last activity. + } + else if(_state == StateActive && _acmLastActivity == -1) + { + _acmLastActivity = IceInternal.Time.currentMonotonicTimeMillis(); + } + } + } + + synchronized public Ice.ACM + getACM() + { + return _monitor != null ? _monitor.getACM() : new ACM(0, ACMClose.CloseOff, ACMHeartbeat.HeartbeatOff); + } + + synchronized public void + requestTimedOut(IceInternal.OutgoingMessageCallback out) + { + java.util.Iterator<OutgoingMessage> it = _sendStreams.iterator(); + while(it.hasNext()) + { + OutgoingMessage o = it.next(); + if(o.out == out) + { + if(o.requestId > 0) + { + _requests.remove(o.requestId); + } + + // + // If the request is being sent, don't remove it from the send streams, + // it will be removed once the sending is finished. + // + if(o == _sendStreams.getFirst()) + { + o.timedOut(); + } + else + { + it.remove(); + } + o.finished(new InvocationTimeoutException()); + return; // We're done. + } + } + + if(out instanceof IceInternal.Outgoing) + { + IceInternal.Outgoing o = (IceInternal.Outgoing)out; + java.util.Iterator<IceInternal.Outgoing> it2 = _requests.values().iterator(); + while(it2.hasNext()) + { + if(it2.next() == o) + { + o.finished(new InvocationTimeoutException(), true); + it2.remove(); + return; // We're done. + } + } + } + } + + synchronized public void + asyncRequestTimedOut(IceInternal.OutgoingAsyncMessageCallback outAsync) + { + java.util.Iterator<OutgoingMessage> it = _sendStreams.iterator(); + while(it.hasNext()) + { + OutgoingMessage o = it.next(); + if(o.outAsync == outAsync) + { + if(o.requestId > 0) + { + _asyncRequests.remove(o.requestId); + } + + // + // If the request is being sent, don't remove it from the send streams, + // it will be removed once the sending is finished. + // + if(o == _sendStreams.getFirst()) + { + o.timedOut(); + } + else + { + it.remove(); + } + o.finished(new InvocationTimeoutException()); + return; // We're done. + } + } + + if(outAsync instanceof IceInternal.OutgoingAsync) + { + IceInternal.OutgoingAsync o = (IceInternal.OutgoingAsync)outAsync; + java.util.Iterator<IceInternal.OutgoingAsync> it2 = _asyncRequests.values().iterator(); + while(it2.hasNext()) + { + if(it2.next() == o) + { + o.__finished(new InvocationTimeoutException(), true); + it2.remove(); + return; // We're done. + } + } + } + } + synchronized public void sendResponse(IceInternal.BasicStream os, byte compressFlag) { @@ -852,7 +1028,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { if(_state == StateFinished) { - _reaper.add(this, _observer); + reap(); } notifyAll(); } @@ -886,7 +1062,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { if(_state == StateFinished) { - _reaper.add(this, _observer); + reap(); } notifyAll(); } @@ -1119,8 +1295,15 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne // We start out in holding state. // setState(StateHolding); - startCB = _startCallback; - _startCallback = null; + if(_startCallback != null) + { + startCB = _startCallback; + _startCallback = null; + if(startCB != null) + { + ++_dispatchCount; + } + } } else { @@ -1138,15 +1321,10 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne if((current.operation & IceInternal.SocketOperation.Write) != 0) { sentCBs = sendNextMessage(); - } - - // - // We increment the dispatch count to prevent the - // communicator destruction during the callback. - // - if(sentCBs != null || info != null && info.outAsync != null) - { - ++_dispatchCount; + if(sentCBs != null) + { + ++_dispatchCount; + } } } } @@ -1186,17 +1364,16 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne return; } - if(_acmTimeout > 0) + if(_acmLastActivity > 0) { - _acmAbsoluteTimeoutMillis = IceInternal.Time.currentMonotonicTimeMillis() + _acmTimeout * 1000; + _acmLastActivity = IceInternal.Time.currentMonotonicTimeMillis(); } - current.ioCompleted(); } if(_dispatcher != null) { - if(info != null) + if(info != null && info.heartbeatCallback == null) // No need for the stream if heartbeat callback { // // Create a new stream for the dispatch instead of using the thread @@ -1239,6 +1416,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne protected void dispatch(StartCallback startCB, java.util.List<OutgoingMessage> sentCBs, MessageInfo info) { + int count = 0; + // // Notify the factory that the connection establishment and // validation has completed. @@ -1246,6 +1425,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne if(startCB != null) { startCB.connectionStartCompleted(this); + ++count; } // @@ -1255,8 +1435,9 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { for(OutgoingMessage msg : sentCBs) { - msg.outAsync.__sent(); + msg.outAsync.__invokeSent(); } + ++count; } if(info != null) @@ -1268,6 +1449,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne if(info.outAsync != null) { info.outAsync.__finished(info.stream); + ++count; } if(info.invokeNum > 0) @@ -1279,17 +1461,36 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne // invokeAll(info.stream, info.invokeNum, info.requestId, info.compress, info.servantManager, info.adapter); + + // + // Don't increase count, the dispatch count is + // decreased when the incoming reply is sent. + // + } + + if(info.heartbeatCallback != null) + { + try + { + info.heartbeatCallback.heartbeat(this); + } + catch(Exception ex) + { + _logger.error("connection callback exception:\n" + ex + '\n' + _desc); + } + ++count; } } // // Decrease dispatch count. // - if(sentCBs != null || info != null && info.outAsync != null) + if(count > 0) { synchronized(this) { - if(--_dispatchCount == 0) + _dispatchCount -= count; + if(_dispatchCount == 0) { // // Only initiate shutdown if not already done. It @@ -1311,7 +1512,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } else if(_state == StateFinished) { - _reaper.add(this, _observer); + reap(); } notifyAll(); } @@ -1328,7 +1529,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne // finish() with the dispatcher if one is set, or we promote another IO // thread first before calling finish(). // - if(_startCallback == null && _sendStreams.isEmpty() && _asyncRequests.isEmpty()) + if(_startCallback == null && _sendStreams.isEmpty() && _asyncRequests.isEmpty() && _callback == null) { finish(); return; @@ -1428,6 +1629,19 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } _asyncRequests.clear(); + if(_callback != null) + { + try + { + _callback.closed(this); + } + catch(Exception ex) + { + _logger.error("connection callback exception:\n" + ex + '\n' + _desc); + } + _callback = null; + } + // // This must be done last as this will cause waitUntilFinished() to return (and communicator // objects such as the timer might be destroyed too). @@ -1437,7 +1651,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne setState(StateFinished); if(_dispatchCount == 0) { - _reaper.add(this, _observer); + reap(); } } } @@ -1530,20 +1744,20 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { if(_state == StateFinished) { - _reaper.add(this, _observer); + reap(); } notifyAll(); } } } - public ConnectionI(Communicator communicator, IceInternal.Instance instance, IceInternal.ConnectionReaper reaper, + public ConnectionI(Communicator communicator, IceInternal.Instance instance, IceInternal.ACMMonitor monitor, IceInternal.Transceiver transceiver, IceInternal.Connector connector, IceInternal.EndpointI endpoint, ObjectAdapter adapter) { _communicator = communicator; _instance = instance; - _reaper = reaper; + _monitor = monitor; _transceiver = transceiver; _desc = transceiver.toString(); _type = transceiver.type(); @@ -1562,7 +1776,14 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne _warn = initData.properties.getPropertyAsInt("Ice.Warn.Connections") > 0; _warnUdp = instance.initializationData().properties.getPropertyAsInt("Ice.Warn.Datagrams") > 0; _cacheBuffers = instance.cacheMessageBuffers(); - _acmAbsoluteTimeoutMillis = 0; + if(_monitor != null && _monitor.getACM().timeout > 0) + { + _acmLastActivity = IceInternal.Time.currentMonotonicTimeMillis(); + } + else + { + _acmLastActivity = -1; + } _nextRequestId = 1; _batchAutoFlush = initData.properties.getPropertyAsIntWithDefault("Ice.BatchAutoFlush", 1) > 0 ? true : false; _batchStream = new IceInternal.BasicStream(instance, IceInternal.Protocol.currentProtocolEncoding, @@ -1601,22 +1822,6 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne try { - if(_endpoint.datagram()) - { - _acmTimeout = 0; - } - else - { - if(_adapter != null) - { - _acmTimeout = ((ObjectAdapterI)_adapter).getACM(); - } - else - { - _acmTimeout = _instance.clientACM(); - } - } - if(_adapter != null) { _threadPool = ((ObjectAdapterI)_adapter).getThreadPool(); @@ -1841,15 +2046,19 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne // monitor, but only if we were registered before, i.e., if our // old state was StateActive. // - if(_acmTimeout > 0) + if(_monitor != null) { if(state == StateActive) { - _instance.connectionMonitor().add(this); + _monitor.add(this); + if(_acmLastActivity > 0) + { + _acmLastActivity = IceInternal.Time.currentMonotonicTimeMillis(); + } } else if(_state == StateActive) { - _instance.connectionMonitor().remove(this); + _monitor.remove(this); } } @@ -1949,6 +2158,35 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } } + private void + heartbeat() + { + assert(_state == StateActive); + + if(!_endpoint.datagram()) + { + IceInternal.BasicStream os = new IceInternal.BasicStream(_instance, + IceInternal.Protocol.currentProtocolEncoding); + os.writeBlob(IceInternal.Protocol.magic); + IceInternal.Protocol.currentProtocol.__write(os); + IceInternal.Protocol.currentProtocolEncoding.__write(os); + os.writeByte(IceInternal.Protocol.validateConnectionMsg); + os.writeByte((byte)0); + os.writeInt(IceInternal.Protocol.headerSize); // Message size. + + try + { + OutgoingMessage message = new OutgoingMessage(os, false, false); + sendMessage(message); + } + catch(Ice.LocalException ex) + { + setState(StateClosed, ex); + assert(_exception != null); + } + } + } + private boolean initialize(int operation) { @@ -2085,7 +2323,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne // OutgoingMessage message = _sendStreams.getFirst(); _writeStream.swap(message.stream); - if(message.sent(this, true)) + if(message.sent()) { callbacks.add(message); } @@ -2217,13 +2455,14 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne observerFinishWrite(message.stream.pos()); } int status = IceInternal.AsyncStatus.Sent; - if(message.sent(this, false)) + if(message.sent()) { status |= IceInternal.AsyncStatus.InvokeSentCallback; } - if(_acmTimeout > 0) + + if(_acmLastActivity > 0) { - _acmAbsoluteTimeoutMillis = IceInternal.Time.currentMonotonicTimeMillis() + _acmTimeout * 1000; + _acmLastActivity = IceInternal.Time.currentMonotonicTimeMillis(); } return status; } @@ -2307,6 +2546,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne IceInternal.ServantManager servantManager; ObjectAdapter adapter; IceInternal.OutgoingAsync outAsync; + ConnectionCallback heartbeatCallback; } private MessageInfo @@ -2432,9 +2672,9 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne else { info.outAsync = _asyncRequests.remove(info.requestId); - if(info.outAsync == null) + if(info.outAsync != null) { - throw new UnknownRequestIdException(); + ++_dispatchCount; } } notifyAll(); // Notify threads blocked in close(false) @@ -2444,17 +2684,18 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne case IceInternal.Protocol.validateConnectionMsg: { IceInternal.TraceUtil.traceRecv(info.stream, _logger, _traceLevels); - if(_warn) + if(_callback != null) { - _logger.warning("ignoring unexpected validate connection message:\n" + _desc); + info.heartbeatCallback = _callback; + ++_dispatchCount; } break; } default: { - IceInternal.TraceUtil.trace("received unknown message\n(invalid, closing connection)", - info.stream, _logger, _traceLevels); + IceInternal.TraceUtil.trace("received unknown message\n(invalid, closing connection)", info.stream, + _logger, _traceLevels); throw new UnknownMessageException(); } } @@ -2737,6 +2978,19 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } } + private void + reap() + { + if(_monitor != null) + { + _monitor.reap(this); + } + if(_observer != null) + { + _observer.detach(); + } + } + public IceInternal.Outgoing getOutgoing(IceInternal.RequestHandler handler, String operation, OperationMode mode, java.util.Map<String, String> context, InvocationObserver observer) @@ -2818,6 +3072,14 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne this.isSent = false; } + public void + timedOut() + { + assert((out != null || outAsync != null) && !isSent); + out = null; + outAsync = null; + } + public void adopt() { @@ -2832,18 +3094,18 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } public boolean - sent(ConnectionI connection, boolean notify) + sent() { isSent = true; // The message is sent. if(out != null) { - out.sent(notify); // true = notify the waiting thread that the request was sent. + out.sent(); return false; } else if(outAsync != null) { - return outAsync.__sent(connection); + return outAsync.__sent(); } else { @@ -2876,7 +3138,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne private Communicator _communicator; private final IceInternal.Instance _instance; - private final IceInternal.ConnectionReaper _reaper; + private IceInternal.ACMMonitor _monitor; private final IceInternal.Transceiver _transceiver; private String _desc; private final String _type; @@ -2902,8 +3164,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne private final boolean _warn; private final boolean _warnUdp; - private final long _acmTimeout; - private long _acmAbsoluteTimeoutMillis; + + private long _acmLastActivity; private final int _compressionLevel; @@ -2952,6 +3214,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne private Ice.ConnectionInfo _info; + private ConnectionCallback _callback; + private static Ice.Instrumentation.ConnectionState connectionStateMap[] = { Ice.Instrumentation.ConnectionState.ConnectionStateValidating, // StateNotInitialized Ice.Instrumentation.ConnectionState.ConnectionStateValidating, // StateNotValidated |