summaryrefslogtreecommitdiff
path: root/java/src/IceInternal/ThreadPool.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/src/IceInternal/ThreadPool.java')
-rw-r--r--java/src/IceInternal/ThreadPool.java152
1 files changed, 109 insertions, 43 deletions
diff --git a/java/src/IceInternal/ThreadPool.java b/java/src/IceInternal/ThreadPool.java
index ac42be4cec8..d130bc7e241 100644
--- a/java/src/IceInternal/ThreadPool.java
+++ b/java/src/IceInternal/ThreadPool.java
@@ -31,6 +31,8 @@ public final class ThreadPool
_destroyed = false;
_prefix = prefix;
_timeout = timeout;
+ _threadIndex = 0;
+ _running = 0;
_inUse = 0;
_load = 0;
_promote = true;
@@ -87,13 +89,14 @@ public final class ThreadPool
try
{
- _threads = new java.util.Vector(_size);
+ _threads = new java.util.ArrayList();
for(int i = 0; i < _size; i++)
{
EventHandlerThread thread = new EventHandlerThread(_programNamePrefix + _prefix + "-" +
- _threads.size());
+ _threadIndex++);
_threads.add(thread);
thread.start();
+ ++_running;
}
}
catch(RuntimeException ex)
@@ -116,7 +119,6 @@ public final class ThreadPool
throws Throwable
{
assert(_destroyed);
- assert(_inUse == 0);
if(_selector != null)
{
@@ -220,35 +222,39 @@ public final class ThreadPool
assert(!_promote);
_promote = true;
notify();
-
- assert(_inUse >= 0);
- ++_inUse;
-
- if(_inUse == _sizeWarn)
- {
- String s = "thread pool `" + _prefix + "' is running low on threads\n"
- + "Size=" + _size + ", " + "SizeMax=" + _sizeMax + ", " + "SizeWarn=" + _sizeWarn;
- _instance.logger().warning(s);
- }
-
- assert(_inUse <= _threads.size());
- if(!_destroyed && _inUse < _sizeMax && _inUse == _threads.size())
+
+ if(!_destroyed)
{
- try
+ assert(_inUse >= 0);
+ ++_inUse;
+
+ if(_inUse == _sizeWarn)
{
- EventHandlerThread thread = new EventHandlerThread(_programNamePrefix + _prefix + "-" +
- _threads.size());
- _threads.add(thread);
- thread.start();
+ String s = "thread pool `" + _prefix + "' is running low on threads\n"
+ + "Size=" + _size + ", " + "SizeMax=" + _sizeMax + ", " + "SizeWarn=" + _sizeWarn;
+ _instance.logger().warning(s);
}
- catch(RuntimeException ex)
+
+ assert(_inUse <= _running);
+ if(_inUse < _sizeMax && _inUse == _running)
{
- java.io.StringWriter sw = new java.io.StringWriter();
- java.io.PrintWriter pw = new java.io.PrintWriter(sw);
- ex.printStackTrace(pw);
- pw.flush();
- String s = "cannot create thread for `" + _prefix + "':\n" + sw.toString();
- _instance.logger().error(s);
+ try
+ {
+ EventHandlerThread thread = new EventHandlerThread(_programNamePrefix + _prefix + "-" +
+ _threadIndex++);
+ _threads.add(thread);
+ thread.start();
+ ++_running;
+ }
+ catch(RuntimeException ex)
+ {
+ java.io.StringWriter sw = new java.io.StringWriter();
+ java.io.PrintWriter pw = new java.io.PrintWriter(sw);
+ ex.printStackTrace(pw);
+ pw.flush();
+ String s = "cannot create thread for `" + _prefix + "':\n" + sw.toString();
+ _instance.logger().error(s);
+ }
}
}
}
@@ -276,10 +282,10 @@ public final class ThreadPool
// other threads would never terminate.)
//
assert(_destroyed);
- java.util.Enumeration e = _threads.elements();
- while(e.hasMoreElements())
+ java.util.Iterator i = _threads.iterator();
+ while(i.hasNext())
{
- EventHandlerThread thread = (EventHandlerThread)e.nextElement();
+ EventHandlerThread thread = (EventHandlerThread)i.next();
while(true)
{
@@ -386,7 +392,7 @@ public final class ThreadPool
// Each thread supplies a BasicStream, to avoid creating excessive
// garbage (Java only).
//
- private void
+ private boolean
run(BasicStream stream)
{
if(_sizeMax > 1)
@@ -478,7 +484,7 @@ public final class ThreadPool
// Don't clear the interrupt fd if destroyed,
// so that the other threads exit as well.
//
- return;
+ return true;
}
//
@@ -674,8 +680,61 @@ public final class ThreadPool
{
synchronized(this)
{
- assert(_inUse > 0);
- --_inUse;
+ if(!_destroyed)
+ {
+ //
+ // First we reap threads that have been
+ // destroyed before.
+ //
+ int sz = _threads.size();
+ assert(_running <= sz);
+ if(_running < sz)
+ {
+ java.util.Iterator i = _threads.iterator();
+ while(i.hasNext())
+ {
+ EventHandlerThread thread = (EventHandlerThread)i.next();
+
+ if(!thread.isAlive())
+ {
+ try
+ {
+ thread.join();
+ i.remove();
+ }
+ catch(InterruptedException ex)
+ {
+ }
+ }
+ }
+ }
+
+ //
+ // Now we check if this thread can be destroyed, based
+ // on a load factor.
+ //
+ final double loadFactor = 0.05; // TODO: Configurable?
+ final double oneMinusLoadFactor = 1 - loadFactor;
+ _load = _load * oneMinusLoadFactor + _inUse * loadFactor;
+
+ if(_running > _size)
+ {
+ int load = (int)(_load + 1);
+ if(load < _running)
+ {
+ assert(_inUse > 0);
+ --_inUse;
+
+ assert(_running > 0);
+ --_running;
+
+ return false;
+ }
+ }
+
+ assert(_inUse > 0);
+ --_inUse;
+ }
while(!_promote)
{
@@ -972,9 +1031,11 @@ public final class ThreadPool
{
BasicStream stream = new BasicStream(_instance);
+ boolean promote;
+
try
{
- ThreadPool.this.run(stream);
+ promote = ThreadPool.this.run(stream);
}
catch(Ice.LocalException ex)
{
@@ -984,6 +1045,7 @@ public final class ThreadPool
pw.flush();
String s = "exception in `" + _prefix + "' thread " + getName() + ":\n" + sw.toString();
_instance.logger().error(s);
+ promote = true;
}
catch(RuntimeException ex)
{
@@ -993,19 +1055,20 @@ public final class ThreadPool
pw.flush();
String s = "unknown exception in `" + _prefix + "' thread " + getName() + ":\n" + sw.toString();
_instance.logger().error(s);
+ promote = true;
}
if(TRACE_THREAD)
{
- trace("run() terminated - promoting follower");
+ trace("run() terminated");
}
- //
- // Promote a follower, but w/o modifying _inUse or
- // creating new threads.
- //
- if(_sizeMax > 1)
+ if(promote && _sizeMax > 1)
{
+ //
+ // Promote a follower, but w/o modifying _inUse or
+ // creating new threads.
+ //
synchronized(ThreadPool.this)
{
assert(!_promote);
@@ -1018,10 +1081,13 @@ public final class ThreadPool
}
}
- private java.util.Vector _threads; // All threads, running or not.
private final int _size; // Number of threads that are pre-created.
private final int _sizeMax; // Maximum number of threads.
private final int _sizeWarn; // If _inUse reaches _sizeWarn, a "low on threads" warning will be printed.
+
+ private java.util.ArrayList _threads; // All threads, running or not.
+ private int _threadIndex; // For assigning thread names.
+ private int _running; // Number of running threads.
private int _inUse; // Number of threads that are currently in use.
private double _load; // Current load in number of threads.