diff options
author | Marc Laukien <marc@zeroc.com> | 2003-03-24 16:27:02 +0000 |
---|---|---|
committer | Marc Laukien <marc@zeroc.com> | 2003-03-24 16:27:02 +0000 |
commit | 8effd206a20226446062a0ce3f5bf1c67350ba13 (patch) | |
tree | 796d5569f19c29186445d8edbb58c905237bf72b /java/src/IceInternal/ThreadPool.java | |
parent | make depend (diff) | |
download | ice-8effd206a20226446062a0ce3f5bf1c67350ba13.tar.bz2 ice-8effd206a20226446062a0ce3f5bf1c67350ba13.tar.xz ice-8effd206a20226446062a0ce3f5bf1c67350ba13.zip |
dyn thread pool
Diffstat (limited to 'java/src/IceInternal/ThreadPool.java')
-rw-r--r-- | java/src/IceInternal/ThreadPool.java | 152 |
1 files changed, 109 insertions, 43 deletions
diff --git a/java/src/IceInternal/ThreadPool.java b/java/src/IceInternal/ThreadPool.java index ac42be4cec8..d130bc7e241 100644 --- a/java/src/IceInternal/ThreadPool.java +++ b/java/src/IceInternal/ThreadPool.java @@ -31,6 +31,8 @@ public final class ThreadPool _destroyed = false; _prefix = prefix; _timeout = timeout; + _threadIndex = 0; + _running = 0; _inUse = 0; _load = 0; _promote = true; @@ -87,13 +89,14 @@ public final class ThreadPool try { - _threads = new java.util.Vector(_size); + _threads = new java.util.ArrayList(); for(int i = 0; i < _size; i++) { EventHandlerThread thread = new EventHandlerThread(_programNamePrefix + _prefix + "-" + - _threads.size()); + _threadIndex++); _threads.add(thread); thread.start(); + ++_running; } } catch(RuntimeException ex) @@ -116,7 +119,6 @@ public final class ThreadPool throws Throwable { assert(_destroyed); - assert(_inUse == 0); if(_selector != null) { @@ -220,35 +222,39 @@ public final class ThreadPool assert(!_promote); _promote = true; notify(); - - assert(_inUse >= 0); - ++_inUse; - - if(_inUse == _sizeWarn) - { - String s = "thread pool `" + _prefix + "' is running low on threads\n" - + "Size=" + _size + ", " + "SizeMax=" + _sizeMax + ", " + "SizeWarn=" + _sizeWarn; - _instance.logger().warning(s); - } - - assert(_inUse <= _threads.size()); - if(!_destroyed && _inUse < _sizeMax && _inUse == _threads.size()) + + if(!_destroyed) { - try + assert(_inUse >= 0); + ++_inUse; + + if(_inUse == _sizeWarn) { - EventHandlerThread thread = new EventHandlerThread(_programNamePrefix + _prefix + "-" + - _threads.size()); - _threads.add(thread); - thread.start(); + String s = "thread pool `" + _prefix + "' is running low on threads\n" + + "Size=" + _size + ", " + "SizeMax=" + _sizeMax + ", " + "SizeWarn=" + _sizeWarn; + _instance.logger().warning(s); } - catch(RuntimeException ex) + + assert(_inUse <= _running); + if(_inUse < _sizeMax && _inUse == _running) { - java.io.StringWriter sw = new java.io.StringWriter(); - java.io.PrintWriter pw = new java.io.PrintWriter(sw); - ex.printStackTrace(pw); - pw.flush(); - String s = "cannot create thread for `" + _prefix + "':\n" + sw.toString(); - _instance.logger().error(s); + try + { + EventHandlerThread thread = new EventHandlerThread(_programNamePrefix + _prefix + "-" + + _threadIndex++); + _threads.add(thread); + thread.start(); + ++_running; + } + catch(RuntimeException ex) + { + java.io.StringWriter sw = new java.io.StringWriter(); + java.io.PrintWriter pw = new java.io.PrintWriter(sw); + ex.printStackTrace(pw); + pw.flush(); + String s = "cannot create thread for `" + _prefix + "':\n" + sw.toString(); + _instance.logger().error(s); + } } } } @@ -276,10 +282,10 @@ public final class ThreadPool // other threads would never terminate.) // assert(_destroyed); - java.util.Enumeration e = _threads.elements(); - while(e.hasMoreElements()) + java.util.Iterator i = _threads.iterator(); + while(i.hasNext()) { - EventHandlerThread thread = (EventHandlerThread)e.nextElement(); + EventHandlerThread thread = (EventHandlerThread)i.next(); while(true) { @@ -386,7 +392,7 @@ public final class ThreadPool // Each thread supplies a BasicStream, to avoid creating excessive // garbage (Java only). // - private void + private boolean run(BasicStream stream) { if(_sizeMax > 1) @@ -478,7 +484,7 @@ public final class ThreadPool // Don't clear the interrupt fd if destroyed, // so that the other threads exit as well. // - return; + return true; } // @@ -674,8 +680,61 @@ public final class ThreadPool { synchronized(this) { - assert(_inUse > 0); - --_inUse; + if(!_destroyed) + { + // + // First we reap threads that have been + // destroyed before. + // + int sz = _threads.size(); + assert(_running <= sz); + if(_running < sz) + { + java.util.Iterator i = _threads.iterator(); + while(i.hasNext()) + { + EventHandlerThread thread = (EventHandlerThread)i.next(); + + if(!thread.isAlive()) + { + try + { + thread.join(); + i.remove(); + } + catch(InterruptedException ex) + { + } + } + } + } + + // + // Now we check if this thread can be destroyed, based + // on a load factor. + // + final double loadFactor = 0.05; // TODO: Configurable? + final double oneMinusLoadFactor = 1 - loadFactor; + _load = _load * oneMinusLoadFactor + _inUse * loadFactor; + + if(_running > _size) + { + int load = (int)(_load + 1); + if(load < _running) + { + assert(_inUse > 0); + --_inUse; + + assert(_running > 0); + --_running; + + return false; + } + } + + assert(_inUse > 0); + --_inUse; + } while(!_promote) { @@ -972,9 +1031,11 @@ public final class ThreadPool { BasicStream stream = new BasicStream(_instance); + boolean promote; + try { - ThreadPool.this.run(stream); + promote = ThreadPool.this.run(stream); } catch(Ice.LocalException ex) { @@ -984,6 +1045,7 @@ public final class ThreadPool pw.flush(); String s = "exception in `" + _prefix + "' thread " + getName() + ":\n" + sw.toString(); _instance.logger().error(s); + promote = true; } catch(RuntimeException ex) { @@ -993,19 +1055,20 @@ public final class ThreadPool pw.flush(); String s = "unknown exception in `" + _prefix + "' thread " + getName() + ":\n" + sw.toString(); _instance.logger().error(s); + promote = true; } if(TRACE_THREAD) { - trace("run() terminated - promoting follower"); + trace("run() terminated"); } - // - // Promote a follower, but w/o modifying _inUse or - // creating new threads. - // - if(_sizeMax > 1) + if(promote && _sizeMax > 1) { + // + // Promote a follower, but w/o modifying _inUse or + // creating new threads. + // synchronized(ThreadPool.this) { assert(!_promote); @@ -1018,10 +1081,13 @@ public final class ThreadPool } } - private java.util.Vector _threads; // All threads, running or not. private final int _size; // Number of threads that are pre-created. private final int _sizeMax; // Maximum number of threads. private final int _sizeWarn; // If _inUse reaches _sizeWarn, a "low on threads" warning will be printed. + + private java.util.ArrayList _threads; // All threads, running or not. + private int _threadIndex; // For assigning thread names. + private int _running; // Number of running threads. private int _inUse; // Number of threads that are currently in use. private double _load; // Current load in number of threads. |