diff options
Diffstat (limited to 'java/src/IceInternal/ThreadPool.java')
-rw-r--r-- | java/src/IceInternal/ThreadPool.java | 543 |
1 files changed, 285 insertions, 258 deletions
diff --git a/java/src/IceInternal/ThreadPool.java b/java/src/IceInternal/ThreadPool.java index 17153ef984c..64e36982523 100644 --- a/java/src/IceInternal/ThreadPool.java +++ b/java/src/IceInternal/ThreadPool.java @@ -29,9 +29,10 @@ public final class ThreadPool { _instance = instance; _destroyed = false; + _prefix = prefix; + _inUse = 0; _timeout = timeout; _promote = true; - _prefix = prefix; Network.SocketPair pair = Network.createPipe(); _fdIntrRead = (java.nio.channels.ReadableByteChannel)pair.source; @@ -56,15 +57,15 @@ public final class ThreadPool // _keys = _selector.selectedKeys(); - int size = _instance.properties().getPropertyAsInt(_prefix + ".Size"); + int size = _instance.properties().getPropertyAsIntWithDefault(_prefix + ".Size", 5); if(size < 1) { size = 1; - _instance.properties().setProperty(_prefix + ".Size", "1"); + _instance.properties().setProperty(_prefix + ".Size", "" + size); } _size = size; - - int sizeMax = _instance.properties().getPropertyAsIntWithDefault(_prefix + ".SizeMax", _size * 5); + + int sizeMax = _instance.properties().getPropertyAsIntWithDefault(_prefix + ".SizeMax", _size * 10); if(sizeMax < _size) { sizeMax = _size; @@ -72,6 +73,9 @@ public final class ThreadPool } _sizeMax = sizeMax; + int sizeWarn = _instance.properties().getPropertyAsIntWithDefault(_prefix + ".SizeWarn", _sizeMax * 80 / 100); + _sizeWarn = sizeWarn; + // // Use Ice.ProgramName as the prefix for the thread names. // @@ -98,7 +102,7 @@ public final class ThreadPool java.io.PrintWriter pw = new java.io.PrintWriter(sw); ex.printStackTrace(pw); pw.flush(); - String s = "cannot create threads for thread pool:\n" + sw.toString(); + String s = "cannot create thread for `" + _prefix + "':\n" + sw.toString(); _instance.logger().error(s); destroy(); @@ -112,6 +116,8 @@ public final class ThreadPool throws Throwable { assert(_destroyed); + assert(_inUse == 0); + if(_selector != null) { try @@ -207,7 +213,7 @@ public final class ThreadPool public void promoteFollower() { - if(_size > 1) + if(_sizeMax > 1) { if(!_promote) // Double-checked locking. { @@ -352,299 +358,315 @@ public final class ThreadPool // // Each thread supplies a BasicStream, to avoid creating excessive - // garbage (Java only) + // garbage (Java only). // private void run(BasicStream stream) { + if(_sizeMax > 1) + { + synchronized(_promoteMonitor) + { + while(!_promote) + { + try + { + _promoteMonitor.wait(); + } + catch(InterruptedException ex) + { + } + } + + _promote = false; + } + + if(TRACE_THREAD) + { + trace("thread " + Thread.currentThread() + " has the lock"); + } + } + while(true) { - if(_size > 1) - { - synchronized(_promoteMonitor) + if(TRACE_REGISTRATION) + { + java.util.Set keys = _selector.keys(); + trace("selecting on " + keys.size() + " channels:"); + java.util.Iterator i = keys.iterator(); + while(i.hasNext()) { - while(!_promote) + java.nio.channels.SelectionKey key = (java.nio.channels.SelectionKey)i.next(); + trace(" " + key.channel()); + } + } + + select(); + if(_keys.size() == 0) // We initiate a shutdown if there is a thread pool timeout. + { + if(TRACE_SELECT) + { + trace("timeout"); + } + + assert(_timeout > 0); + _timeout = 0; + initiateShutdown(); + continue; + } + + EventHandler handler = null; + boolean finished = false; + boolean shutdown = false; + + synchronized(this) + { + if(_keys.contains(_fdIntrReadKey) && _fdIntrReadKey.isReadable()) + { + if(TRACE_SELECT || TRACE_INTERRUPT) { - try - { - _promoteMonitor.wait(); - } - catch(InterruptedException ex) + trace("detected interrupt"); + } + + // + // There are three possibilities for an interrupt: + // + // - The thread pool has been destroyed. + // + // - Server shutdown has been initiated. + // + // - An event handler was registered or unregistered. + // + + // + // Thread pool destroyed? + // + if(_destroyed) + { + if(TRACE_SHUTDOWN) { + trace("destroyed, thread id = " + Thread.currentThread()); } - } - _promote = false; - } - - if(TRACE_THREAD) - { - trace("thread " + Thread.currentThread() + " has the lock"); - } - } - - repeatSelect: - - while(true) - { - if(TRACE_REGISTRATION) - { - java.util.Set keys = _selector.keys(); - trace("selecting on " + keys.size() + " channels:"); - java.util.Iterator i = keys.iterator(); - while(i.hasNext()) - { - java.nio.channels.SelectionKey key = (java.nio.channels.SelectionKey)i.next(); - trace(" " + key.channel()); - } - } - - select(); - if(_keys.size() == 0) // We initiate a shutdown if there is a thread pool timeout. - { - if(TRACE_SELECT) - { - trace("timeout"); - } - - assert(_timeout > 0); - _timeout = 0; - initiateShutdown(); - continue repeatSelect; - } - - EventHandler handler = null; - boolean finished = false; - boolean shutdown = false; - - synchronized(this) - { - if(_keys.contains(_fdIntrReadKey) && _fdIntrReadKey.isReadable()) - { - if(TRACE_SELECT || TRACE_INTERRUPT) - { - trace("detected interrupt"); - } - - // - // There are three possibilities for an interrupt: - // - // - The thread pool has been destroyed. - // - // - Server shutdown has been initiated. - // - // - An event handler was registered or unregistered. - // - - // - // Thread pool destroyed? - // - if(_destroyed) - { - if(TRACE_SHUTDOWN) - { - trace("destroyed, thread id = " + Thread.currentThread()); - } - - // - // Don't clear the interrupt fd if destroyed, so that - // the other threads exit as well. - // - return; - } + // + // Don't clear the interrupt fd if destroyed, + // so that the other threads exit as well. + // + return; + } - // - // Remove the interrupt channel from the selected key set. - // - _keys.remove(_fdIntrReadKey); + // + // Remove the interrupt channel from the selected key set. + // + _keys.remove(_fdIntrReadKey); - shutdown = clearInterrupt(); + shutdown = clearInterrupt(); - if(!shutdown) - { - // - // An event handler must have been - // registered or unregistered. - // - assert(!_changes.isEmpty()); - FdHandlerPair change = (FdHandlerPair)_changes.removeFirst(); + if(!shutdown) + { + // + // An event handler must have been registered + // or unregistered. + // + assert(!_changes.isEmpty()); + FdHandlerPair change = (FdHandlerPair)_changes.removeFirst(); - if(change.handler != null) // Addition if handler is set. + if(change.handler != null) // Addition if handler is set. + { + int op; + if((change.fd.validOps() & java.nio.channels.SelectionKey.OP_READ) > 0) { - int op; - if((change.fd.validOps() & java.nio.channels.SelectionKey.OP_READ) > 0) - { - op = java.nio.channels.SelectionKey.OP_READ; - } - else - { - op = java.nio.channels.SelectionKey.OP_ACCEPT; - } - - java.nio.channels.SelectionKey key = null; - try - { - key = change.fd.register(_selector, op, change.handler); - } - catch(java.nio.channels.ClosedChannelException ex) - { - assert(false); - } - _handlerMap.put(change.fd, new HandlerKeyPair(change.handler, key)); - - if(TRACE_REGISTRATION) - { - trace("added handler (" + change.handler.getClass().getName() + ") for fd " + - change.fd); - } + op = java.nio.channels.SelectionKey.OP_READ; + } + else + { + op = java.nio.channels.SelectionKey.OP_ACCEPT; + } - continue repeatSelect; + java.nio.channels.SelectionKey key = null; + try + { + key = change.fd.register(_selector, op, change.handler); } - else // Removal if handler is not set. + catch(java.nio.channels.ClosedChannelException ex) { - HandlerKeyPair pair = (HandlerKeyPair)_handlerMap.remove(change.fd); - assert(pair != null); - handler = pair.handler; - finished = true; - pair.key.cancel(); + assert(false); + } + _handlerMap.put(change.fd, new HandlerKeyPair(change.handler, key)); - if(TRACE_REGISTRATION) - { - trace("removed handler (" + handler.getClass().getName() + ") for fd " + - change.fd); - } + if(TRACE_REGISTRATION) + { + trace("added handler (" + change.handler.getClass().getName() + ") for fd " + + change.fd); + } - // Don't goto repeatSelect; we have to - // call finished() on the event - // handler below, outside the thread - // synchronization. + continue; + } + else // Removal if handler is not set. + { + HandlerKeyPair pair = (HandlerKeyPair)_handlerMap.remove(change.fd); + assert(pair != null); + handler = pair.handler; + finished = true; + pair.key.cancel(); + + if(TRACE_REGISTRATION) + { + trace("removed handler (" + handler.getClass().getName() + ") for fd " + + change.fd); } + + // Don't continue; we have to call + // finished() on the event handler below, + // outside the thread synchronization. } } - else - { - java.nio.channels.SelectionKey key = null; - java.util.Iterator iter = _keys.iterator(); - while(iter.hasNext()) - { - // - // Ignore selection keys that have been cancelled - // - java.nio.channels.SelectionKey k = (java.nio.channels.SelectionKey)iter.next(); - iter.remove(); - if(k.isValid() && key != _fdIntrReadKey) - { - if(TRACE_SELECT) - { - trace("found a key: " + keyToString(k)); - } - - key = k; - break; - } - } - - if(key == null) - { - if(TRACE_SELECT) - { - trace("didn't find a valid key"); - } - - continue repeatSelect; - } - - handler = (EventHandler)key.attachment(); - } - } - - assert(handler != null || shutdown); - - if(shutdown) // Shutdown has been initiated. - { - if(TRACE_SHUTDOWN) - { - trace("shutdown detected"); - } - - ObjectAdapterFactory factory = _instance.objectAdapterFactory(); - if(factory == null) - { - continue repeatSelect; - } - - promoteFollower(); - factory.shutdown(); - } + } else { - if(finished) + java.nio.channels.SelectionKey key = null; + java.util.Iterator iter = _keys.iterator(); + while(iter.hasNext()) { // - // Notify a handler about it's removal from - // the thread pool. + // Ignore selection keys that have been cancelled // - try + java.nio.channels.SelectionKey k = (java.nio.channels.SelectionKey)iter.next(); + iter.remove(); + if(k.isValid() && key != _fdIntrReadKey) { - handler.finished(this); + if(TRACE_SELECT) + { + trace("found a key: " + keyToString(k)); + } + + key = k; + break; } - catch(Ice.LocalException ex) + } + + if(key == null) + { + if(TRACE_SELECT) { - java.io.StringWriter sw = new java.io.StringWriter(); - java.io.PrintWriter pw = new java.io.PrintWriter(sw); - ex.printStackTrace(pw); - pw.flush(); - String s = "exception while calling finished():\n" + sw.toString() + "\n" + - handler.toString(); - _instance.logger().error(s); + trace("didn't find a valid key"); } + + continue; } - else + + handler = (EventHandler)key.attachment(); + } + } + + assert(handler != null || shutdown); + + if(shutdown) // Shutdown has been initiated. + { + if(TRACE_SHUTDOWN) + { + trace("shutdown detected"); + } + + ObjectAdapterFactory factory = _instance.objectAdapterFactory(); + if(factory == null) + { + continue; + } + + promoteFollower(); + factory.shutdown(); + } + else + { + if(finished) + { + // + // Notify a handler about it's removal from + // the thread pool. + // + try { - // - // If the handler is "readable", try to read a - // message. - // - try + handler.finished(this); + } + catch(Ice.LocalException ex) + { + java.io.StringWriter sw = new java.io.StringWriter(); + java.io.PrintWriter pw = new java.io.PrintWriter(sw); + ex.printStackTrace(pw); + pw.flush(); + String s = "exception in `" + _prefix + "' while calling finished():\n" + + sw.toString() + "\n" + handler.toString(); + _instance.logger().error(s); + } + } + else + { + // + // If the handler is "readable", try to read a + // message. + // + try + { + if(handler.readable()) { - if(handler.readable()) + try { - try - { - read(handler); - } - catch(Ice.TimeoutException ex) // Expected. + read(handler); + } + catch(Ice.TimeoutException ex) // Expected. + { + continue; + } + catch(Ice.LocalException ex) + { + if(TRACE_EXCEPTION) { - continue repeatSelect; + trace("informing handler (" + handler.getClass().getName() + + ") about exception " + ex); + ex.printStackTrace(); } - catch(Ice.LocalException ex) - { - if(TRACE_EXCEPTION) - { - trace("informing handler (" + handler.getClass().getName() + - ") about exception " + ex); - ex.printStackTrace(); - } - handler.exception(ex); - continue repeatSelect; - } - - stream.swap(handler._stream); - assert(stream.pos() == stream.size()); + handler.exception(ex); + continue; } + + stream.swap(handler._stream); + assert(stream.pos() == stream.size()); + } - handler.message(stream, this); + handler.message(stream, this); + } + finally + { + stream.reset(); + } + } + } + + if(_sizeMax > 1) + { + synchronized(_promoteMonitor) + { + while(!_promote) + { + try + { + _promoteMonitor.wait(); } - finally + catch(InterruptedException ex) { - stream.reset(); } } - } + + _promote = false; + } - break; // Inner while loop. - } + if(TRACE_THREAD) + { + trace("thread " + Thread.currentThread() + " has the lock"); + } + } } } @@ -893,8 +915,14 @@ public final class ThreadPool private Instance _instance; private boolean _destroyed; - private final int _size; - private final int _sizeMax; + private String _prefix; + + private final int _size; // Number of threads that are pre-created. + private final int _sizeMax; // Maximum number of threads. + private final int _sizeWarn; // If _inUse reaches _sizeWarn, a "low on threads" warning will be printed. + private int _inUse; // Number of threads that are currently in use. + private java.lang.Object _inUseMutex = new java.lang.Object(); + private java.nio.channels.ReadableByteChannel _fdIntrRead; private java.nio.channels.SelectionKey _fdIntrReadKey; private java.nio.channels.WritableByteChannel _fdIntrWrite; @@ -905,7 +933,6 @@ public final class ThreadPool private int _timeout; private boolean _promote; private java.lang.Object _promoteMonitor = new java.lang.Object(); - private String _prefix; private final class EventHandlerThread extends Thread { @@ -929,7 +956,7 @@ public final class ThreadPool java.io.PrintWriter pw = new java.io.PrintWriter(sw); ex.printStackTrace(pw); pw.flush(); - String s = "exception in thread pool thread " + getName() + ":\n" + sw.toString(); + String s = "exception in `" + _prefix + "' thread " + getName() + ":\n" + sw.toString(); _instance.logger().error(s); } catch(RuntimeException ex) @@ -938,7 +965,7 @@ public final class ThreadPool java.io.PrintWriter pw = new java.io.PrintWriter(sw); ex.printStackTrace(pw); pw.flush(); - String s = "unknown exception in thread pool thread " + getName() + ":\n" + sw.toString(); + String s = "unknown exception in `" + _prefix + "' thread " + getName() + ":\n" + sw.toString(); _instance.logger().error(s); } |