diff options
author | Marc Laukien <marc@zeroc.com> | 2003-01-17 19:00:16 +0000 |
---|---|---|
committer | Marc Laukien <marc@zeroc.com> | 2003-01-17 19:00:16 +0000 |
commit | 6ef4c296bc6b24017da915154f3937224da2b5af (patch) | |
tree | 8ed76ae778a13a682b7f06e21b5c0f9a227c460a /java/src | |
parent | ConnectionMonitor (diff) | |
download | ice-6ef4c296bc6b24017da915154f3937224da2b5af.tar.bz2 ice-6ef4c296bc6b24017da915154f3937224da2b5af.tar.xz ice-6ef4c296bc6b24017da915154f3937224da2b5af.zip |
ConnectionMonitor; fixed batch bug
Diffstat (limited to 'java/src')
-rw-r--r-- | java/src/IceInternal/BasicStream.java | 6 | ||||
-rw-r--r-- | java/src/IceInternal/Connection.java | 145 | ||||
-rw-r--r-- | java/src/IceInternal/ConnectionMonitor.java | 170 | ||||
-rw-r--r-- | java/src/IceInternal/Instance.java | 21 | ||||
-rw-r--r-- | java/src/IceInternal/OutgoingAsync.java | 18 | ||||
-rw-r--r-- | java/src/IceInternal/TcpTransceiver.java | 3 | ||||
-rw-r--r-- | java/src/IceInternal/ThreadPool.java | 2 |
7 files changed, 344 insertions, 21 deletions
diff --git a/java/src/IceInternal/BasicStream.java b/java/src/IceInternal/BasicStream.java index 558cf806401..f29c51c41d2 100644 --- a/java/src/IceInternal/BasicStream.java +++ b/java/src/IceInternal/BasicStream.java @@ -1101,6 +1101,12 @@ public class BasicStream return _limit; } + boolean + isEmpty() + { + return _limit == 0; + } + private void expand(int size) { diff --git a/java/src/IceInternal/Connection.java b/java/src/IceInternal/Connection.java index 9f0e91cebec..4359e9776f4 100644 --- a/java/src/IceInternal/Connection.java +++ b/java/src/IceInternal/Connection.java @@ -94,6 +94,45 @@ public final class Connection extends EventHandler } public synchronized void + monitor() + { + if(_state != StateActive) + { + return; + } + + // + // Check for timed out async requests. + // + java.util.Iterator i = _asyncRequests.entryIterator(); + while(i.hasNext()) + { + IntMap.Entry e = (IntMap.Entry)i.next(); + OutgoingAsync out = (OutgoingAsync)e.getValue(); + if(out.__timedOut()) + { + setState(StateClosed, new Ice.TimeoutException()); + return; + } + } + + // + // Active connection management for idle connections. + // + if(_acmTimeout > 0 && + _requests.isEmpty() && _asyncRequests.isEmpty() && + !_batchStreamInUse && _batchStream.isEmpty() && + _dispatchCount == 0) + { + if(System.currentTimeMillis() >= _acmAbsoluteTimeoutMillis) + { + setState(StateClosing, new Ice.ConnectionTimeoutException()); + return; + } + } + } + + public synchronized void validate() { if(!_endpoint.datagram()) // Datagram connections are always implicitly validated. @@ -162,6 +201,21 @@ public final class Connection extends EventHandler // We only print warnings after successful connection validation. // _warn = _instance.properties().getPropertyAsInt("Ice.Warn.Connections") > 0 ? true : false; + + // + // We only use active connection management after successful + // connection validation. We don't use active connection + // management for datagram connections at all, because such + // "virtual connections" cannot be reestablished. + // + if(!_endpoint.datagram()) + { + _acmTimeout = _instance.properties().getPropertyAsInt("Ice.ConnectionIdleTime"); + if(_acmTimeout > 0) + { + _acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000; + } + } } public synchronized void @@ -257,6 +311,11 @@ public final class Connection extends EventHandler { _requests.put(requestId, out); } + + if(_acmTimeout > 0) + { + _acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000; + } } public synchronized void @@ -304,6 +363,11 @@ public final class Connection extends EventHandler // Only add to the request map if there was no exception. // _asyncRequests.put(requestId, out); + + if(_acmTimeout > 0) + { + _acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000; + } } private final static byte[] _requestBatchHdr = @@ -335,21 +399,26 @@ public final class Connection extends EventHandler } assert(_state < StateClosing); - // - // The Connection now belongs to the caller, until - // finishBatchRequest() is called. - // - - if(_batchStream.size() == 0) + if(_batchStream.isEmpty()) { - _batchStream.writeBlob(_requestBatchHdr); + try + { + _batchStream.writeBlob(_requestBatchHdr); + } + catch(Ice.LocalException ex) + { + setState(StateClosed, ex); + throw ex; + } } - // - // Give the batch stream to caller, until finishBatchRequest() - // or abortBatchRequest() is called. - // _batchStreamInUse = true; + _batchStream.swap(os); + + // + // _batchStream now belongs to the caller, until + // finishBatchRequest() or abortBatchRequest() is called. + // } public synchronized void @@ -399,6 +468,11 @@ public final class Connection extends EventHandler } } + if(_batchStream.isEmpty()) + { + return; // Nothing to send. + } + if(_exception != null) { throw _exception; @@ -407,11 +481,6 @@ public final class Connection extends EventHandler try { - if(_batchStream.size() == 0) - { - return; // Nothing to send. - } - _batchStream.pos(3); // @@ -443,6 +512,11 @@ public final class Connection extends EventHandler assert(_exception != null); throw _exception; } + + if(_acmTimeout > 0) + { + _acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000; + } } public synchronized void @@ -482,6 +556,11 @@ public final class Connection extends EventHandler { setState(StateClosed, ex); } + + if(_acmTimeout > 0) + { + _acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000; + } } public synchronized void @@ -568,6 +647,13 @@ public final class Connection extends EventHandler read(BasicStream stream) { _transceiver.read(stream, 0); + + // + // Updating _acmAbsoluteTimeoutMillis is to expensive here, because + // we would have to acquire a lock just for this + // purpose. Instead, we update _acmAbsoluteTimeoutMillis in + // message(). + // } private final static byte[] _replyHdr = @@ -596,6 +682,11 @@ public final class Connection extends EventHandler return; } + if(_acmTimeout > 0) + { + _acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000; + } + try { assert(stream.pos() == stream.size()); @@ -841,6 +932,8 @@ public final class Connection extends EventHandler _traceLevels = instance.traceLevels(); _registeredWithPool = false; _warn = false; + _acmTimeout = 0; + _acmAbsoluteTimeoutMillis = 0; _nextRequestId = 1; _batchStream = new BasicStream(instance); _batchStreamInUse = false; @@ -888,6 +981,7 @@ public final class Connection extends EventHandler // Don't warn about certain expected exceptions. // if(!(_exception instanceof Ice.CloseConnectionException || + _exception instanceof Ice.ConnectionTimeoutException || _exception instanceof Ice.CommunicatorDestroyedException || _exception instanceof Ice.ObjectAdapterDeactivatedException || (_exception instanceof Ice.ConnectionLostException && _state == StateClosing))) @@ -913,8 +1007,8 @@ public final class Connection extends EventHandler while(i.hasNext()) { IntMap.Entry e = (IntMap.Entry)i.next(); - Outgoing out = (Outgoing)e.getValue(); - out.finished(_exception); + OutgoingAsync out = (OutgoingAsync)e.getValue(); + out.__finished(_exception); } _asyncRequests.clear(); } @@ -1057,6 +1151,12 @@ public final class Connection extends EventHandler } _registeredWithPool = true; + + ConnectionMonitor connectionMonitor = _instance.connectionMonitor(); + if(connectionMonitor != null) + { + connectionMonitor.add(this); + } } } @@ -1077,6 +1177,12 @@ public final class Connection extends EventHandler } _registeredWithPool = false; + + ConnectionMonitor connectionMonitor = _instance.connectionMonitor(); + if(connectionMonitor != null) + { + connectionMonitor.remove(this); + } } } @@ -1166,6 +1272,9 @@ public final class Connection extends EventHandler private boolean _warn; + private int _acmTimeout; + private long _acmAbsoluteTimeoutMillis; + private int _nextRequestId; private IntMap _requests = new IntMap(); private IntMap _asyncRequests = new IntMap(); diff --git a/java/src/IceInternal/ConnectionMonitor.java b/java/src/IceInternal/ConnectionMonitor.java new file mode 100644 index 00000000000..153e47f0261 --- /dev/null +++ b/java/src/IceInternal/ConnectionMonitor.java @@ -0,0 +1,170 @@ +// ********************************************************************** +// +// Copyright (c) 2002 +// ZeroC, Inc. +// Billerica, MA, USA +// +// All Rights Reserved. +// +// Ice is free software; you can redistribute it and/or modify it under +// the terms of the GNU General Public License version 2 as published by +// the Free Software Foundation. +// +// ********************************************************************** + +package IceInternal; + +public class ConnectionMonitor extends Thread +{ + public void + destroy() + { + synchronized(this) + { + if(_instance == null) + { + return; + } + + _instance = null; + _connections.clear(); + + notify(); + } + + while(true) + { + try + { + join(); + break; + } + catch(java.lang.InterruptedException ex) + { + continue; + } + } + } + + public synchronized void + add(Connection connection) + { + if(_instance == null) + { + throw new Ice.CommunicatorDestroyedException(); + } + + _connections.add(connection); + } + + public synchronized void + remove(Connection connection) + { + if(_instance == null) + { + throw new Ice.CommunicatorDestroyedException(); + } + + _connections.remove(connection); + } + + // + // Only for use by Instance. + // + ConnectionMonitor(Instance instance, int interval) + { + _instance = instance; + _interval = interval; + + assert(_interval > 0); + start(); + } + + protected void + finalize() + throws Throwable + { + assert(_instance == null); + assert(_connections.isEmpty()); + + super.finalize(); + } + + public void + run() + { + java.util.HashSet connections = new java.util.HashSet(); + + while(true) + { + synchronized(this) + { + if(_instance == null) + { + return; + } + + try + { + wait(_interval * 1000); + } + catch(InterruptedException ex) + { + continue; + } + + if(_instance == null) + { + return; + } + + connections.clear(); + connections.addAll(_connections); + } + + // + // Monitor connections outside the thread synchronization, + // so that connections can be added or removed during + // monitoring. + // + java.util.Iterator iter = connections.iterator(); + while(iter.hasNext()) + { + Connection connection = (Connection)iter.next(); + + try + { + connection.monitor(); + } + catch(Ice.LocalException ex) + { + synchronized(this) + { + java.io.StringWriter sw = new java.io.StringWriter(); + java.io.PrintWriter pw = new java.io.PrintWriter(sw); + ex.printStackTrace(pw); + pw.flush(); + String s = "exception in thread pool thread " + getName() + ":\n" + sw.toString(); + _instance.logger().error(s); + } + } + catch(RuntimeException ex) + { + synchronized(this) + { + java.io.StringWriter sw = new java.io.StringWriter(); + java.io.PrintWriter pw = new java.io.PrintWriter(sw); + ex.printStackTrace(pw); + pw.flush(); + String s = "unknown exception in thread pool thread " + getName() + ":\n" + sw.toString(); + _instance.logger().error(s); + } + } + } + } + } + + private Instance _instance; + private final int _interval; + private java.util.HashSet _connections = new java.util.HashSet(); +} diff --git a/java/src/IceInternal/Instance.java b/java/src/IceInternal/Instance.java index 7ed0c30356d..5c5a40ae1bd 100644 --- a/java/src/IceInternal/Instance.java +++ b/java/src/IceInternal/Instance.java @@ -79,6 +79,12 @@ public class Instance return _outgoingConnectionFactory; } + public synchronized ConnectionMonitor + connectionMonitor() + { + return _connectionMonitor; + } + public synchronized ObjectFactoryManager servantFactoryManager() { @@ -191,6 +197,13 @@ public class Instance _outgoingConnectionFactory = new OutgoingConnectionFactory(this); + int acmTimeout = _properties.getPropertyAsInt("Ice.ConnectionIdleTime"); + int interval = _properties.getPropertyAsIntWithDefault("Ice.MonitorConnections", acmTimeout); + if(interval > 0) + { + _connectionMonitor = new ConnectionMonitor(this, interval); + } + _servantFactoryManager = new ObjectFactoryManager(); _userExceptionFactoryManager = new UserExceptionFactoryManager(); @@ -214,6 +227,7 @@ public class Instance assert(_referenceFactory == null); assert(_proxyFactory == null); assert(_outgoingConnectionFactory == null); + assert(_connectionMonitor == null); assert(_servantFactoryManager == null); assert(_userExceptionFactoryManager == null); assert(_objectAdapterFactory == null); @@ -281,6 +295,12 @@ public class Instance _objectAdapterFactory = null; _outgoingConnectionFactory = null; + if(_connectionMonitor != null) + { + _connectionMonitor.destroy(); + _connectionMonitor = null; + } + if(_serverThreadPool != null) { _serverThreadPool.destroy(); @@ -347,6 +367,7 @@ public class Instance private ReferenceFactory _referenceFactory; private ProxyFactory _proxyFactory; private OutgoingConnectionFactory _outgoingConnectionFactory; + private ConnectionMonitor _connectionMonitor; private ObjectFactoryManager _servantFactoryManager; private UserExceptionFactoryManager _userExceptionFactoryManager; private ObjectAdapterFactory _objectAdapterFactory; diff --git a/java/src/IceInternal/OutgoingAsync.java b/java/src/IceInternal/OutgoingAsync.java index 5f10f877b4c..4484af3c6ad 100644 --- a/java/src/IceInternal/OutgoingAsync.java +++ b/java/src/IceInternal/OutgoingAsync.java @@ -94,6 +94,10 @@ public abstract class OutgoingAsync try { _connection.sendAsyncRequest(this); + if(_connection.timeout() >= 0) + { + _absoluteTimeoutMillis = System.currentTimeMillis() + _connection.timeout() * 1000; + } } catch(RuntimeException ex) { @@ -227,6 +231,19 @@ public abstract class OutgoingAsync } } + public boolean + __timedOut() + { + if(_connection.timeout() >= 0) + { + return System.currentTimeMillis() >= _absoluteTimeoutMillis; + } + else + { + return false; + } + } + public BasicStream __is() { @@ -273,6 +290,7 @@ public abstract class OutgoingAsync } private Connection _connection; + private long _absoluteTimeoutMillis; private BasicStream _is; private BasicStream _os; } diff --git a/java/src/IceInternal/TcpTransceiver.java b/java/src/IceInternal/TcpTransceiver.java index 1fa5e1903b5..282ee1278e0 100644 --- a/java/src/IceInternal/TcpTransceiver.java +++ b/java/src/IceInternal/TcpTransceiver.java @@ -98,8 +98,7 @@ final class TcpTransceiver implements Transceiver if(_traceLevels.network >= 3) { - String s = "sent " + ret + " of " + buf.limit() + - " bytes via tcp\n" + toString(); + String s = "sent " + ret + " of " + buf.limit() + " bytes via tcp\n" + toString(); _logger.trace(_traceLevels.networkCat, s); } } diff --git a/java/src/IceInternal/ThreadPool.java b/java/src/IceInternal/ThreadPool.java index 7c4a1b999b4..fb020d21a29 100644 --- a/java/src/IceInternal/ThreadPool.java +++ b/java/src/IceInternal/ThreadPool.java @@ -903,7 +903,7 @@ public final class ThreadPool java.io.PrintWriter pw = new java.io.PrintWriter(sw); ex.printStackTrace(pw); pw.flush(); - String s = "exception in thread pool:\n" + sw.toString(); + String s = "exception in thread pool thread " + getName() + ":\n" + sw.toString(); _instance.logger().error(s); } catch(RuntimeException ex) |