diff options
author | Benoit Foucher <benoit@zeroc.com> | 2009-08-21 15:55:01 +0200 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2009-08-21 15:55:01 +0200 |
commit | b9f2fa14fb3f222a6ec5e0a93bf25fe5ad12b56a (patch) | |
tree | 183215e2dbeadfbc871b800ce09726e58af38b91 /java/src/IceInternal/ThreadPool.java | |
parent | adding compression cookbook demo (diff) | |
download | ice-b9f2fa14fb3f222a6ec5e0a93bf25fe5ad12b56a.tar.bz2 ice-b9f2fa14fb3f222a6ec5e0a93bf25fe5ad12b56a.tar.xz ice-b9f2fa14fb3f222a6ec5e0a93bf25fe5ad12b56a.zip |
IOCP changes, bug 3501, 4200, 4156, 3101
Diffstat (limited to 'java/src/IceInternal/ThreadPool.java')
-rw-r--r-- | java/src/IceInternal/ThreadPool.java | 1128 |
1 files changed, 358 insertions, 770 deletions
diff --git a/java/src/IceInternal/ThreadPool.java b/java/src/IceInternal/ThreadPool.java index 917c85182de..4494eb9396f 100644 --- a/java/src/IceInternal/ThreadPool.java +++ b/java/src/IceInternal/ThreadPool.java @@ -11,13 +11,62 @@ package IceInternal; public final class ThreadPool { - private final static boolean TRACE_REGISTRATION = false; - private final static boolean TRACE_INTERRUPT = false; - private final static boolean TRACE_SHUTDOWN = false; - private final static boolean TRACE_SELECT = false; - private final static boolean TRACE_EXCEPTION = false; - private final static boolean TRACE_THREAD = false; - private final static boolean TRACE_STACK_TRACE = false; + final class ShutdownWorkItem implements ThreadPoolWorkItem + { + public void execute(ThreadPoolCurrent current) + { + current.ioCompleted(); + try + { + _instance.objectAdapterFactory().shutdown(); + } + catch(Ice.CommunicatorDestroyedException ex) + { + } + } + } + + static final class FinishedWorkItem implements ThreadPoolWorkItem + { + public + FinishedWorkItem(EventHandler handler) + { + _handler = handler; + } + + public void execute(ThreadPoolCurrent current) + { + _handler.finished(current); + } + + private final EventHandler _handler; + } + + static final class JoinThreadWorkItem implements ThreadPoolWorkItem + { + public + JoinThreadWorkItem(EventHandlerThread thread) + { + _thread = thread; + } + + public void execute(ThreadPoolCurrent current) + { + // No call to ioCompleted, this shouldn't block (and we don't want to cause + // a new thread to be started). + _thread.join(); + } + + private final EventHandlerThread _thread; + } + + // + // Exception raised by the thread pool work queue when the thread pool + // is destroyed. + // + static final class DestroyedException extends RuntimeException + { + } public ThreadPool(Instance instance, String prefix, int timeout) @@ -25,32 +74,34 @@ public final class ThreadPool _instance = instance; _destroyed = false; _prefix = prefix; - _timeout = timeout; - _selector = new Selector(instance, timeout); + _selector = new Selector(instance); _threadIndex = 0; - _running = 0; _inUse = 0; - _load = 1.0; + _inUseIO = 0; _promote = true; _serialize = _instance.initializationData().properties.getPropertyAsInt(_prefix + ".Serialize") > 0; - _warnUdp = _instance.initializationData().properties.getPropertyAsInt("Ice.Warn.Datagrams") > 0; + _serverIdleTime = timeout; + + Ice.Properties properties = _instance.initializationData().properties; - String programName = _instance.initializationData().properties.getProperty("Ice.ProgramName"); + String programName = properties.getProperty("Ice.ProgramName"); if(programName.length() > 0) { - _programNamePrefix = programName + "-"; + _threadPrefix = programName + "-" + _prefix; } else { - _programNamePrefix = ""; + _threadPrefix = _prefix; } + int nProcessors = Runtime.getRuntime().availableProcessors(); + // // We use just one thread as the default. This is the fastest // possible setting, still allows one level of nesting, and // doesn't require to make the servants thread safe. // - int size = _instance.initializationData().properties.getPropertyAsIntWithDefault(_prefix + ".Size", 1); + int size = properties.getPropertyAsIntWithDefault(_prefix + ".Size", 1); if(size < 1) { String s = _prefix + ".Size < 1; Size adjusted to 1"; @@ -58,8 +109,11 @@ public final class ThreadPool size = 1; } - int sizeMax = - _instance.initializationData().properties.getPropertyAsIntWithDefault(_prefix + ".SizeMax", size); + int sizeMax = properties.getPropertyAsIntWithDefault(_prefix + ".SizeMax", size); + if(sizeMax == -1) + { + sizeMax = nProcessors; + } if(sizeMax < size) { String s = _prefix + ".SizeMax < " + _prefix + ".Size; SizeMax adjusted to Size (" + size + ")"; @@ -67,7 +121,7 @@ public final class ThreadPool sizeMax = size; } - int sizeWarn = _instance.initializationData().properties.getPropertyAsInt( _prefix + ".SizeWarn"); + int sizeWarn = properties.getPropertyAsIntWithDefault(_prefix + ".SizeWarn", sizeMax * 80 / 100); if(sizeWarn != 0 && sizeWarn < size) { String s = _prefix + ".SizeWarn < " + _prefix + ".Size; adjusted SizeWarn to Size (" + size + ")"; @@ -81,28 +135,43 @@ public final class ThreadPool sizeWarn = sizeMax; } + int threadIdleTime = properties.getPropertyAsIntWithDefault(_prefix + ".ThreadIdleTime", 60); + if(threadIdleTime < 0) + { + String s = _prefix + ".ThreadIdleTime < 0; ThreadIdleTime adjusted to 0"; + _instance.initializationData().logger.warning(s); + threadIdleTime = 0; + } + _size = size; _sizeMax = sizeMax; _sizeWarn = sizeWarn; + _sizeIO = Math.min(sizeMax, nProcessors); + _threadIdleTime = threadIdleTime; - int stackSize = _instance.initializationData().properties.getPropertyAsInt( _prefix + ".StackSize"); + int stackSize = properties.getPropertyAsInt( _prefix + ".StackSize"); if(stackSize < 0) { String s = _prefix + ".StackSize < 0; Size adjusted to JRE default"; _instance.initializationData().logger.warning(s); stackSize = 0; } - _stackSize = stackSize; - _hasPriority = _instance.initializationData().properties.getProperty(_prefix + ".ThreadPriority") != ""; - _priority = Util.getThreadPriorityProperty(_instance.initializationData().properties, _prefix); - if(!_hasPriority) + boolean hasPriority = properties.getProperty(_prefix + ".ThreadPriority") != ""; + int priority = properties.getPropertyAsInt(_prefix + ".ThreadPriority"); + if(!hasPriority) { - _hasPriority = _instance.initializationData().properties.getProperty("Ice.ThreadPriority") != ""; - _priority = Util.getThreadPriorityProperty(_instance.initializationData().properties, "Ice"); + hasPriority = properties.getProperty("Ice.ThreadPriority") != ""; + priority = properties.getPropertyAsInt("Ice.ThreadPriority"); } + _hasPriority = hasPriority; + _priority = priority; + + _workQueue = new ThreadPoolWorkQueue(this, _instance, _selector); + _nextHandler = _handlers.iterator(); + if(_instance.traceLevels().threadPool >= 1) { String s = "creating " + _prefix + ": Size = " + _size + ", SizeMax = " + _sizeMax + ", SizeWarn = " + @@ -112,11 +181,9 @@ public final class ThreadPool try { - _threads = new java.util.ArrayList<EventHandlerThread>(); for(int i = 0; i < _size; i++) { - EventHandlerThread thread = new EventHandlerThread(_programNamePrefix + _prefix + "-" + - _threadIndex++); + EventHandlerThread thread = new EventHandlerThread(_threadPrefix + "-" + _threadIndex++); _threads.add(thread); if(_hasPriority) { @@ -126,16 +193,11 @@ public final class ThreadPool { thread.start(java.lang.Thread.NORM_PRIORITY); } - ++_running; } } catch(RuntimeException ex) { - java.io.StringWriter sw = new java.io.StringWriter(); - java.io.PrintWriter pw = new java.io.PrintWriter(sw); - ex.printStackTrace(pw); - pw.flush(); - String s = "cannot create thread for `" + _prefix + "':\n" + sw.toString(); + String s = "cannot create thread for `" + _prefix + "':\n" + Ex.toString(ex); _instance.initializationData().logger.error(s); destroy(); @@ -154,158 +216,49 @@ public final class ThreadPool public synchronized void destroy() { - if(TRACE_SHUTDOWN) - { - trace("destroy"); - } - assert(!_destroyed); _destroyed = true; - _selector.setInterrupt(); + _workQueue.destroy(); } public synchronized void - _register(EventHandler handler) + initialize(EventHandler handler) { assert(!_destroyed); + _selector.initialize(handler); + } - if(!handler._registered) - { - if(TRACE_REGISTRATION) - { - trace("adding handler of type " + handler.getClass().getName() + " for channel " + handler.fd()); - } - - if(!handler._serializing) - { - _selector.add(handler, SocketStatus.NeedRead); - } - handler._registered = true; - } + public void + register(EventHandler handler, int op) + { + update(handler, SocketOperation.None, op); } public synchronized void - unregister(EventHandler handler) + update(EventHandler handler, int remove, int add) { assert(!_destroyed); - if(handler._registered) - { - if(TRACE_REGISTRATION) - { - trace("removing handler for channel " + handler.fd()); - } + _selector.update(handler, remove, add); + } - if(!handler._serializing) - { - _selector.remove(handler); - } - handler._registered = false; - } + public void + unregister(EventHandler handler, int op) + { + update(handler, op, SocketOperation.None); } public synchronized void finish(EventHandler handler) { assert(!_destroyed); - - if(TRACE_REGISTRATION) - { - trace("finishing handler for channel " + handler.fd()); - } - - if(handler._registered) - { - if(!handler._serializing) - { - _selector.remove(handler); - } - handler._registered = false; - } - - _finished.add(handler); - _selector.setInterrupt(); - } - - public synchronized void - execute(ThreadPoolWorkItem workItem) - { - if(_destroyed) - { - throw new Ice.CommunicatorDestroyedException(); - } - _workItems.add(workItem); - _selector.setInterrupt(); + _selector.finish(handler); + _workQueue.queue(new FinishedWorkItem(handler)); } public void - promoteFollower(EventHandler handler) + execute(ThreadPoolWorkItem workItem) { - if(_sizeMax > 1) - { - synchronized(this) - { - if(_serialize && handler != null) - { - handler._serializing = true; - if(handler._registered) - { - _selector.remove(handler); - } - } - - assert(!_promote); - _promote = true; - notify(); - - if(!_destroyed) - { - assert(_inUse >= 0); - ++_inUse; - - if(_inUse == _sizeWarn) - { - String s = "thread pool `" + _prefix + "' is running low on threads\n" - + "Size=" + _size + ", " + "SizeMax=" + _sizeMax + ", " + "SizeWarn=" + _sizeWarn; - _instance.initializationData().logger.warning(s); - } - - assert(_inUse <= _running); - if(_inUse < _sizeMax && _inUse == _running) - { - if(_instance.traceLevels().threadPool >= 1) - { - String s = "growing " + _prefix + ": Size = " + (_running + 1); - _instance.initializationData().logger.trace(_instance.traceLevels().threadPoolCat, s); - } - - try - { - EventHandlerThread thread = new EventHandlerThread(_programNamePrefix + _prefix + "-" + - _threadIndex++); - _threads.add(thread); - if(_hasPriority) - { - thread.start(_priority); - } - else - { - thread.start(java.lang.Thread.NORM_PRIORITY); - } - ++_running; - } - catch(RuntimeException ex) - { - java.io.StringWriter sw = new java.io.StringWriter(); - java.io.PrintWriter pw = new java.io.PrintWriter(sw); - ex.printStackTrace(pw); - pw.flush(); - String s = "cannot create thread for `" + _prefix + "':\n" + sw.toString(); - _instance.initializationData().logger.error(s); - } - } - } - } - } + _workQueue.queue(workItem); } public void @@ -319,12 +272,13 @@ public final class ThreadPool // for(EventHandlerThread thread : _threads) { - thread.join(true); + thread.join(); } // // Destroy the selector // + _workQueue.close(); _selector.destroy(); } @@ -334,656 +288,308 @@ public final class ThreadPool return _prefix; } - // - // Each thread supplies a BasicStream, to avoid creating excessive - // garbage (Java only). - // - private boolean - run(BasicStream stream) + private void + run(EventHandlerThread thread) { - if(_sizeMax > 1) + ThreadPoolCurrent current = new ThreadPoolCurrent(_instance, this); + boolean select = false; + while(true) { - synchronized(this) + if(current._handler != null) { - while(!_promote) + try { - try - { - wait(); - } - catch(InterruptedException ex) - { - } + current._handler.message(current); + } + catch(DestroyedException ex) + { + return; + } + catch(java.lang.Exception ex) + { + String s = "exception in `" + _prefix + "':\n" + Ex.toString(ex); + s += "\nevent handler: " + current._handler.toString(); + _instance.initializationData().logger.error(s); } - - _promote = false; - } - - if(TRACE_THREAD) - { - trace("thread " + Thread.currentThread() + " has the lock"); - } - } - - while(true) - { - try - { - _selector.select(); - } - catch(java.io.IOException ex) - { - Ice.SocketException se = new Ice.SocketException(); - se.initCause(ex); - //throw se; - java.io.StringWriter sw = new java.io.StringWriter(); - java.io.PrintWriter pw = new java.io.PrintWriter(sw); - se.printStackTrace(pw); - pw.flush(); - String s = "exception in `" + _prefix + "':\n" + sw.toString(); - _instance.initializationData().logger.error(s); - continue; } - - EventHandler handler = null; - ThreadPoolWorkItem workItem = null; - boolean finished = false; - boolean shutdown = false; - - synchronized(this) + else if(select) { - if(_selector.checkTimeout()) + try { - assert(_timeout > 0); - shutdown = true; + _selector.select(_serverIdleTime); } - else if(_selector.isInterrupted()) + catch(Selector.TimeoutException ex) { - if(_selector.processInterrupt()) + synchronized(this) { + if(!_destroyed && _inUse == 0) + { + _workQueue.queue(new ShutdownWorkItem()); // Select timed-out. + } continue; } - - // - // There are three possiblities for an interrupt: - // - // 1. The thread pool has been destroyed. - // - // 2. An event handler is being finished. - // - // 3. A work item has been scheduled. - // - - if(!_finished.isEmpty()) + } + } + + synchronized(this) + { + if(current._handler == null) + { + if(select) { - _selector.clearInterrupt(); - handler = _finished.removeFirst(); - finished = true; + _selector.finishSelect(_handlers, _serverIdleTime); + _nextHandler = _handlers.iterator(); + select = false; } - else if(!_workItems.isEmpty()) + else if(!current._leader && followerWait(thread, current)) { - // - // Work items must be executed first even if the thread pool is destroyed. - // - _selector.clearInterrupt(); - workItem = _workItems.removeFirst(); + return; // Wait timed-out. } - else if(_destroyed) + } + else if(_sizeMax > 1) + { + if(!current._ioCompleted) { // - // Don't clear the interrupt if destroyed, so that the other threads exit as well. + // The handler didn't call ioCompleted() so we take care of decreasing + // the IO thread count now. // - return true; + --_inUseIO; + + if((current.operation & SocketOperation.Read) != 0 && current._handler.hasMoreData()) + { + _selector.hasMoreData(current._handler); + } } else { - assert(false); + // + // If the handler called ioCompleted(), we re-enable the handler in + // case it was disabled and we decrease the number of thread in use. + // + _selector.enable(current._handler, current.operation); + assert(_inUse > 0); + --_inUse; } - } - else - { - handler = (EventHandler)_selector.getNextSelected(); - if(handler == null) + + if(!current._leader && followerWait(thread, current)) { - continue; + return; // Wait timed-out. } } - } - - // - // Now we are outside the thread synchronization. - // - - if(shutdown) - { - // - // Initiate server shutdown. - // - ObjectAdapterFactory factory; - try - { - factory = _instance.objectAdapterFactory(); - } - catch(Ice.CommunicatorDestroyedException e) + else if(!current._ioCompleted && + (current.operation & SocketOperation.Read) != 0 && current._handler.hasMoreData()) { - continue; + _selector.hasMoreData(current._handler); } - promoteFollower(null); - factory.shutdown(); - // - // No "continue", because we want shutdown to be done in - // its own thread from this pool. Therefore we called - // promoteFollower(). + // Get the next ready handler. // - } - else if(workItem != null) - { - try + if(_nextHandler.hasNext()) { - workItem.execute(this); + current._ioCompleted = false; + current._handler = _nextHandler.next(); + current.operation = current._handler._ready; } - catch(Ice.LocalException ex) + else { - java.io.StringWriter sw = new java.io.StringWriter(); - java.io.PrintWriter pw = new java.io.PrintWriter(sw); - ex.printStackTrace(pw); - pw.flush(); - String s = "exception in `" + _prefix + "' while calling execute():\n" + sw.toString(); - _instance.initializationData().logger.error(s); + current._handler = null; } - // - // No "continue", because we want execute() to - // be called in its own thread from this - // pool. Note that this means that execute() - // must call promoteFollower(). - // - } - else - { - assert(handler != null); - - if(finished) + if(current._handler == null) { // - // Notify a handler about its removal from the thread pool. + // If there are no more ready handlers and there are still threads busy performing + // IO, we give up leadership and promote another follower (which will perform the + // select() only once all the IOs are completed). Otherwise, if there's no more + // threads peforming IOs, it's time to do another select(). // - try + if(_inUseIO > 0) { - handler.finished(this); + promoteFollower(current); } - catch(Ice.LocalException ex) + else { - java.io.StringWriter sw = new java.io.StringWriter(); - java.io.PrintWriter pw = new java.io.PrintWriter(sw); - ex.printStackTrace(pw); - pw.flush(); - String s = "exception in `" + _prefix + "' while calling finished():\n" + - sw.toString() + "\n" + handler.toString(); - _instance.initializationData().logger.error(s); + _selector.startSelect(); + select = true; } - - // - // No "continue", because we want finished() to be - // called in its own thread from this pool. Note - // that this means that finished() must call - // promoteFollower(). - // } - else + else if(_sizeMax > 1) { // - // If the handler is "readable", try to read a - // message. + // Increment the IO thread count and if there's still threads available + // to perform IO and more handlers ready, we promote a follower. // - try - { - if(handler.readable()) - { - try - { - if(!read(handler)) - { - continue; // Can't read without blocking. - } - - if(handler.hasMoreData()) - { - _selector.hasMoreData(handler); - } - } - catch(Ice.TimeoutException ex) - { - assert(false); // This shouldn't occur as we only perform non-blocking reads. - continue; - } - catch(Ice.DatagramLimitException ex) // Expected. - { - handler._stream.resize(0, true); - continue; - } - catch(Ice.SocketException ex) - { - if(TRACE_EXCEPTION) - { - trace("informing handler (" + handler.getClass().getName() + - ") about exception " + ex); - ex.printStackTrace(); - } - - handler.exception(ex); - continue; - } - catch(Ice.LocalException ex) - { - if(handler.datagram()) - { - if(_instance.initializationData().properties.getPropertyAsInt( - "Ice.Warn.Connections") > 0) - { - _instance.initializationData().logger.warning( - "datagram connection exception:\n" + ex + "\n" + handler.toString()); - } - handler._stream.resize(0, true); - } - else - { - if(TRACE_EXCEPTION) - { - trace("informing handler (" + handler.getClass().getName() + - ") about exception " + ex); - ex.printStackTrace(); - } - - handler.exception(ex); - } - continue; - } - - stream.swap(handler._stream); - assert(stream.pos() == stream.size()); - } - - // - // Provide a new message to the handler. - // - try - { - handler.message(stream, this); - } - catch(Ice.LocalException ex) - { - java.io.StringWriter sw = new java.io.StringWriter(); - java.io.PrintWriter pw = new java.io.PrintWriter(sw); - ex.printStackTrace(pw); - pw.flush(); - String s = "exception in `" + _prefix + "' while calling message():\n" + - sw.toString() + "\n" + handler.toString(); - _instance.initializationData().logger.error(s); - } - - // - // No "continue", because we want message() to - // be called in its own thread from this - // pool. Note that this means that message() - // must call promoteFollower(). - // - } - finally + ++_inUseIO; + if(_nextHandler.hasNext() && _inUseIO < _sizeIO) { - stream.reset(); + promoteFollower(current); } } } + } + } + + void + ioCompleted(ThreadPoolCurrent current) + { + current._ioCompleted = true; // Set the IO completed flag to specify that ioCompleted() has been called. - if(_sizeMax > 1) + if(_sizeMax > 1) + { + synchronized(this) { - synchronized(this) + --_inUseIO; + + if((current.operation & SocketOperation.Read) != 0 && current._handler.hasMoreData()) { - if(!_destroyed) - { - if(_serialize && handler != null && handler._serializing) - { - if(handler._registered) - { - _selector.add(handler, SocketStatus.NeedRead); - } - handler._serializing = false; - } + _selector.hasMoreData(current._handler); + } + if(_serialize && !_destroyed) + { + _selector.disable(current._handler, current.operation); + } + + if(current._leader) + { + // + // If this thread is still the leader, it's time to promote a new leader. + // + promoteFollower(current); + } + else if(_promote && (_nextHandler.hasNext() || _inUseIO == 0)) + { + notify(); + } - if(_size < _sizeMax) // Dynamic thread pool + assert(_inUse >= 0); + ++_inUse; + + if(_inUse == _sizeWarn) + { + String s = "thread pool `" + _prefix + "' is running low on threads\n" + + "Size=" + _size + ", " + "SizeMax=" + _sizeMax + ", " + "SizeWarn=" + _sizeWarn; + _instance.initializationData().logger.warning(s); + } + + if(!_destroyed) + { + assert(_inUse <= _threads.size()); + if(_inUse < _sizeMax && _inUse == _threads.size()) + { + if(_instance.traceLevels().threadPool >= 1) { - // - // First we reap threads that have been - // destroyed before. - // - int sz = _threads.size(); - assert(_running <= sz); - if(_running < sz) - { - java.util.Iterator<EventHandlerThread> i = _threads.iterator(); - while(i.hasNext()) - { - EventHandlerThread thread = i.next(); - - if(!thread.isAlive()) - { - if(thread.join(false)) - { - i.remove(); - } - } - } - } + String s = "growing " + _prefix + ": Size=" + (_threads.size() + 1); + _instance.initializationData().logger.trace(_instance.traceLevels().threadPoolCat, s); + } - // - // Now we check if this thread can be destroyed, based - // on a load factor. - // - - // - // The load factor jumps immediately to the number of - // threads that are currently in use, but decays - // exponentially if the number of threads in use is - // smaller than the load factor. This reflects that we - // create threads immediately when they are needed, - // but want the number of threads to slowly decline to - // the configured minimum. - // - double inUse = (double)_inUse; - if(_load < inUse) + try + { + EventHandlerThread thread = new EventHandlerThread(_threadPrefix + "-" + _threadIndex++); + _threads.add(thread); + if(_hasPriority) { - _load = inUse; + thread.start(_priority); } else { - final double loadFactor = 0.05; // TODO: Configurable? - final double oneMinusLoadFactor = 1 - loadFactor; - _load = _load * oneMinusLoadFactor + _inUse * loadFactor; - } - - if(_running > _size) - { - int load = (int)(_load + 0.5); - - // - // We add one to the load factor because one - // additional thread is needed for select(). - // - if(load + 1 < _running) - { - if(_instance.traceLevels().threadPool >= 1) - { - String s = "shrinking " + _prefix + ": Size = " + (_running - 1); - _instance.initializationData().logger.trace( - _instance.traceLevels().threadPoolCat, s); - } - - assert(_inUse > 0); - --_inUse; - - assert(_running > 0); - --_running; - - return false; - } + thread.start(java.lang.Thread.NORM_PRIORITY); } } - - assert(_inUse > 0); - --_inUse; - } - - // - // Do not wait to be promoted again to release these objects. - // - handler = null; - workItem = null; - - while(!_promote) - { - try - { - wait(); - } - catch(InterruptedException ex) + catch(RuntimeException ex) { + String s = "cannot create thread for `" + _prefix + "':\n" + Ex.toString(ex); + _instance.initializationData().logger.error(s); } } - - _promote = false; - } - - if(TRACE_THREAD) - { - trace("thread " + Thread.currentThread() + " has the lock"); } } } - } - - private boolean - read(EventHandler handler) - { - BasicStream stream = handler._stream; - - if(stream.pos() >= Protocol.headerSize) - { - if(!handler.read(stream)) - { - return false; - } - assert(stream.pos() == stream.size()); - return true; - } - - if(stream.size() == 0) + else if((current.operation & SocketOperation.Read) != 0 && current._handler.hasMoreData()) { - stream.resize(Protocol.headerSize, true); - stream.pos(0); - } - - if(stream.pos() != stream.size()) - { - if(!handler.read(stream)) + synchronized(this) { - return false; + _selector.hasMoreData(current._handler); } - assert(stream.pos() == stream.size()); - } - - int pos = stream.pos(); - if(pos < Protocol.headerSize) - { - // - // This situation is possible for small UDP packets. - // - throw new Ice.IllegalMessageSizeException(); - } - stream.pos(0); - byte[] m = new byte[4]; - m[0] = stream.readByte(); - m[1] = stream.readByte(); - m[2] = stream.readByte(); - m[3] = stream.readByte(); - if(m[0] != Protocol.magic[0] || m[1] != Protocol.magic[1] - || m[2] != Protocol.magic[2] || m[3] != Protocol.magic[3]) - { - Ice.BadMagicException ex = new Ice.BadMagicException(); - ex.badMagic = m; - throw ex; } - - byte pMajor = stream.readByte(); - byte pMinor = stream.readByte(); - if(pMajor != Protocol.protocolMajor || pMinor > Protocol.protocolMinor) - { - Ice.UnsupportedProtocolException e = new Ice.UnsupportedProtocolException(); - e.badMajor = pMajor < 0 ? pMajor + 255 : pMajor; - e.badMinor = pMinor < 0 ? pMinor + 255 : pMinor; - e.major = Protocol.protocolMajor; - e.minor = Protocol.protocolMinor; - throw e; - } - - byte eMajor = stream.readByte(); - byte eMinor = stream.readByte(); - if(eMajor != Protocol.encodingMajor || eMinor > Protocol.encodingMinor) - { - Ice.UnsupportedEncodingException e = new Ice.UnsupportedEncodingException(); - e.badMajor = eMajor < 0 ? eMajor + 255 : eMajor; - e.badMinor = eMinor < 0 ? eMinor + 255 : eMinor; - e.major = Protocol.encodingMajor; - e.minor = Protocol.encodingMinor; - throw e; - } - - stream.readByte(); // messageType - stream.readByte(); // compress - int size = stream.readInt(); - if(size < Protocol.headerSize) - { - throw new Ice.IllegalMessageSizeException(); - } - if(size > _instance.messageSizeMax()) - { - Ex.throwMemoryLimitException(size, _instance.messageSizeMax()); - } - if(size > stream.size()) - { - stream.resize(size, true); - } - stream.pos(pos); - - if(stream.pos() != stream.size()) + } + + private synchronized void + promoteFollower(ThreadPoolCurrent current) + { + assert(!_promote && current._leader); + _promote = true; + if(_inUseIO < _sizeIO && (_nextHandler.hasNext() || _inUseIO == 0)) { - if(handler.datagram()) - { - if(_warnUdp) - { - _instance.initializationData().logger.warning("DatagramLimitException: maximum size of " - + stream.pos() + " exceeded"); - } - throw new Ice.DatagramLimitException(); - } - else - { - if(!handler.read(stream)) - { - return false; - } - assert(stream.pos() == stream.size()); - } + notify(); } - - return true; + current._leader = false; } -/* - * Commented out because it is unused. - * - private void - selectNonBlocking() + private synchronized boolean + followerWait(EventHandlerThread thread, ThreadPoolCurrent current) { - while(true) + assert(!current._leader); + + // + // It's important to clear the handler before waiting to make sure that + // resources for the handler are released now if it's finished. We also + // clear the per-thread stream. + // + current._handler = null; + current.stream.reset(); + + // + // Wait to be promoted and for all the IO threads to be done. + // + while(!_promote || _inUseIO == _sizeIO || _nextHandler.hasNext() && _inUseIO > 0) { - try + while(true) { - if(TRACE_SELECT) - { - trace("non-blocking select on " + _selector.keys().size() + " keys, thread id = " + - Thread.currentThread()); - } - - _selector.selectNow(); - - if(TRACE_SELECT) + try { - if(_keys.size() > 0) + if(_threadIdleTime > 0) { - trace("after selectNow, there are " + _keys.size() + " selected keys:"); - for(java.nio.channels.SelectionKey key : _keys) + long before = IceInternal.Time.currentMonotonicTimeMillis(); + wait(_threadIdleTime * 1000); + if(IceInternal.Time.currentMonotonicTimeMillis() - before >= _threadIdleTime * 1000) { - trace(" " + keyToString(key)); + if(_instance.traceLevels().threadPool >= 1) + { + String s = "shrinking " + _prefix + ": Size=" + (_threads.size() - 1); + _instance.initializationData().logger.trace(_instance.traceLevels().threadPoolCat, s); + } + assert(_threads.size() > 1); // Can only be called by a waiting follower thread. + _threads.remove(thread); + _workQueue.queue(new JoinThreadWorkItem(thread)); + return true; } } - } + else + { + wait(); + } - break; - } - catch(java.io.InterruptedIOException ex) - { - continue; - } - catch(java.io.IOException ex) - { - // - // Pressing Ctrl-C causes select() to raise an - // IOException, which seems like a JDK bug. We trap - // for that special case here and ignore it. - // Hopefully we're not masking something important! - // - if(ex.getMessage().indexOf("Interrupted system call") != -1) + break; + } + catch(InterruptedException ex) { - continue; } - - Ice.SocketException se = new Ice.SocketException(); - se.initCause(ex); - //throw se; - java.io.StringWriter sw = new java.io.StringWriter(); - java.io.PrintWriter pw = new java.io.PrintWriter(sw); - se.printStackTrace(pw); - pw.flush(); - String s = "exception in `" + _prefix + "':\n" + sw.toString(); - _instance.initializationData().logger.error(s); - continue; } } - } -*/ - - private void - trace(String msg) - { - System.err.println(_prefix + ": " + msg); - } - - private String - keyToString(java.nio.channels.SelectionKey key) - { - String ops = "["; - if(key.isAcceptable()) - { - ops += " OP_ACCEPT"; - } - if(key.isReadable()) - { - ops += " OP_READ"; - } - if(key.isConnectable()) - { - ops += " OP_CONNECT"; - } - if(key.isWritable()) - { - ops += " OP_WRITE"; - } - ops += " ]"; - return key.channel() + " " + ops; + current._leader = true; // The current thread has become the leader. + _promote = false; + return false; } - private Instance _instance; + private final Instance _instance; + private final ThreadPoolWorkQueue _workQueue; private boolean _destroyed; private final String _prefix; - private final String _programNamePrefix; + private final String _threadPrefix; private final Selector _selector; - private java.util.LinkedList<ThreadPoolWorkItem> _workItems = new java.util.LinkedList<ThreadPoolWorkItem>(); - private java.util.LinkedList<EventHandler> _finished = new java.util.LinkedList<EventHandler>(); - private int _timeout; private final class EventHandlerThread implements Runnable { @@ -992,28 +598,18 @@ public final class ThreadPool _name = name; } - public boolean - isAlive() - { - return _thread.isAlive(); - } - - public boolean - join(boolean wait) + public void + join() { while(true) { try { _thread.join(); - return true; + break; } catch(InterruptedException ex) { - if(!wait) - { - return false; - } } } } @@ -1031,73 +627,65 @@ public final class ThreadPool { if(_instance.initializationData().threadHook != null) { - _instance.initializationData().threadHook.start(); + try + { + _instance.initializationData().threadHook.start(); + } + catch(java.lang.Exception ex) + { + String s = "thread hook start() method raised an unexpected exception in `"; + s += _prefix + "' thread " + _name + ":\n" + Ex.toString(ex); + _instance.initializationData().logger.error(s); + } } - BasicStream stream = new BasicStream(_instance); - - boolean promote; - try { - promote = ThreadPool.this.run(stream); + ThreadPool.this.run(this); } catch(java.lang.Exception ex) { - java.io.StringWriter sw = new java.io.StringWriter(); - java.io.PrintWriter pw = new java.io.PrintWriter(sw); - ex.printStackTrace(pw); - pw.flush(); - String s = "exception in `" + _prefix + "' thread " + _name + ":\n" + sw.toString(); + String s = "exception in `" + _prefix + "' thread " + _name + ":\n" + Ex.toString(ex); _instance.initializationData().logger.error(s); - promote = true; } - if(promote && _sizeMax > 1) + if(_instance.initializationData().threadHook != null) { - // - // Promote a follower, but w/o modifying _inUse or - // creating new threads. - // - synchronized(ThreadPool.this) + try { - assert(!_promote); - _promote = true; - ThreadPool.this.notify(); + _instance.initializationData().threadHook.stop(); + } + catch(java.lang.Exception ex) + { + String s = "thread hook stop() method raised an unexpected exception in `"; + s += _prefix + "' thread " + _name + ":\n" + Ex.toString(ex); + _instance.initializationData().logger.error(s); } - } - - if(TRACE_THREAD) - { - trace("run() terminated"); - } - - if(_instance.initializationData().threadHook != null) - { - _instance.initializationData().threadHook.stop(); } } - private String _name; + final private String _name; private Thread _thread; } private final int _size; // Number of threads that are pre-created. + private final int _sizeIO; // Number of threads that can concurrently perform IO. private final int _sizeMax; // Maximum number of threads. private final int _sizeWarn; // If _inUse reaches _sizeWarn, a "low on threads" warning will be printed. private final boolean _serialize; // True if requests need to be serialized over the connection. - + private final int _priority; + private final boolean _hasPriority; + private final long _serverIdleTime; + private final long _threadIdleTime; private final int _stackSize; - private java.util.List<EventHandlerThread> _threads; // All threads, running or not. + private java.util.List<EventHandlerThread> _threads = new java.util.ArrayList<EventHandlerThread>(); private int _threadIndex; // For assigning thread names. - private int _running; // Number of running threads. private int _inUse; // Number of threads that are currently in use. - private double _load; // Current load in number of threads. + private int _inUseIO; // Number of threads that are currently performing IO. - private boolean _promote; + private java.util.List<EventHandler> _handlers = new java.util.ArrayList<EventHandler>(); + private java.util.Iterator<EventHandler> _nextHandler; - private final boolean _warnUdp; - private int _priority; - private boolean _hasPriority = false; + private boolean _promote; } |