// ********************************************************************** // // Copyright (c) 2003-2006 ZeroC, Inc. All rights reserved. // // This copy of Ice is licensed to you under the terms described in the // ICE_LICENSE file included in this distribution. // // ********************************************************************** package IceInternal; public final class ThreadPool { private final static boolean TRACE_REGISTRATION = false; private final static boolean TRACE_INTERRUPT = false; private final static boolean TRACE_SHUTDOWN = false; private final static boolean TRACE_SELECT = false; private final static boolean TRACE_EXCEPTION = false; private final static boolean TRACE_THREAD = false; private final static boolean TRACE_STACK_TRACE = false; public ThreadPool(Instance instance, String prefix, int timeout) { _instance = instance; _destroyed = false; _prefix = prefix; _timeout = timeout; _threadIndex = 0; _running = 0; _inUse = 0; _load = 1.0; _promote = true; _warnUdp = _instance.initializationData().properties.getPropertyAsInt("Ice.Warn.Datagrams") > 0; // // If we are in thread per connection mode, no thread pool should // ever be created. // assert(!_instance.threadPerConnection()); String programName = _instance.initializationData().properties.getProperty("Ice.ProgramName"); if(programName.length() > 0) { _programNamePrefix = programName + "-"; } else { _programNamePrefix = ""; } Network.SocketPair pair = Network.createPipe(); _fdIntrRead = (java.nio.channels.ReadableByteChannel)pair.source; _fdIntrWrite = pair.sink; try { _selector = java.nio.channels.Selector.open(); pair.source.configureBlocking(false); _fdIntrReadKey = pair.source.register(_selector, java.nio.channels.SelectionKey.OP_READ); } catch(java.io.IOException ex) { Ice.SyscallException sys = new Ice.SyscallException(); sys.initCause(ex); throw sys; } // // The Selector holds a Set representing the selected keys. The // Set reference doesn't change, so we obtain it once here. // _keys = _selector.selectedKeys(); // // We use just one thread as the default. This is the fastest // psossible setting, still allows one level of nesting, and // doesn't require to make the servants thread safe. // int size = _instance.initializationData().properties.getPropertyAsIntWithDefault(_prefix + ".Size", 1); if(size < 1) { size = 1; } int sizeMax = _instance.initializationData().properties.getPropertyAsIntWithDefault(_prefix + ".SizeMax", size); if(sizeMax < size) { sizeMax = size; } int sizeWarn = _instance.initializationData().properties.getPropertyAsIntWithDefault( _prefix + ".SizeWarn", sizeMax * 80 / 100); _size = size; _sizeMax = sizeMax; _sizeWarn = sizeWarn; try { _threads = new java.util.ArrayList(); for(int i = 0; i < _size; i++) { EventHandlerThread thread = new EventHandlerThread(_programNamePrefix + _prefix + "-" + _threadIndex++); _threads.add(thread); thread.start(); ++_running; } } catch(RuntimeException ex) { java.io.StringWriter sw = new java.io.StringWriter(); java.io.PrintWriter pw = new java.io.PrintWriter(sw); ex.printStackTrace(pw); pw.flush(); String s = "cannot create thread for `" + _prefix + "':\n" + sw.toString(); _instance.initializationData().logger.error(s); destroy(); joinWithAllThreads(); throw ex; } } protected synchronized void finalize() throws Throwable { IceUtil.Assert.FinalizerAssert(_destroyed); } public synchronized void destroy() { if(TRACE_SHUTDOWN) { trace("destroy"); } assert(!_destroyed); assert(_handlerMap.isEmpty()); assert(_changes.isEmpty()); _destroyed = true; setInterrupt(); } public synchronized void _register(java.nio.channels.SelectableChannel fd, EventHandler handler) { if(TRACE_REGISTRATION) { trace("adding handler of type " + handler.getClass().getName() + " for channel " + fd); } assert(!_destroyed); _changes.add(new FdHandlerPair(fd, handler)); setInterrupt(); } public synchronized void unregister(java.nio.channels.SelectableChannel fd) { if(TRACE_REGISTRATION) { if(TRACE_STACK_TRACE) { java.io.StringWriter sw = new java.io.StringWriter(); try { throw new RuntimeException(); } catch(RuntimeException ex) { java.io.PrintWriter pw = new java.io.PrintWriter(sw); ex.printStackTrace(pw); pw.flush(); } trace("removing handler for channel " + fd + "\n" + sw.toString()); } else { trace("removing handler for channel " + fd); } } assert(!_destroyed); _changes.add(new FdHandlerPair(fd, null)); setInterrupt(); } public void promoteFollower() { if(_sizeMax > 1) { synchronized(this) { assert(!_promote); _promote = true; notify(); if(!_destroyed) { assert(_inUse >= 0); ++_inUse; if(_inUse == _sizeWarn) { String s = "thread pool `" + _prefix + "' is running low on threads\n" + "Size=" + _size + ", " + "SizeMax=" + _sizeMax + ", " + "SizeWarn=" + _sizeWarn; _instance.initializationData().logger.warning(s); } assert(_inUse <= _running); if(_inUse < _sizeMax && _inUse == _running) { try { EventHandlerThread thread = new EventHandlerThread(_programNamePrefix + _prefix + "-" + _threadIndex++); _threads.add(thread); thread.start(); ++_running; } catch(RuntimeException ex) { java.io.StringWriter sw = new java.io.StringWriter(); java.io.PrintWriter pw = new java.io.PrintWriter(sw); ex.printStackTrace(pw); pw.flush(); String s = "cannot create thread for `" + _prefix + "':\n" + sw.toString(); _instance.initializationData().logger.error(s); } } } } } } public void joinWithAllThreads() { // // _threads is immutable after destroy() has been called, // therefore no synchronization is needed. (Synchronization // wouldn't be possible here anyway, because otherwise the // other threads would never terminate.) // java.util.Iterator i = _threads.iterator(); while(i.hasNext()) { EventHandlerThread thread = (EventHandlerThread)i.next(); while(true) { try { thread.join(); break; } catch(InterruptedException ex) { } } } // // Cleanup the selector, and the socket pair. // try { if(_selector != null) { try { _selector.close(); } catch(java.io.IOException ex) { // // BUGFIX: // // Ignore this exception. This shouldn't happen // but for some reasons the close() call raises // "java.io.IOException: Bad file descriptor" on // Mac OS X 10.3.x (it works fine on OS X 10.4.x) // } _selector = null; } if(_fdIntrWrite != null) { try { _fdIntrWrite.close(); } catch(java.io.IOException ex) { // // BUGFIX: // // Ignore this exception. This shouldn't happen // but for some reasons the close() call raises // "java.io.IOException: No such file or // directory" under Linux with JDK 1.4.2. // } _fdIntrWrite = null; } if(_fdIntrRead != null) { _fdIntrRead.close(); _fdIntrRead = null; } } catch(java.io.IOException ex) { java.io.StringWriter sw = new java.io.StringWriter(); java.io.PrintWriter pw = new java.io.PrintWriter(sw); ex.printStackTrace(pw); pw.flush(); String s = "exception in `" + _prefix + "' while calling close():\n" + sw.toString(); _instance.initializationData().logger.error(s); } } public String prefix() { return _prefix; } private void clearInterrupt() { if(TRACE_INTERRUPT) { trace("clearInterrupt"); if(TRACE_STACK_TRACE) { try { throw new RuntimeException(); } catch(RuntimeException ex) { ex.printStackTrace(); } } } byte b = 0; java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(1); try { while(true) { buf.rewind(); if(_fdIntrRead.read(buf) != 1) { break; } if(TRACE_INTERRUPT) { trace("clearInterrupt got byte " + (int)buf.get(0)); } b = buf.get(0); break; } } catch(java.io.IOException ex) { Ice.SocketException se = new Ice.SocketException(); se.initCause(ex); throw se; } } private void setInterrupt() { if(TRACE_INTERRUPT) { trace("setInterrupt()"); if(TRACE_STACK_TRACE) { try { throw new RuntimeException(); } catch(RuntimeException ex) { ex.printStackTrace(); } } } java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(1); buf.put(0, (byte)0); while(buf.hasRemaining()) { try { _fdIntrWrite.write(buf); } catch(java.io.IOException ex) { Ice.SocketException se = new Ice.SocketException(); se.initCause(ex); throw se; } } } // // Each thread supplies a BasicStream, to avoid creating excessive // garbage (Java only). // private boolean run(BasicStream stream) { if(_sizeMax > 1) { synchronized(this) { while(!_promote) { try { wait(); } catch(InterruptedException ex) { } } _promote = false; } if(TRACE_THREAD) { trace("thread " + Thread.currentThread() + " has the lock"); } } while(true) { if(TRACE_REGISTRATION) { java.util.Set keys = _selector.keys(); trace("selecting on " + keys.size() + " channels:"); java.util.Iterator i = keys.iterator(); while(i.hasNext()) { java.nio.channels.SelectionKey key = (java.nio.channels.SelectionKey)i.next(); trace(" " + key.channel()); } } EventHandler handler = null; // // Only call select() if there are no pending handlers with additional data // for us to read. // if(!_pendingHandlers.isEmpty()) { handler = (EventHandler)_pendingHandlers.removeFirst(); } else { select(); } boolean finished = false; boolean shutdown = false; if(handler == null) { synchronized(this) { if(_keys.size() == 0) // We initiate a shutdown if there is a thread pool timeout. { if(TRACE_SELECT) { trace("timeout"); } assert(_timeout > 0); _timeout = 0; shutdown = true; } else { if(_keys.contains(_fdIntrReadKey) && _fdIntrReadKey.isReadable()) { if(TRACE_SELECT || TRACE_INTERRUPT) { trace("detected interrupt"); } // // There are two possiblities for an interrupt: // // 1. The thread pool has been destroyed. // // 2. An event handler was registered or unregistered. // // // Thread pool destroyed? // if(_destroyed) { if(TRACE_SHUTDOWN) { trace("destroyed, thread id = " + Thread.currentThread()); } // // Don't clear the interrupt fd if // destroyed, so that the other threads // exit as well. // return true; } // // Remove the interrupt channel from the // selected key set. // _keys.remove(_fdIntrReadKey); clearInterrupt(); // // An event handler must have been registered // or unregistered. // assert(!_changes.isEmpty()); FdHandlerPair change = (FdHandlerPair)_changes.removeFirst(); if(change.handler != null) // Addition if handler is set. { int op; if((change.fd.validOps() & java.nio.channels.SelectionKey.OP_READ) > 0) { op = java.nio.channels.SelectionKey.OP_READ; } else { op = java.nio.channels.SelectionKey.OP_ACCEPT; } java.nio.channels.SelectionKey key = null; try { key = change.fd.register(_selector, op, change.handler); } catch(java.nio.channels.ClosedChannelException ex) { assert(false); } _handlerMap.put(change.fd, new HandlerKeyPair(change.handler, key)); if(TRACE_REGISTRATION) { trace("added handler (" + change.handler.getClass().getName() + ") for fd " + change.fd); } continue; } else // Removal if handler is not set. { HandlerKeyPair pair = (HandlerKeyPair)_handlerMap.remove(change.fd); assert(pair != null); handler = pair.handler; finished = true; pair.key.cancel(); if(TRACE_REGISTRATION) { trace("removed handler (" + handler.getClass().getName() + ") for fd " + change.fd); } // Don't continue; we have to call // finished() on the event handler below, // outside the thread synchronization. } } else { java.nio.channels.SelectionKey key = null; java.util.Iterator iter = _keys.iterator(); while(iter.hasNext()) { // // Ignore selection keys that have been cancelled // java.nio.channels.SelectionKey k = (java.nio.channels.SelectionKey)iter.next(); iter.remove(); if(k.isValid() && key != _fdIntrReadKey) { if(TRACE_SELECT) { trace("found a key: " + keyToString(k)); } key = k; break; } } if(key == null) { if(TRACE_SELECT) { trace("didn't find a valid key"); } continue; } handler = (EventHandler)key.attachment(); } } } } // // Now we are outside the thread synchronization. // if(shutdown) { if(TRACE_SHUTDOWN) { trace("shutdown detected"); } // // Initiate server shutdown. // ObjectAdapterFactory factory; try { factory = _instance.objectAdapterFactory(); } catch(Ice.CommunicatorDestroyedException e) { continue; } promoteFollower(); factory.shutdown(); // // No "continue", because we want shutdown to be done in // its own thread from this pool. Therefore we called // promoteFollower(). // } else { assert(handler != null); if(finished) { // // Notify a handler about its removal from // the thread pool. // try { handler.finished(this); } catch(Ice.LocalException ex) { java.io.StringWriter sw = new java.io.StringWriter(); java.io.PrintWriter pw = new java.io.PrintWriter(sw); ex.printStackTrace(pw); pw.flush(); String s = "exception in `" + _prefix + "' while calling finished():\n" + sw.toString() + "\n" + handler.toString(); _instance.initializationData().logger.error(s); } // // No "continue", because we want finished() to be // called in its own thread from this pool. Note // that this means that finished() must call // promoteFollower(). // } else { // // If the handler is "readable", try to read a // message. // try { if(handler.readable()) { try { // // If read returns true, the handler has more data for the thread pool // to process. // if(read(handler)) { _pendingHandlers.add(handler); } } catch(Ice.TimeoutException ex) // Expected. { continue; } catch(Ice.DatagramLimitException ex) // Expected. { continue; } catch(Ice.SocketException ex) { if(TRACE_EXCEPTION) { trace("informing handler (" + handler.getClass().getName() + ") about exception " + ex); ex.printStackTrace(); } handler.exception(ex); continue; } catch(Ice.LocalException ex) { if(handler.datagram()) { if(_instance.initializationData().properties.getPropertyAsInt( "Ice.Warn.Connections") > 0) { _instance.initializationData().logger.warning( "datagram connection exception:\n" + ex + "\n" + handler.toString()); } } else { if(TRACE_EXCEPTION) { trace("informing handler (" + handler.getClass().getName() + ") about exception " + ex); ex.printStackTrace(); } handler.exception(ex); } continue; } stream.swap(handler._stream); assert(stream.pos() == stream.size()); } // // Provide a new message to the handler. // try { handler.message(stream, this); } catch(Ice.LocalException ex) { java.io.StringWriter sw = new java.io.StringWriter(); java.io.PrintWriter pw = new java.io.PrintWriter(sw); ex.printStackTrace(pw); pw.flush(); String s = "exception in `" + _prefix + "' while calling message():\n" + sw.toString() + "\n" + handler.toString(); _instance.initializationData().logger.error(s); } // // No "continue", because we want message() to // be called in its own thread from this // pool. Note that this means that message() // must call promoteFollower(). // } finally { stream.reset(); } } } if(_sizeMax > 1) { synchronized(this) { if(!_destroyed) { // // First we reap threads that have been // destroyed before. // int sz = _threads.size(); assert(_running <= sz); if(_running < sz) { java.util.Iterator i = _threads.iterator(); while(i.hasNext()) { EventHandlerThread thread = (EventHandlerThread)i.next(); if(!thread.isAlive()) { try { thread.join(); i.remove(); } catch(InterruptedException ex) { } } } } // // Now we check if this thread can be destroyed, based // on a load factor. // // // The load factor jumps immediately to the number of // threads that are currently in use, but decays // exponentially if the number of threads in use is // smaller than the load factor. This reflects that we // create threads immediately when they are needed, // but want the number of threads to slowly decline to // the configured minimum. // double inUse = (double)_inUse; if(_load < inUse) { _load = inUse; } else { final double loadFactor = 0.05; // TODO: Configurable? final double oneMinusLoadFactor = 1 - loadFactor; _load = _load * oneMinusLoadFactor + _inUse * loadFactor; } if(_running > _size) { int load = (int)(_load + 0.5); // // We add one to the load factor because one // additional thread is needed for select(). // if(load + 1 < _running) { assert(_inUse > 0); --_inUse; assert(_running > 0); --_running; return false; } } assert(_inUse > 0); --_inUse; } while(!_promote) { try { wait(); } catch(InterruptedException ex) { } } _promote = false; } if(TRACE_THREAD) { trace("thread " + Thread.currentThread() + " has the lock"); } } } } private boolean read(EventHandler handler) { boolean moreData = false; BasicStream stream = handler._stream; if(stream.size() == 0) { stream.resize(Protocol.headerSize, true); stream.pos(0); } if(stream.pos() != stream.size()) { moreData = handler.read(stream); assert(stream.pos() == stream.size()); } int pos = stream.pos(); if(pos < Protocol.headerSize) { // // This situation is possible for small UDP packets. // throw new Ice.IllegalMessageSizeException(); } stream.pos(0); byte[] m = new byte[4]; m[0] = stream.readByte(); m[1] = stream.readByte(); m[2] = stream.readByte(); m[3] = stream.readByte(); if(m[0] != Protocol.magic[0] || m[1] != Protocol.magic[1] || m[2] != Protocol.magic[2] || m[3] != Protocol.magic[3]) { Ice.BadMagicException ex = new Ice.BadMagicException(); ex.badMagic = m; throw ex; } byte pMajor = stream.readByte(); byte pMinor = stream.readByte(); if(pMajor != Protocol.protocolMajor || pMinor > Protocol.protocolMinor) { Ice.UnsupportedProtocolException e = new Ice.UnsupportedProtocolException(); e.badMajor = pMajor < 0 ? pMajor + 255 : pMajor; e.badMinor = pMinor < 0 ? pMinor + 255 : pMinor; e.major = Protocol.protocolMajor; e.minor = Protocol.protocolMinor; throw e; } byte eMajor = stream.readByte(); byte eMinor = stream.readByte(); if(eMajor != Protocol.encodingMajor || eMinor > Protocol.encodingMinor) { Ice.UnsupportedEncodingException e = new Ice.UnsupportedEncodingException(); e.badMajor = eMajor < 0 ? eMajor + 255 : eMajor; e.badMinor = eMinor < 0 ? eMinor + 255 : eMinor; e.major = Protocol.encodingMajor; e.minor = Protocol.encodingMinor; throw e; } byte messageType = stream.readByte(); byte compress = stream.readByte(); int size = stream.readInt(); if(size < Protocol.headerSize) { throw new Ice.IllegalMessageSizeException(); } if(size > _instance.messageSizeMax()) { throw new Ice.MemoryLimitException(); } if(size > stream.size()) { stream.resize(size, true); } stream.pos(pos); if(stream.pos() != stream.size()) { if(handler.datagram()) { if(_warnUdp) { _instance.initializationData().logger.warning("DatagramLimitException: maximum size of " + stream.pos() + " exceeded"); } stream.pos(0); stream.resize(0, true); throw new Ice.DatagramLimitException(); } else { moreData = handler.read(stream); assert(stream.pos() == stream.size()); } } return moreData; } /* * Commented out because it is unused. * private void selectNonBlocking() { while(true) { try { if(TRACE_SELECT) { trace("non-blocking select on " + _selector.keys().size() + " keys, thread id = " + Thread.currentThread()); } _selector.selectNow(); if(TRACE_SELECT) { if(_keys.size() > 0) { trace("after selectNow, there are " + _keys.size() + " selected keys:"); java.util.Iterator i = _keys.iterator(); while(i.hasNext()) { java.nio.channels.SelectionKey key = (java.nio.channels.SelectionKey)i.next(); trace(" " + keyToString(key)); } } } break; } catch(java.io.InterruptedIOException ex) { continue; } catch(java.io.IOException ex) { // // Pressing Ctrl-C causes select() to raise an // IOException, which seems like a JDK bug. We trap // for that special case here and ignore it. // Hopefully we're not masking something important! // if(ex.getMessage().indexOf("Interrupted system call") != -1) { continue; } Ice.SocketException se = new Ice.SocketException(); se.initCause(ex); //throw se; java.io.StringWriter sw = new java.io.StringWriter(); java.io.PrintWriter pw = new java.io.PrintWriter(sw); se.printStackTrace(pw); pw.flush(); String s = "exception in `" + _prefix + "':\n" + sw.toString(); _instance.initializationData().logger.error(s); continue; } } } */ private void select() { int ret = 0; while(true) { try { if(TRACE_SELECT) { trace("select on " + _selector.keys().size() + " keys, thread id = " + Thread.currentThread()); } if(_timeout > 0) { ret = _selector.select(_timeout * 1000); } else { ret = _selector.select(); } } catch(java.io.InterruptedIOException ex) { continue; } catch(java.io.IOException ex) { // // Pressing Ctrl-C causes select() to raise an // IOException, which seems like a JDK bug. We trap // for that special case here and ignore it. // Hopefully we're not masking something important! // if(ex.getMessage().indexOf("Interrupted system call") != -1) { continue; } Ice.SocketException se = new Ice.SocketException(); se.initCause(ex); //throw se; java.io.StringWriter sw = new java.io.StringWriter(); java.io.PrintWriter pw = new java.io.PrintWriter(sw); se.printStackTrace(pw); pw.flush(); String s = "exception in `" + _prefix + "':\n" + sw.toString(); _instance.initializationData().logger.error(s); continue; } if(TRACE_SELECT) { trace("select() returned " + ret + ", _keys.size() = " + _keys.size()); } break; } } private void trace(String msg) { System.err.println(_prefix + ": " + msg); } private String keyToString(java.nio.channels.SelectionKey key) { String ops = "["; if(key.isAcceptable()) { ops += " OP_ACCEPT"; } if(key.isReadable()) { ops += " OP_READ"; } if(key.isConnectable()) { ops += " OP_CONNECT"; } if(key.isWritable()) { ops += " OP_WRITE"; } ops += " ]"; return key.channel() + " " + ops; } private static final class FdHandlerPair { java.nio.channels.SelectableChannel fd; EventHandler handler; FdHandlerPair(java.nio.channels.SelectableChannel fd, EventHandler handler) { this.fd = fd; this.handler = handler; } } private static final class HandlerKeyPair { EventHandler handler; java.nio.channels.SelectionKey key; HandlerKeyPair(EventHandler handler, java.nio.channels.SelectionKey key) { this.handler = handler; this.key = key; } } private Instance _instance; private boolean _destroyed; private final String _prefix; private final String _programNamePrefix; private java.nio.channels.ReadableByteChannel _fdIntrRead; private java.nio.channels.SelectionKey _fdIntrReadKey; private java.nio.channels.WritableByteChannel _fdIntrWrite; private java.nio.channels.Selector _selector; private java.util.Set _keys; private java.util.LinkedList _changes = new java.util.LinkedList(); private java.util.HashMap _handlerMap = new java.util.HashMap(); private int _timeout; // // Since the Java5 SSL transceiver can read more data from the socket than is // actually requested, we have to keep a separate list of handlers that need // the thread pool to read more data before it re-enters a blocking call to // select(). // private java.util.LinkedList _pendingHandlers = new java.util.LinkedList(); private final class EventHandlerThread extends Thread { EventHandlerThread(String name) { super(name); } public void run() { if(_instance.initializationData().threadHook != null) { _instance.initializationData().threadHook.start(); } BasicStream stream = new BasicStream(_instance); boolean promote; try { promote = ThreadPool.this.run(stream); } catch(Ice.LocalException ex) { java.io.StringWriter sw = new java.io.StringWriter(); java.io.PrintWriter pw = new java.io.PrintWriter(sw); ex.printStackTrace(pw); pw.flush(); String s = "exception in `" + _prefix + "' thread " + getName() + ":\n" + sw.toString(); _instance.initializationData().logger.error(s); promote = true; } catch(java.lang.Exception ex) { java.io.StringWriter sw = new java.io.StringWriter(); java.io.PrintWriter pw = new java.io.PrintWriter(sw); ex.printStackTrace(pw); pw.flush(); String s = "unknown exception in `" + _prefix + "' thread " + getName() + ":\n" + sw.toString(); _instance.initializationData().logger.error(s); promote = true; } if(promote && _sizeMax > 1) { // // Promote a follower, but w/o modifying _inUse or // creating new threads. // synchronized(ThreadPool.this) { assert(!_promote); _promote = true; ThreadPool.this.notify(); } } if(TRACE_THREAD) { trace("run() terminated"); } if(_instance.initializationData().threadHook != null) { _instance.initializationData().threadHook.stop(); } } } private final int _size; // Number of threads that are pre-created. private final int _sizeMax; // Maximum number of threads. private final int _sizeWarn; // If _inUse reaches _sizeWarn, a "low on threads" warning will be printed. private java.util.ArrayList _threads; // All threads, running or not. private int _threadIndex; // For assigning thread names. private int _running; // Number of running threads. private int _inUse; // Number of threads that are currently in use. private double _load; // Current load in number of threads. private boolean _promote; private final boolean _warnUdp; }