summaryrefslogtreecommitdiff
path: root/java/src
diff options
context:
space:
mode:
authorMarc Laukien <marc@zeroc.com>2003-01-17 19:00:16 +0000
committerMarc Laukien <marc@zeroc.com>2003-01-17 19:00:16 +0000
commit6ef4c296bc6b24017da915154f3937224da2b5af (patch)
tree8ed76ae778a13a682b7f06e21b5c0f9a227c460a /java/src
parentConnectionMonitor (diff)
downloadice-6ef4c296bc6b24017da915154f3937224da2b5af.tar.bz2
ice-6ef4c296bc6b24017da915154f3937224da2b5af.tar.xz
ice-6ef4c296bc6b24017da915154f3937224da2b5af.zip
ConnectionMonitor; fixed batch bug
Diffstat (limited to 'java/src')
-rw-r--r--java/src/IceInternal/BasicStream.java6
-rw-r--r--java/src/IceInternal/Connection.java145
-rw-r--r--java/src/IceInternal/ConnectionMonitor.java170
-rw-r--r--java/src/IceInternal/Instance.java21
-rw-r--r--java/src/IceInternal/OutgoingAsync.java18
-rw-r--r--java/src/IceInternal/TcpTransceiver.java3
-rw-r--r--java/src/IceInternal/ThreadPool.java2
7 files changed, 344 insertions, 21 deletions
diff --git a/java/src/IceInternal/BasicStream.java b/java/src/IceInternal/BasicStream.java
index 558cf806401..f29c51c41d2 100644
--- a/java/src/IceInternal/BasicStream.java
+++ b/java/src/IceInternal/BasicStream.java
@@ -1101,6 +1101,12 @@ public class BasicStream
return _limit;
}
+ boolean
+ isEmpty()
+ {
+ return _limit == 0;
+ }
+
private void
expand(int size)
{
diff --git a/java/src/IceInternal/Connection.java b/java/src/IceInternal/Connection.java
index 9f0e91cebec..4359e9776f4 100644
--- a/java/src/IceInternal/Connection.java
+++ b/java/src/IceInternal/Connection.java
@@ -94,6 +94,45 @@ public final class Connection extends EventHandler
}
public synchronized void
+ monitor()
+ {
+ if(_state != StateActive)
+ {
+ return;
+ }
+
+ //
+ // Check for timed out async requests.
+ //
+ java.util.Iterator i = _asyncRequests.entryIterator();
+ while(i.hasNext())
+ {
+ IntMap.Entry e = (IntMap.Entry)i.next();
+ OutgoingAsync out = (OutgoingAsync)e.getValue();
+ if(out.__timedOut())
+ {
+ setState(StateClosed, new Ice.TimeoutException());
+ return;
+ }
+ }
+
+ //
+ // Active connection management for idle connections.
+ //
+ if(_acmTimeout > 0 &&
+ _requests.isEmpty() && _asyncRequests.isEmpty() &&
+ !_batchStreamInUse && _batchStream.isEmpty() &&
+ _dispatchCount == 0)
+ {
+ if(System.currentTimeMillis() >= _acmAbsoluteTimeoutMillis)
+ {
+ setState(StateClosing, new Ice.ConnectionTimeoutException());
+ return;
+ }
+ }
+ }
+
+ public synchronized void
validate()
{
if(!_endpoint.datagram()) // Datagram connections are always implicitly validated.
@@ -162,6 +201,21 @@ public final class Connection extends EventHandler
// We only print warnings after successful connection validation.
//
_warn = _instance.properties().getPropertyAsInt("Ice.Warn.Connections") > 0 ? true : false;
+
+ //
+ // We only use active connection management after successful
+ // connection validation. We don't use active connection
+ // management for datagram connections at all, because such
+ // "virtual connections" cannot be reestablished.
+ //
+ if(!_endpoint.datagram())
+ {
+ _acmTimeout = _instance.properties().getPropertyAsInt("Ice.ConnectionIdleTime");
+ if(_acmTimeout > 0)
+ {
+ _acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000;
+ }
+ }
}
public synchronized void
@@ -257,6 +311,11 @@ public final class Connection extends EventHandler
{
_requests.put(requestId, out);
}
+
+ if(_acmTimeout > 0)
+ {
+ _acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000;
+ }
}
public synchronized void
@@ -304,6 +363,11 @@ public final class Connection extends EventHandler
// Only add to the request map if there was no exception.
//
_asyncRequests.put(requestId, out);
+
+ if(_acmTimeout > 0)
+ {
+ _acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000;
+ }
}
private final static byte[] _requestBatchHdr =
@@ -335,21 +399,26 @@ public final class Connection extends EventHandler
}
assert(_state < StateClosing);
- //
- // The Connection now belongs to the caller, until
- // finishBatchRequest() is called.
- //
-
- if(_batchStream.size() == 0)
+ if(_batchStream.isEmpty())
{
- _batchStream.writeBlob(_requestBatchHdr);
+ try
+ {
+ _batchStream.writeBlob(_requestBatchHdr);
+ }
+ catch(Ice.LocalException ex)
+ {
+ setState(StateClosed, ex);
+ throw ex;
+ }
}
- //
- // Give the batch stream to caller, until finishBatchRequest()
- // or abortBatchRequest() is called.
- //
_batchStreamInUse = true;
+ _batchStream.swap(os);
+
+ //
+ // _batchStream now belongs to the caller, until
+ // finishBatchRequest() or abortBatchRequest() is called.
+ //
}
public synchronized void
@@ -399,6 +468,11 @@ public final class Connection extends EventHandler
}
}
+ if(_batchStream.isEmpty())
+ {
+ return; // Nothing to send.
+ }
+
if(_exception != null)
{
throw _exception;
@@ -407,11 +481,6 @@ public final class Connection extends EventHandler
try
{
- if(_batchStream.size() == 0)
- {
- return; // Nothing to send.
- }
-
_batchStream.pos(3);
//
@@ -443,6 +512,11 @@ public final class Connection extends EventHandler
assert(_exception != null);
throw _exception;
}
+
+ if(_acmTimeout > 0)
+ {
+ _acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000;
+ }
}
public synchronized void
@@ -482,6 +556,11 @@ public final class Connection extends EventHandler
{
setState(StateClosed, ex);
}
+
+ if(_acmTimeout > 0)
+ {
+ _acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000;
+ }
}
public synchronized void
@@ -568,6 +647,13 @@ public final class Connection extends EventHandler
read(BasicStream stream)
{
_transceiver.read(stream, 0);
+
+ //
+ // Updating _acmAbsoluteTimeoutMillis is to expensive here, because
+ // we would have to acquire a lock just for this
+ // purpose. Instead, we update _acmAbsoluteTimeoutMillis in
+ // message().
+ //
}
private final static byte[] _replyHdr =
@@ -596,6 +682,11 @@ public final class Connection extends EventHandler
return;
}
+ if(_acmTimeout > 0)
+ {
+ _acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000;
+ }
+
try
{
assert(stream.pos() == stream.size());
@@ -841,6 +932,8 @@ public final class Connection extends EventHandler
_traceLevels = instance.traceLevels();
_registeredWithPool = false;
_warn = false;
+ _acmTimeout = 0;
+ _acmAbsoluteTimeoutMillis = 0;
_nextRequestId = 1;
_batchStream = new BasicStream(instance);
_batchStreamInUse = false;
@@ -888,6 +981,7 @@ public final class Connection extends EventHandler
// Don't warn about certain expected exceptions.
//
if(!(_exception instanceof Ice.CloseConnectionException ||
+ _exception instanceof Ice.ConnectionTimeoutException ||
_exception instanceof Ice.CommunicatorDestroyedException ||
_exception instanceof Ice.ObjectAdapterDeactivatedException ||
(_exception instanceof Ice.ConnectionLostException && _state == StateClosing)))
@@ -913,8 +1007,8 @@ public final class Connection extends EventHandler
while(i.hasNext())
{
IntMap.Entry e = (IntMap.Entry)i.next();
- Outgoing out = (Outgoing)e.getValue();
- out.finished(_exception);
+ OutgoingAsync out = (OutgoingAsync)e.getValue();
+ out.__finished(_exception);
}
_asyncRequests.clear();
}
@@ -1057,6 +1151,12 @@ public final class Connection extends EventHandler
}
_registeredWithPool = true;
+
+ ConnectionMonitor connectionMonitor = _instance.connectionMonitor();
+ if(connectionMonitor != null)
+ {
+ connectionMonitor.add(this);
+ }
}
}
@@ -1077,6 +1177,12 @@ public final class Connection extends EventHandler
}
_registeredWithPool = false;
+
+ ConnectionMonitor connectionMonitor = _instance.connectionMonitor();
+ if(connectionMonitor != null)
+ {
+ connectionMonitor.remove(this);
+ }
}
}
@@ -1166,6 +1272,9 @@ public final class Connection extends EventHandler
private boolean _warn;
+ private int _acmTimeout;
+ private long _acmAbsoluteTimeoutMillis;
+
private int _nextRequestId;
private IntMap _requests = new IntMap();
private IntMap _asyncRequests = new IntMap();
diff --git a/java/src/IceInternal/ConnectionMonitor.java b/java/src/IceInternal/ConnectionMonitor.java
new file mode 100644
index 00000000000..153e47f0261
--- /dev/null
+++ b/java/src/IceInternal/ConnectionMonitor.java
@@ -0,0 +1,170 @@
+// **********************************************************************
+//
+// Copyright (c) 2002
+// ZeroC, Inc.
+// Billerica, MA, USA
+//
+// All Rights Reserved.
+//
+// Ice is free software; you can redistribute it and/or modify it under
+// the terms of the GNU General Public License version 2 as published by
+// the Free Software Foundation.
+//
+// **********************************************************************
+
+package IceInternal;
+
+public class ConnectionMonitor extends Thread
+{
+ public void
+ destroy()
+ {
+ synchronized(this)
+ {
+ if(_instance == null)
+ {
+ return;
+ }
+
+ _instance = null;
+ _connections.clear();
+
+ notify();
+ }
+
+ while(true)
+ {
+ try
+ {
+ join();
+ break;
+ }
+ catch(java.lang.InterruptedException ex)
+ {
+ continue;
+ }
+ }
+ }
+
+ public synchronized void
+ add(Connection connection)
+ {
+ if(_instance == null)
+ {
+ throw new Ice.CommunicatorDestroyedException();
+ }
+
+ _connections.add(connection);
+ }
+
+ public synchronized void
+ remove(Connection connection)
+ {
+ if(_instance == null)
+ {
+ throw new Ice.CommunicatorDestroyedException();
+ }
+
+ _connections.remove(connection);
+ }
+
+ //
+ // Only for use by Instance.
+ //
+ ConnectionMonitor(Instance instance, int interval)
+ {
+ _instance = instance;
+ _interval = interval;
+
+ assert(_interval > 0);
+ start();
+ }
+
+ protected void
+ finalize()
+ throws Throwable
+ {
+ assert(_instance == null);
+ assert(_connections.isEmpty());
+
+ super.finalize();
+ }
+
+ public void
+ run()
+ {
+ java.util.HashSet connections = new java.util.HashSet();
+
+ while(true)
+ {
+ synchronized(this)
+ {
+ if(_instance == null)
+ {
+ return;
+ }
+
+ try
+ {
+ wait(_interval * 1000);
+ }
+ catch(InterruptedException ex)
+ {
+ continue;
+ }
+
+ if(_instance == null)
+ {
+ return;
+ }
+
+ connections.clear();
+ connections.addAll(_connections);
+ }
+
+ //
+ // Monitor connections outside the thread synchronization,
+ // so that connections can be added or removed during
+ // monitoring.
+ //
+ java.util.Iterator iter = connections.iterator();
+ while(iter.hasNext())
+ {
+ Connection connection = (Connection)iter.next();
+
+ try
+ {
+ connection.monitor();
+ }
+ catch(Ice.LocalException ex)
+ {
+ synchronized(this)
+ {
+ java.io.StringWriter sw = new java.io.StringWriter();
+ java.io.PrintWriter pw = new java.io.PrintWriter(sw);
+ ex.printStackTrace(pw);
+ pw.flush();
+ String s = "exception in thread pool thread " + getName() + ":\n" + sw.toString();
+ _instance.logger().error(s);
+ }
+ }
+ catch(RuntimeException ex)
+ {
+ synchronized(this)
+ {
+ java.io.StringWriter sw = new java.io.StringWriter();
+ java.io.PrintWriter pw = new java.io.PrintWriter(sw);
+ ex.printStackTrace(pw);
+ pw.flush();
+ String s = "unknown exception in thread pool thread " + getName() + ":\n" + sw.toString();
+ _instance.logger().error(s);
+ }
+ }
+ }
+ }
+ }
+
+ private Instance _instance;
+ private final int _interval;
+ private java.util.HashSet _connections = new java.util.HashSet();
+}
diff --git a/java/src/IceInternal/Instance.java b/java/src/IceInternal/Instance.java
index 7ed0c30356d..5c5a40ae1bd 100644
--- a/java/src/IceInternal/Instance.java
+++ b/java/src/IceInternal/Instance.java
@@ -79,6 +79,12 @@ public class Instance
return _outgoingConnectionFactory;
}
+ public synchronized ConnectionMonitor
+ connectionMonitor()
+ {
+ return _connectionMonitor;
+ }
+
public synchronized ObjectFactoryManager
servantFactoryManager()
{
@@ -191,6 +197,13 @@ public class Instance
_outgoingConnectionFactory = new OutgoingConnectionFactory(this);
+ int acmTimeout = _properties.getPropertyAsInt("Ice.ConnectionIdleTime");
+ int interval = _properties.getPropertyAsIntWithDefault("Ice.MonitorConnections", acmTimeout);
+ if(interval > 0)
+ {
+ _connectionMonitor = new ConnectionMonitor(this, interval);
+ }
+
_servantFactoryManager = new ObjectFactoryManager();
_userExceptionFactoryManager = new UserExceptionFactoryManager();
@@ -214,6 +227,7 @@ public class Instance
assert(_referenceFactory == null);
assert(_proxyFactory == null);
assert(_outgoingConnectionFactory == null);
+ assert(_connectionMonitor == null);
assert(_servantFactoryManager == null);
assert(_userExceptionFactoryManager == null);
assert(_objectAdapterFactory == null);
@@ -281,6 +295,12 @@ public class Instance
_objectAdapterFactory = null;
_outgoingConnectionFactory = null;
+ if(_connectionMonitor != null)
+ {
+ _connectionMonitor.destroy();
+ _connectionMonitor = null;
+ }
+
if(_serverThreadPool != null)
{
_serverThreadPool.destroy();
@@ -347,6 +367,7 @@ public class Instance
private ReferenceFactory _referenceFactory;
private ProxyFactory _proxyFactory;
private OutgoingConnectionFactory _outgoingConnectionFactory;
+ private ConnectionMonitor _connectionMonitor;
private ObjectFactoryManager _servantFactoryManager;
private UserExceptionFactoryManager _userExceptionFactoryManager;
private ObjectAdapterFactory _objectAdapterFactory;
diff --git a/java/src/IceInternal/OutgoingAsync.java b/java/src/IceInternal/OutgoingAsync.java
index 5f10f877b4c..4484af3c6ad 100644
--- a/java/src/IceInternal/OutgoingAsync.java
+++ b/java/src/IceInternal/OutgoingAsync.java
@@ -94,6 +94,10 @@ public abstract class OutgoingAsync
try
{
_connection.sendAsyncRequest(this);
+ if(_connection.timeout() >= 0)
+ {
+ _absoluteTimeoutMillis = System.currentTimeMillis() + _connection.timeout() * 1000;
+ }
}
catch(RuntimeException ex)
{
@@ -227,6 +231,19 @@ public abstract class OutgoingAsync
}
}
+ public boolean
+ __timedOut()
+ {
+ if(_connection.timeout() >= 0)
+ {
+ return System.currentTimeMillis() >= _absoluteTimeoutMillis;
+ }
+ else
+ {
+ return false;
+ }
+ }
+
public BasicStream
__is()
{
@@ -273,6 +290,7 @@ public abstract class OutgoingAsync
}
private Connection _connection;
+ private long _absoluteTimeoutMillis;
private BasicStream _is;
private BasicStream _os;
}
diff --git a/java/src/IceInternal/TcpTransceiver.java b/java/src/IceInternal/TcpTransceiver.java
index 1fa5e1903b5..282ee1278e0 100644
--- a/java/src/IceInternal/TcpTransceiver.java
+++ b/java/src/IceInternal/TcpTransceiver.java
@@ -98,8 +98,7 @@ final class TcpTransceiver implements Transceiver
if(_traceLevels.network >= 3)
{
- String s = "sent " + ret + " of " + buf.limit() +
- " bytes via tcp\n" + toString();
+ String s = "sent " + ret + " of " + buf.limit() + " bytes via tcp\n" + toString();
_logger.trace(_traceLevels.networkCat, s);
}
}
diff --git a/java/src/IceInternal/ThreadPool.java b/java/src/IceInternal/ThreadPool.java
index 7c4a1b999b4..fb020d21a29 100644
--- a/java/src/IceInternal/ThreadPool.java
+++ b/java/src/IceInternal/ThreadPool.java
@@ -903,7 +903,7 @@ public final class ThreadPool
java.io.PrintWriter pw = new java.io.PrintWriter(sw);
ex.printStackTrace(pw);
pw.flush();
- String s = "exception in thread pool:\n" + sw.toString();
+ String s = "exception in thread pool thread " + getName() + ":\n" + sw.toString();
_instance.logger().error(s);
}
catch(RuntimeException ex)