summaryrefslogtreecommitdiff
path: root/java/src/Ice/ConnectionI.java
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2014-05-23 11:59:44 +0200
committerBenoit Foucher <benoit@zeroc.com>2014-05-23 11:59:44 +0200
commitd81701ca8182942b7936f9fd84a019b695e9c890 (patch)
treedc036c9d701fbbe1afad67782bd78572c0f61974 /java/src/Ice/ConnectionI.java
parentFixed bug ICE-5543: stringToIdentity bug with escaped escapes (diff)
downloadice-d81701ca8182942b7936f9fd84a019b695e9c890.tar.bz2
ice-d81701ca8182942b7936f9fd84a019b695e9c890.tar.xz
ice-d81701ca8182942b7936f9fd84a019b695e9c890.zip
Added support for invocation timeouts and ACM heartbeats
Diffstat (limited to 'java/src/Ice/ConnectionI.java')
-rw-r--r--java/src/Ice/ConnectionI.java418
1 files changed, 341 insertions, 77 deletions
diff --git a/java/src/Ice/ConnectionI.java b/java/src/Ice/ConnectionI.java
index 7785c2d2226..c001462c960 100644
--- a/java/src/Ice/ConnectionI.java
+++ b/java/src/Ice/ConnectionI.java
@@ -105,9 +105,9 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
return;
}
- if(_acmTimeout > 0)
+ if(_acmLastActivity > 0)
{
- _acmAbsoluteTimeoutMillis = IceInternal.Time.currentMonotonicTimeMillis() + _acmTimeout * 1000;
+ _acmLastActivity = IceInternal.Time.currentMonotonicTimeMillis();
}
setState(StateActive);
@@ -274,26 +274,66 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
synchronized public void
- monitor(long now)
+ monitor(long now, IceInternal.ACMConfig acm)
{
if(_state != StateActive)
{
return;
}
- //
- // Active connection management for idle connections.
- //
- if(_acmTimeout <= 0 ||
- !_requests.isEmpty() || !_asyncRequests.isEmpty() || _dispatchCount > 0 ||
- _readStream.size() > IceInternal.Protocol.headerSize || !_writeStream.isEmpty() || !_batchStream.isEmpty())
+ if(_readStream.size() > IceInternal.Protocol.headerSize || !_writeStream.isEmpty())
{
+ //
+ // If writing or reading, nothing to do, the connection
+ // timeout will kick-in if writes or reads don't progress.
+ // This check is necessary because the actitivy timer is
+ // only set when a message is fully read/written.
+ //
return;
}
- if(now >= _acmAbsoluteTimeoutMillis)
+ //
+ // We send a heartbeat if there was no activity in the last
+ // (timeout / 4) period. Sending a heartbeat sooner than
+ // really needed is safer to ensure that the receiver will
+ // receive in time the heartbeat. Sending the heartbeat if
+ // there was no activity in the last (timeout / 2) period
+ // isn't enough since monitor() is called only every (timeout
+ // / 2) period.
+ //
+ // Note that this doesn't imply that we are sending 4
+ // heartbeats per timeout period because the monitor() method
+ // is sill only called every (timeout / 2) period.
+ //
+
+ if(acm.heartbeat == ACMHeartbeat.HeartbeatAlways ||
+ (acm.heartbeat != ACMHeartbeat.HeartbeatOff && now >= (_acmLastActivity + acm.timeout / 4)))
{
- setState(StateClosing, new ConnectionTimeoutException());
+ if(acm.heartbeat != ACMHeartbeat.HeartbeatOnInvocation || _dispatchCount > 0)
+ {
+ heartbeat();
+ }
+ }
+
+ if(acm.close != ACMClose.CloseOff && now >= (_acmLastActivity + acm.timeout))
+ {
+ if(acm.close == ACMClose.CloseOnIdleForceful ||
+ (acm.close != ACMClose.CloseOnIdle && (!_requests.isEmpty() || !_asyncRequests.isEmpty())))
+ {
+ //
+ // Close the connection if we didn't receive a heartbeat in
+ // the last period.
+ //
+ setState(StateClosed, new ConnectionTimeoutException());
+ }
+ else if(acm.close != ACMClose.CloseOnInvocation &&
+ _dispatchCount == 0 && _batchStream.isEmpty() && _requests.isEmpty() && _asyncRequests.isEmpty())
+ {
+ //
+ // The connection is idle, close it.
+ //
+ setState(StateClosing, new ConnectionTimeoutException());
+ }
}
}
@@ -701,11 +741,11 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
new IceInternal.ConnectionBatchOutgoingAsync(this, _communicator, _instance, __flushBatchRequests_name, cb);
try
{
- result.__send();
+ result.__invoke();
}
catch(LocalException __ex)
{
- result.__exceptionAsync(__ex);
+ result.__invokeExceptionAsync(__ex);
}
return result;
}
@@ -738,7 +778,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
if(_batchRequestNum == 0)
{
- out.sent(false);
+ out.sent();
return true;
}
@@ -799,7 +839,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
if(_batchRequestNum == 0)
{
int status = IceInternal.AsyncStatus.Sent;
- if(outAsync.__sent(this))
+ if(outAsync.__sent())
{
status |= IceInternal.AsyncStatus.InvokeSentCallback;
}
@@ -841,6 +881,142 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
return status;
}
+ synchronized public void
+ setCallback(ConnectionCallback callback)
+ {
+ if(_state > StateClosing)
+ {
+ return;
+ }
+ _callback = callback;
+ }
+
+ synchronized public void
+ setACM(Ice.IntOptional timeout, Ice.Optional<ACMClose> close, Ice.Optional<ACMHeartbeat> heartbeat)
+ {
+ if(_monitor != null)
+ {
+ if(_state == StateActive)
+ {
+ _monitor.remove(this);
+ }
+ _monitor = _monitor.acm(timeout, close, heartbeat);
+ if(_state == StateActive)
+ {
+ _monitor.add(this);
+ }
+
+ if(_monitor.getACM().timeout <= 0)
+ {
+ _acmLastActivity = -1; // Disable the recording of last activity.
+ }
+ else if(_state == StateActive && _acmLastActivity == -1)
+ {
+ _acmLastActivity = IceInternal.Time.currentMonotonicTimeMillis();
+ }
+ }
+ }
+
+ synchronized public Ice.ACM
+ getACM()
+ {
+ return _monitor != null ? _monitor.getACM() : new ACM(0, ACMClose.CloseOff, ACMHeartbeat.HeartbeatOff);
+ }
+
+ synchronized public void
+ requestTimedOut(IceInternal.OutgoingMessageCallback out)
+ {
+ java.util.Iterator<OutgoingMessage> it = _sendStreams.iterator();
+ while(it.hasNext())
+ {
+ OutgoingMessage o = it.next();
+ if(o.out == out)
+ {
+ if(o.requestId > 0)
+ {
+ _requests.remove(o.requestId);
+ }
+
+ //
+ // If the request is being sent, don't remove it from the send streams,
+ // it will be removed once the sending is finished.
+ //
+ if(o == _sendStreams.getFirst())
+ {
+ o.timedOut();
+ }
+ else
+ {
+ it.remove();
+ }
+ o.finished(new InvocationTimeoutException());
+ return; // We're done.
+ }
+ }
+
+ if(out instanceof IceInternal.Outgoing)
+ {
+ IceInternal.Outgoing o = (IceInternal.Outgoing)out;
+ java.util.Iterator<IceInternal.Outgoing> it2 = _requests.values().iterator();
+ while(it2.hasNext())
+ {
+ if(it2.next() == o)
+ {
+ o.finished(new InvocationTimeoutException(), true);
+ it2.remove();
+ return; // We're done.
+ }
+ }
+ }
+ }
+
+ synchronized public void
+ asyncRequestTimedOut(IceInternal.OutgoingAsyncMessageCallback outAsync)
+ {
+ java.util.Iterator<OutgoingMessage> it = _sendStreams.iterator();
+ while(it.hasNext())
+ {
+ OutgoingMessage o = it.next();
+ if(o.outAsync == outAsync)
+ {
+ if(o.requestId > 0)
+ {
+ _asyncRequests.remove(o.requestId);
+ }
+
+ //
+ // If the request is being sent, don't remove it from the send streams,
+ // it will be removed once the sending is finished.
+ //
+ if(o == _sendStreams.getFirst())
+ {
+ o.timedOut();
+ }
+ else
+ {
+ it.remove();
+ }
+ o.finished(new InvocationTimeoutException());
+ return; // We're done.
+ }
+ }
+
+ if(outAsync instanceof IceInternal.OutgoingAsync)
+ {
+ IceInternal.OutgoingAsync o = (IceInternal.OutgoingAsync)outAsync;
+ java.util.Iterator<IceInternal.OutgoingAsync> it2 = _asyncRequests.values().iterator();
+ while(it2.hasNext())
+ {
+ if(it2.next() == o)
+ {
+ o.__finished(new InvocationTimeoutException(), true);
+ it2.remove();
+ return; // We're done.
+ }
+ }
+ }
+ }
+
synchronized public void
sendResponse(IceInternal.BasicStream os, byte compressFlag)
{
@@ -852,7 +1028,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
{
if(_state == StateFinished)
{
- _reaper.add(this, _observer);
+ reap();
}
notifyAll();
}
@@ -886,7 +1062,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
{
if(_state == StateFinished)
{
- _reaper.add(this, _observer);
+ reap();
}
notifyAll();
}
@@ -1119,8 +1295,15 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
// We start out in holding state.
//
setState(StateHolding);
- startCB = _startCallback;
- _startCallback = null;
+ if(_startCallback != null)
+ {
+ startCB = _startCallback;
+ _startCallback = null;
+ if(startCB != null)
+ {
+ ++_dispatchCount;
+ }
+ }
}
else
{
@@ -1138,15 +1321,10 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
if((current.operation & IceInternal.SocketOperation.Write) != 0)
{
sentCBs = sendNextMessage();
- }
-
- //
- // We increment the dispatch count to prevent the
- // communicator destruction during the callback.
- //
- if(sentCBs != null || info != null && info.outAsync != null)
- {
- ++_dispatchCount;
+ if(sentCBs != null)
+ {
+ ++_dispatchCount;
+ }
}
}
}
@@ -1186,17 +1364,16 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
return;
}
- if(_acmTimeout > 0)
+ if(_acmLastActivity > 0)
{
- _acmAbsoluteTimeoutMillis = IceInternal.Time.currentMonotonicTimeMillis() + _acmTimeout * 1000;
+ _acmLastActivity = IceInternal.Time.currentMonotonicTimeMillis();
}
-
current.ioCompleted();
}
if(_dispatcher != null)
{
- if(info != null)
+ if(info != null && info.heartbeatCallback == null) // No need for the stream if heartbeat callback
{
//
// Create a new stream for the dispatch instead of using the thread
@@ -1239,6 +1416,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
protected void
dispatch(StartCallback startCB, java.util.List<OutgoingMessage> sentCBs, MessageInfo info)
{
+ int count = 0;
+
//
// Notify the factory that the connection establishment and
// validation has completed.
@@ -1246,6 +1425,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
if(startCB != null)
{
startCB.connectionStartCompleted(this);
+ ++count;
}
//
@@ -1255,8 +1435,9 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
{
for(OutgoingMessage msg : sentCBs)
{
- msg.outAsync.__sent();
+ msg.outAsync.__invokeSent();
}
+ ++count;
}
if(info != null)
@@ -1268,6 +1449,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
if(info.outAsync != null)
{
info.outAsync.__finished(info.stream);
+ ++count;
}
if(info.invokeNum > 0)
@@ -1279,17 +1461,36 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
//
invokeAll(info.stream, info.invokeNum, info.requestId, info.compress, info.servantManager,
info.adapter);
+
+ //
+ // Don't increase count, the dispatch count is
+ // decreased when the incoming reply is sent.
+ //
+ }
+
+ if(info.heartbeatCallback != null)
+ {
+ try
+ {
+ info.heartbeatCallback.heartbeat(this);
+ }
+ catch(Exception ex)
+ {
+ _logger.error("connection callback exception:\n" + ex + '\n' + _desc);
+ }
+ ++count;
}
}
//
// Decrease dispatch count.
//
- if(sentCBs != null || info != null && info.outAsync != null)
+ if(count > 0)
{
synchronized(this)
{
- if(--_dispatchCount == 0)
+ _dispatchCount -= count;
+ if(_dispatchCount == 0)
{
//
// Only initiate shutdown if not already done. It
@@ -1311,7 +1512,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
else if(_state == StateFinished)
{
- _reaper.add(this, _observer);
+ reap();
}
notifyAll();
}
@@ -1328,7 +1529,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
// finish() with the dispatcher if one is set, or we promote another IO
// thread first before calling finish().
//
- if(_startCallback == null && _sendStreams.isEmpty() && _asyncRequests.isEmpty())
+ if(_startCallback == null && _sendStreams.isEmpty() && _asyncRequests.isEmpty() && _callback == null)
{
finish();
return;
@@ -1428,6 +1629,19 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
_asyncRequests.clear();
+ if(_callback != null)
+ {
+ try
+ {
+ _callback.closed(this);
+ }
+ catch(Exception ex)
+ {
+ _logger.error("connection callback exception:\n" + ex + '\n' + _desc);
+ }
+ _callback = null;
+ }
+
//
// This must be done last as this will cause waitUntilFinished() to return (and communicator
// objects such as the timer might be destroyed too).
@@ -1437,7 +1651,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
setState(StateFinished);
if(_dispatchCount == 0)
{
- _reaper.add(this, _observer);
+ reap();
}
}
}
@@ -1530,20 +1744,20 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
{
if(_state == StateFinished)
{
- _reaper.add(this, _observer);
+ reap();
}
notifyAll();
}
}
}
- public ConnectionI(Communicator communicator, IceInternal.Instance instance, IceInternal.ConnectionReaper reaper,
+ public ConnectionI(Communicator communicator, IceInternal.Instance instance, IceInternal.ACMMonitor monitor,
IceInternal.Transceiver transceiver, IceInternal.Connector connector,
IceInternal.EndpointI endpoint, ObjectAdapter adapter)
{
_communicator = communicator;
_instance = instance;
- _reaper = reaper;
+ _monitor = monitor;
_transceiver = transceiver;
_desc = transceiver.toString();
_type = transceiver.type();
@@ -1562,7 +1776,14 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
_warn = initData.properties.getPropertyAsInt("Ice.Warn.Connections") > 0;
_warnUdp = instance.initializationData().properties.getPropertyAsInt("Ice.Warn.Datagrams") > 0;
_cacheBuffers = instance.cacheMessageBuffers();
- _acmAbsoluteTimeoutMillis = 0;
+ if(_monitor != null && _monitor.getACM().timeout > 0)
+ {
+ _acmLastActivity = IceInternal.Time.currentMonotonicTimeMillis();
+ }
+ else
+ {
+ _acmLastActivity = -1;
+ }
_nextRequestId = 1;
_batchAutoFlush = initData.properties.getPropertyAsIntWithDefault("Ice.BatchAutoFlush", 1) > 0 ? true : false;
_batchStream = new IceInternal.BasicStream(instance, IceInternal.Protocol.currentProtocolEncoding,
@@ -1601,22 +1822,6 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
try
{
- if(_endpoint.datagram())
- {
- _acmTimeout = 0;
- }
- else
- {
- if(_adapter != null)
- {
- _acmTimeout = ((ObjectAdapterI)_adapter).getACM();
- }
- else
- {
- _acmTimeout = _instance.clientACM();
- }
- }
-
if(_adapter != null)
{
_threadPool = ((ObjectAdapterI)_adapter).getThreadPool();
@@ -1841,15 +2046,19 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
// monitor, but only if we were registered before, i.e., if our
// old state was StateActive.
//
- if(_acmTimeout > 0)
+ if(_monitor != null)
{
if(state == StateActive)
{
- _instance.connectionMonitor().add(this);
+ _monitor.add(this);
+ if(_acmLastActivity > 0)
+ {
+ _acmLastActivity = IceInternal.Time.currentMonotonicTimeMillis();
+ }
}
else if(_state == StateActive)
{
- _instance.connectionMonitor().remove(this);
+ _monitor.remove(this);
}
}
@@ -1949,6 +2158,35 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
}
+ private void
+ heartbeat()
+ {
+ assert(_state == StateActive);
+
+ if(!_endpoint.datagram())
+ {
+ IceInternal.BasicStream os = new IceInternal.BasicStream(_instance,
+ IceInternal.Protocol.currentProtocolEncoding);
+ os.writeBlob(IceInternal.Protocol.magic);
+ IceInternal.Protocol.currentProtocol.__write(os);
+ IceInternal.Protocol.currentProtocolEncoding.__write(os);
+ os.writeByte(IceInternal.Protocol.validateConnectionMsg);
+ os.writeByte((byte)0);
+ os.writeInt(IceInternal.Protocol.headerSize); // Message size.
+
+ try
+ {
+ OutgoingMessage message = new OutgoingMessage(os, false, false);
+ sendMessage(message);
+ }
+ catch(Ice.LocalException ex)
+ {
+ setState(StateClosed, ex);
+ assert(_exception != null);
+ }
+ }
+ }
+
private boolean
initialize(int operation)
{
@@ -2085,7 +2323,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
//
OutgoingMessage message = _sendStreams.getFirst();
_writeStream.swap(message.stream);
- if(message.sent(this, true))
+ if(message.sent())
{
callbacks.add(message);
}
@@ -2217,13 +2455,14 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
observerFinishWrite(message.stream.pos());
}
int status = IceInternal.AsyncStatus.Sent;
- if(message.sent(this, false))
+ if(message.sent())
{
status |= IceInternal.AsyncStatus.InvokeSentCallback;
}
- if(_acmTimeout > 0)
+
+ if(_acmLastActivity > 0)
{
- _acmAbsoluteTimeoutMillis = IceInternal.Time.currentMonotonicTimeMillis() + _acmTimeout * 1000;
+ _acmLastActivity = IceInternal.Time.currentMonotonicTimeMillis();
}
return status;
}
@@ -2307,6 +2546,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
IceInternal.ServantManager servantManager;
ObjectAdapter adapter;
IceInternal.OutgoingAsync outAsync;
+ ConnectionCallback heartbeatCallback;
}
private MessageInfo
@@ -2432,9 +2672,9 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
else
{
info.outAsync = _asyncRequests.remove(info.requestId);
- if(info.outAsync == null)
+ if(info.outAsync != null)
{
- throw new UnknownRequestIdException();
+ ++_dispatchCount;
}
}
notifyAll(); // Notify threads blocked in close(false)
@@ -2444,17 +2684,18 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
case IceInternal.Protocol.validateConnectionMsg:
{
IceInternal.TraceUtil.traceRecv(info.stream, _logger, _traceLevels);
- if(_warn)
+ if(_callback != null)
{
- _logger.warning("ignoring unexpected validate connection message:\n" + _desc);
+ info.heartbeatCallback = _callback;
+ ++_dispatchCount;
}
break;
}
default:
{
- IceInternal.TraceUtil.trace("received unknown message\n(invalid, closing connection)",
- info.stream, _logger, _traceLevels);
+ IceInternal.TraceUtil.trace("received unknown message\n(invalid, closing connection)", info.stream,
+ _logger, _traceLevels);
throw new UnknownMessageException();
}
}
@@ -2737,6 +2978,19 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
}
+ private void
+ reap()
+ {
+ if(_monitor != null)
+ {
+ _monitor.reap(this);
+ }
+ if(_observer != null)
+ {
+ _observer.detach();
+ }
+ }
+
public IceInternal.Outgoing
getOutgoing(IceInternal.RequestHandler handler, String operation, OperationMode mode,
java.util.Map<String, String> context, InvocationObserver observer)
@@ -2818,6 +3072,14 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
this.isSent = false;
}
+ public void
+ timedOut()
+ {
+ assert((out != null || outAsync != null) && !isSent);
+ out = null;
+ outAsync = null;
+ }
+
public void
adopt()
{
@@ -2832,18 +3094,18 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
public boolean
- sent(ConnectionI connection, boolean notify)
+ sent()
{
isSent = true; // The message is sent.
if(out != null)
{
- out.sent(notify); // true = notify the waiting thread that the request was sent.
+ out.sent();
return false;
}
else if(outAsync != null)
{
- return outAsync.__sent(connection);
+ return outAsync.__sent();
}
else
{
@@ -2876,7 +3138,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
private Communicator _communicator;
private final IceInternal.Instance _instance;
- private final IceInternal.ConnectionReaper _reaper;
+ private IceInternal.ACMMonitor _monitor;
private final IceInternal.Transceiver _transceiver;
private String _desc;
private final String _type;
@@ -2902,8 +3164,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
private final boolean _warn;
private final boolean _warnUdp;
- private final long _acmTimeout;
- private long _acmAbsoluteTimeoutMillis;
+
+ private long _acmLastActivity;
private final int _compressionLevel;
@@ -2952,6 +3214,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
private Ice.ConnectionInfo _info;
+ private ConnectionCallback _callback;
+
private static Ice.Instrumentation.ConnectionState connectionStateMap[] = {
Ice.Instrumentation.ConnectionState.ConnectionStateValidating, // StateNotInitialized
Ice.Instrumentation.ConnectionState.ConnectionStateValidating, // StateNotValidated