summaryrefslogtreecommitdiff
path: root/java/src
diff options
context:
space:
mode:
authorMarc Laukien <marc@zeroc.com>2003-03-07 20:55:48 +0000
committerMarc Laukien <marc@zeroc.com>2003-03-07 20:55:48 +0000
commitf4f188980a8d589aec4b068d61b7697305ff7f1e (patch)
treec6549f2351cf84de0d05104a76d1299ff31ca6b8 /java/src
parentdyn thread pool (diff)
downloadice-f4f188980a8d589aec4b068d61b7697305ff7f1e.tar.bz2
ice-f4f188980a8d589aec4b068d61b7697305ff7f1e.tar.xz
ice-f4f188980a8d589aec4b068d61b7697305ff7f1e.zip
internal changes
Diffstat (limited to 'java/src')
-rw-r--r--java/src/IceInternal/ThreadPool.java543
1 files changed, 285 insertions, 258 deletions
diff --git a/java/src/IceInternal/ThreadPool.java b/java/src/IceInternal/ThreadPool.java
index 17153ef984c..64e36982523 100644
--- a/java/src/IceInternal/ThreadPool.java
+++ b/java/src/IceInternal/ThreadPool.java
@@ -29,9 +29,10 @@ public final class ThreadPool
{
_instance = instance;
_destroyed = false;
+ _prefix = prefix;
+ _inUse = 0;
_timeout = timeout;
_promote = true;
- _prefix = prefix;
Network.SocketPair pair = Network.createPipe();
_fdIntrRead = (java.nio.channels.ReadableByteChannel)pair.source;
@@ -56,15 +57,15 @@ public final class ThreadPool
//
_keys = _selector.selectedKeys();
- int size = _instance.properties().getPropertyAsInt(_prefix + ".Size");
+ int size = _instance.properties().getPropertyAsIntWithDefault(_prefix + ".Size", 5);
if(size < 1)
{
size = 1;
- _instance.properties().setProperty(_prefix + ".Size", "1");
+ _instance.properties().setProperty(_prefix + ".Size", "" + size);
}
_size = size;
-
- int sizeMax = _instance.properties().getPropertyAsIntWithDefault(_prefix + ".SizeMax", _size * 5);
+
+ int sizeMax = _instance.properties().getPropertyAsIntWithDefault(_prefix + ".SizeMax", _size * 10);
if(sizeMax < _size)
{
sizeMax = _size;
@@ -72,6 +73,9 @@ public final class ThreadPool
}
_sizeMax = sizeMax;
+ int sizeWarn = _instance.properties().getPropertyAsIntWithDefault(_prefix + ".SizeWarn", _sizeMax * 80 / 100);
+ _sizeWarn = sizeWarn;
+
//
// Use Ice.ProgramName as the prefix for the thread names.
//
@@ -98,7 +102,7 @@ public final class ThreadPool
java.io.PrintWriter pw = new java.io.PrintWriter(sw);
ex.printStackTrace(pw);
pw.flush();
- String s = "cannot create threads for thread pool:\n" + sw.toString();
+ String s = "cannot create thread for `" + _prefix + "':\n" + sw.toString();
_instance.logger().error(s);
destroy();
@@ -112,6 +116,8 @@ public final class ThreadPool
throws Throwable
{
assert(_destroyed);
+ assert(_inUse == 0);
+
if(_selector != null)
{
try
@@ -207,7 +213,7 @@ public final class ThreadPool
public void
promoteFollower()
{
- if(_size > 1)
+ if(_sizeMax > 1)
{
if(!_promote) // Double-checked locking.
{
@@ -352,299 +358,315 @@ public final class ThreadPool
//
// Each thread supplies a BasicStream, to avoid creating excessive
- // garbage (Java only)
+ // garbage (Java only).
//
private void
run(BasicStream stream)
{
+ if(_sizeMax > 1)
+ {
+ synchronized(_promoteMonitor)
+ {
+ while(!_promote)
+ {
+ try
+ {
+ _promoteMonitor.wait();
+ }
+ catch(InterruptedException ex)
+ {
+ }
+ }
+
+ _promote = false;
+ }
+
+ if(TRACE_THREAD)
+ {
+ trace("thread " + Thread.currentThread() + " has the lock");
+ }
+ }
+
while(true)
{
- if(_size > 1)
- {
- synchronized(_promoteMonitor)
+ if(TRACE_REGISTRATION)
+ {
+ java.util.Set keys = _selector.keys();
+ trace("selecting on " + keys.size() + " channels:");
+ java.util.Iterator i = keys.iterator();
+ while(i.hasNext())
{
- while(!_promote)
+ java.nio.channels.SelectionKey key = (java.nio.channels.SelectionKey)i.next();
+ trace(" " + key.channel());
+ }
+ }
+
+ select();
+ if(_keys.size() == 0) // We initiate a shutdown if there is a thread pool timeout.
+ {
+ if(TRACE_SELECT)
+ {
+ trace("timeout");
+ }
+
+ assert(_timeout > 0);
+ _timeout = 0;
+ initiateShutdown();
+ continue;
+ }
+
+ EventHandler handler = null;
+ boolean finished = false;
+ boolean shutdown = false;
+
+ synchronized(this)
+ {
+ if(_keys.contains(_fdIntrReadKey) && _fdIntrReadKey.isReadable())
+ {
+ if(TRACE_SELECT || TRACE_INTERRUPT)
{
- try
- {
- _promoteMonitor.wait();
- }
- catch(InterruptedException ex)
+ trace("detected interrupt");
+ }
+
+ //
+ // There are three possibilities for an interrupt:
+ //
+ // - The thread pool has been destroyed.
+ //
+ // - Server shutdown has been initiated.
+ //
+ // - An event handler was registered or unregistered.
+ //
+
+ //
+ // Thread pool destroyed?
+ //
+ if(_destroyed)
+ {
+ if(TRACE_SHUTDOWN)
{
+ trace("destroyed, thread id = " + Thread.currentThread());
}
- }
- _promote = false;
- }
-
- if(TRACE_THREAD)
- {
- trace("thread " + Thread.currentThread() + " has the lock");
- }
- }
-
- repeatSelect:
-
- while(true)
- {
- if(TRACE_REGISTRATION)
- {
- java.util.Set keys = _selector.keys();
- trace("selecting on " + keys.size() + " channels:");
- java.util.Iterator i = keys.iterator();
- while(i.hasNext())
- {
- java.nio.channels.SelectionKey key = (java.nio.channels.SelectionKey)i.next();
- trace(" " + key.channel());
- }
- }
-
- select();
- if(_keys.size() == 0) // We initiate a shutdown if there is a thread pool timeout.
- {
- if(TRACE_SELECT)
- {
- trace("timeout");
- }
-
- assert(_timeout > 0);
- _timeout = 0;
- initiateShutdown();
- continue repeatSelect;
- }
-
- EventHandler handler = null;
- boolean finished = false;
- boolean shutdown = false;
-
- synchronized(this)
- {
- if(_keys.contains(_fdIntrReadKey) && _fdIntrReadKey.isReadable())
- {
- if(TRACE_SELECT || TRACE_INTERRUPT)
- {
- trace("detected interrupt");
- }
-
- //
- // There are three possibilities for an interrupt:
- //
- // - The thread pool has been destroyed.
- //
- // - Server shutdown has been initiated.
- //
- // - An event handler was registered or unregistered.
- //
-
- //
- // Thread pool destroyed?
- //
- if(_destroyed)
- {
- if(TRACE_SHUTDOWN)
- {
- trace("destroyed, thread id = " + Thread.currentThread());
- }
-
- //
- // Don't clear the interrupt fd if destroyed, so that
- // the other threads exit as well.
- //
- return;
- }
+ //
+ // Don't clear the interrupt fd if destroyed,
+ // so that the other threads exit as well.
+ //
+ return;
+ }
- //
- // Remove the interrupt channel from the selected key set.
- //
- _keys.remove(_fdIntrReadKey);
+ //
+ // Remove the interrupt channel from the selected key set.
+ //
+ _keys.remove(_fdIntrReadKey);
- shutdown = clearInterrupt();
+ shutdown = clearInterrupt();
- if(!shutdown)
- {
- //
- // An event handler must have been
- // registered or unregistered.
- //
- assert(!_changes.isEmpty());
- FdHandlerPair change = (FdHandlerPair)_changes.removeFirst();
+ if(!shutdown)
+ {
+ //
+ // An event handler must have been registered
+ // or unregistered.
+ //
+ assert(!_changes.isEmpty());
+ FdHandlerPair change = (FdHandlerPair)_changes.removeFirst();
- if(change.handler != null) // Addition if handler is set.
+ if(change.handler != null) // Addition if handler is set.
+ {
+ int op;
+ if((change.fd.validOps() & java.nio.channels.SelectionKey.OP_READ) > 0)
{
- int op;
- if((change.fd.validOps() & java.nio.channels.SelectionKey.OP_READ) > 0)
- {
- op = java.nio.channels.SelectionKey.OP_READ;
- }
- else
- {
- op = java.nio.channels.SelectionKey.OP_ACCEPT;
- }
-
- java.nio.channels.SelectionKey key = null;
- try
- {
- key = change.fd.register(_selector, op, change.handler);
- }
- catch(java.nio.channels.ClosedChannelException ex)
- {
- assert(false);
- }
- _handlerMap.put(change.fd, new HandlerKeyPair(change.handler, key));
-
- if(TRACE_REGISTRATION)
- {
- trace("added handler (" + change.handler.getClass().getName() + ") for fd " +
- change.fd);
- }
+ op = java.nio.channels.SelectionKey.OP_READ;
+ }
+ else
+ {
+ op = java.nio.channels.SelectionKey.OP_ACCEPT;
+ }
- continue repeatSelect;
+ java.nio.channels.SelectionKey key = null;
+ try
+ {
+ key = change.fd.register(_selector, op, change.handler);
}
- else // Removal if handler is not set.
+ catch(java.nio.channels.ClosedChannelException ex)
{
- HandlerKeyPair pair = (HandlerKeyPair)_handlerMap.remove(change.fd);
- assert(pair != null);
- handler = pair.handler;
- finished = true;
- pair.key.cancel();
+ assert(false);
+ }
+ _handlerMap.put(change.fd, new HandlerKeyPair(change.handler, key));
- if(TRACE_REGISTRATION)
- {
- trace("removed handler (" + handler.getClass().getName() + ") for fd " +
- change.fd);
- }
+ if(TRACE_REGISTRATION)
+ {
+ trace("added handler (" + change.handler.getClass().getName() + ") for fd " +
+ change.fd);
+ }
- // Don't goto repeatSelect; we have to
- // call finished() on the event
- // handler below, outside the thread
- // synchronization.
+ continue;
+ }
+ else // Removal if handler is not set.
+ {
+ HandlerKeyPair pair = (HandlerKeyPair)_handlerMap.remove(change.fd);
+ assert(pair != null);
+ handler = pair.handler;
+ finished = true;
+ pair.key.cancel();
+
+ if(TRACE_REGISTRATION)
+ {
+ trace("removed handler (" + handler.getClass().getName() + ") for fd " +
+ change.fd);
}
+
+ // Don't continue; we have to call
+ // finished() on the event handler below,
+ // outside the thread synchronization.
}
}
- else
- {
- java.nio.channels.SelectionKey key = null;
- java.util.Iterator iter = _keys.iterator();
- while(iter.hasNext())
- {
- //
- // Ignore selection keys that have been cancelled
- //
- java.nio.channels.SelectionKey k = (java.nio.channels.SelectionKey)iter.next();
- iter.remove();
- if(k.isValid() && key != _fdIntrReadKey)
- {
- if(TRACE_SELECT)
- {
- trace("found a key: " + keyToString(k));
- }
-
- key = k;
- break;
- }
- }
-
- if(key == null)
- {
- if(TRACE_SELECT)
- {
- trace("didn't find a valid key");
- }
-
- continue repeatSelect;
- }
-
- handler = (EventHandler)key.attachment();
- }
- }
-
- assert(handler != null || shutdown);
-
- if(shutdown) // Shutdown has been initiated.
- {
- if(TRACE_SHUTDOWN)
- {
- trace("shutdown detected");
- }
-
- ObjectAdapterFactory factory = _instance.objectAdapterFactory();
- if(factory == null)
- {
- continue repeatSelect;
- }
-
- promoteFollower();
- factory.shutdown();
- }
+ }
else
{
- if(finished)
+ java.nio.channels.SelectionKey key = null;
+ java.util.Iterator iter = _keys.iterator();
+ while(iter.hasNext())
{
//
- // Notify a handler about it's removal from
- // the thread pool.
+ // Ignore selection keys that have been cancelled
//
- try
+ java.nio.channels.SelectionKey k = (java.nio.channels.SelectionKey)iter.next();
+ iter.remove();
+ if(k.isValid() && key != _fdIntrReadKey)
{
- handler.finished(this);
+ if(TRACE_SELECT)
+ {
+ trace("found a key: " + keyToString(k));
+ }
+
+ key = k;
+ break;
}
- catch(Ice.LocalException ex)
+ }
+
+ if(key == null)
+ {
+ if(TRACE_SELECT)
{
- java.io.StringWriter sw = new java.io.StringWriter();
- java.io.PrintWriter pw = new java.io.PrintWriter(sw);
- ex.printStackTrace(pw);
- pw.flush();
- String s = "exception while calling finished():\n" + sw.toString() + "\n" +
- handler.toString();
- _instance.logger().error(s);
+ trace("didn't find a valid key");
}
+
+ continue;
}
- else
+
+ handler = (EventHandler)key.attachment();
+ }
+ }
+
+ assert(handler != null || shutdown);
+
+ if(shutdown) // Shutdown has been initiated.
+ {
+ if(TRACE_SHUTDOWN)
+ {
+ trace("shutdown detected");
+ }
+
+ ObjectAdapterFactory factory = _instance.objectAdapterFactory();
+ if(factory == null)
+ {
+ continue;
+ }
+
+ promoteFollower();
+ factory.shutdown();
+ }
+ else
+ {
+ if(finished)
+ {
+ //
+ // Notify a handler about it's removal from
+ // the thread pool.
+ //
+ try
{
- //
- // If the handler is "readable", try to read a
- // message.
- //
- try
+ handler.finished(this);
+ }
+ catch(Ice.LocalException ex)
+ {
+ java.io.StringWriter sw = new java.io.StringWriter();
+ java.io.PrintWriter pw = new java.io.PrintWriter(sw);
+ ex.printStackTrace(pw);
+ pw.flush();
+ String s = "exception in `" + _prefix + "' while calling finished():\n" +
+ sw.toString() + "\n" + handler.toString();
+ _instance.logger().error(s);
+ }
+ }
+ else
+ {
+ //
+ // If the handler is "readable", try to read a
+ // message.
+ //
+ try
+ {
+ if(handler.readable())
{
- if(handler.readable())
+ try
{
- try
- {
- read(handler);
- }
- catch(Ice.TimeoutException ex) // Expected.
+ read(handler);
+ }
+ catch(Ice.TimeoutException ex) // Expected.
+ {
+ continue;
+ }
+ catch(Ice.LocalException ex)
+ {
+ if(TRACE_EXCEPTION)
{
- continue repeatSelect;
+ trace("informing handler (" + handler.getClass().getName() +
+ ") about exception " + ex);
+ ex.printStackTrace();
}
- catch(Ice.LocalException ex)
- {
- if(TRACE_EXCEPTION)
- {
- trace("informing handler (" + handler.getClass().getName() +
- ") about exception " + ex);
- ex.printStackTrace();
- }
- handler.exception(ex);
- continue repeatSelect;
- }
-
- stream.swap(handler._stream);
- assert(stream.pos() == stream.size());
+ handler.exception(ex);
+ continue;
}
+
+ stream.swap(handler._stream);
+ assert(stream.pos() == stream.size());
+ }
- handler.message(stream, this);
+ handler.message(stream, this);
+ }
+ finally
+ {
+ stream.reset();
+ }
+ }
+ }
+
+ if(_sizeMax > 1)
+ {
+ synchronized(_promoteMonitor)
+ {
+ while(!_promote)
+ {
+ try
+ {
+ _promoteMonitor.wait();
}
- finally
+ catch(InterruptedException ex)
{
- stream.reset();
}
}
- }
+
+ _promote = false;
+ }
- break; // Inner while loop.
- }
+ if(TRACE_THREAD)
+ {
+ trace("thread " + Thread.currentThread() + " has the lock");
+ }
+ }
}
}
@@ -893,8 +915,14 @@ public final class ThreadPool
private Instance _instance;
private boolean _destroyed;
- private final int _size;
- private final int _sizeMax;
+ private String _prefix;
+
+ private final int _size; // Number of threads that are pre-created.
+ private final int _sizeMax; // Maximum number of threads.
+ private final int _sizeWarn; // If _inUse reaches _sizeWarn, a "low on threads" warning will be printed.
+ private int _inUse; // Number of threads that are currently in use.
+ private java.lang.Object _inUseMutex = new java.lang.Object();
+
private java.nio.channels.ReadableByteChannel _fdIntrRead;
private java.nio.channels.SelectionKey _fdIntrReadKey;
private java.nio.channels.WritableByteChannel _fdIntrWrite;
@@ -905,7 +933,6 @@ public final class ThreadPool
private int _timeout;
private boolean _promote;
private java.lang.Object _promoteMonitor = new java.lang.Object();
- private String _prefix;
private final class EventHandlerThread extends Thread
{
@@ -929,7 +956,7 @@ public final class ThreadPool
java.io.PrintWriter pw = new java.io.PrintWriter(sw);
ex.printStackTrace(pw);
pw.flush();
- String s = "exception in thread pool thread " + getName() + ":\n" + sw.toString();
+ String s = "exception in `" + _prefix + "' thread " + getName() + ":\n" + sw.toString();
_instance.logger().error(s);
}
catch(RuntimeException ex)
@@ -938,7 +965,7 @@ public final class ThreadPool
java.io.PrintWriter pw = new java.io.PrintWriter(sw);
ex.printStackTrace(pw);
pw.flush();
- String s = "unknown exception in thread pool thread " + getName() + ":\n" + sw.toString();
+ String s = "unknown exception in `" + _prefix + "' thread " + getName() + ":\n" + sw.toString();
_instance.logger().error(s);
}