summaryrefslogtreecommitdiff
path: root/java/src/IceInternal/ThreadPool.java
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2009-08-21 15:55:01 +0200
committerBenoit Foucher <benoit@zeroc.com>2009-08-21 15:55:01 +0200
commitb9f2fa14fb3f222a6ec5e0a93bf25fe5ad12b56a (patch)
tree183215e2dbeadfbc871b800ce09726e58af38b91 /java/src/IceInternal/ThreadPool.java
parentadding compression cookbook demo (diff)
downloadice-b9f2fa14fb3f222a6ec5e0a93bf25fe5ad12b56a.tar.bz2
ice-b9f2fa14fb3f222a6ec5e0a93bf25fe5ad12b56a.tar.xz
ice-b9f2fa14fb3f222a6ec5e0a93bf25fe5ad12b56a.zip
IOCP changes, bug 3501, 4200, 4156, 3101
Diffstat (limited to 'java/src/IceInternal/ThreadPool.java')
-rw-r--r--java/src/IceInternal/ThreadPool.java1128
1 files changed, 358 insertions, 770 deletions
diff --git a/java/src/IceInternal/ThreadPool.java b/java/src/IceInternal/ThreadPool.java
index 917c85182de..4494eb9396f 100644
--- a/java/src/IceInternal/ThreadPool.java
+++ b/java/src/IceInternal/ThreadPool.java
@@ -11,13 +11,62 @@ package IceInternal;
public final class ThreadPool
{
- private final static boolean TRACE_REGISTRATION = false;
- private final static boolean TRACE_INTERRUPT = false;
- private final static boolean TRACE_SHUTDOWN = false;
- private final static boolean TRACE_SELECT = false;
- private final static boolean TRACE_EXCEPTION = false;
- private final static boolean TRACE_THREAD = false;
- private final static boolean TRACE_STACK_TRACE = false;
+ final class ShutdownWorkItem implements ThreadPoolWorkItem
+ {
+ public void execute(ThreadPoolCurrent current)
+ {
+ current.ioCompleted();
+ try
+ {
+ _instance.objectAdapterFactory().shutdown();
+ }
+ catch(Ice.CommunicatorDestroyedException ex)
+ {
+ }
+ }
+ }
+
+ static final class FinishedWorkItem implements ThreadPoolWorkItem
+ {
+ public
+ FinishedWorkItem(EventHandler handler)
+ {
+ _handler = handler;
+ }
+
+ public void execute(ThreadPoolCurrent current)
+ {
+ _handler.finished(current);
+ }
+
+ private final EventHandler _handler;
+ }
+
+ static final class JoinThreadWorkItem implements ThreadPoolWorkItem
+ {
+ public
+ JoinThreadWorkItem(EventHandlerThread thread)
+ {
+ _thread = thread;
+ }
+
+ public void execute(ThreadPoolCurrent current)
+ {
+ // No call to ioCompleted, this shouldn't block (and we don't want to cause
+ // a new thread to be started).
+ _thread.join();
+ }
+
+ private final EventHandlerThread _thread;
+ }
+
+ //
+ // Exception raised by the thread pool work queue when the thread pool
+ // is destroyed.
+ //
+ static final class DestroyedException extends RuntimeException
+ {
+ }
public
ThreadPool(Instance instance, String prefix, int timeout)
@@ -25,32 +74,34 @@ public final class ThreadPool
_instance = instance;
_destroyed = false;
_prefix = prefix;
- _timeout = timeout;
- _selector = new Selector(instance, timeout);
+ _selector = new Selector(instance);
_threadIndex = 0;
- _running = 0;
_inUse = 0;
- _load = 1.0;
+ _inUseIO = 0;
_promote = true;
_serialize = _instance.initializationData().properties.getPropertyAsInt(_prefix + ".Serialize") > 0;
- _warnUdp = _instance.initializationData().properties.getPropertyAsInt("Ice.Warn.Datagrams") > 0;
+ _serverIdleTime = timeout;
+
+ Ice.Properties properties = _instance.initializationData().properties;
- String programName = _instance.initializationData().properties.getProperty("Ice.ProgramName");
+ String programName = properties.getProperty("Ice.ProgramName");
if(programName.length() > 0)
{
- _programNamePrefix = programName + "-";
+ _threadPrefix = programName + "-" + _prefix;
}
else
{
- _programNamePrefix = "";
+ _threadPrefix = _prefix;
}
+ int nProcessors = Runtime.getRuntime().availableProcessors();
+
//
// We use just one thread as the default. This is the fastest
// possible setting, still allows one level of nesting, and
// doesn't require to make the servants thread safe.
//
- int size = _instance.initializationData().properties.getPropertyAsIntWithDefault(_prefix + ".Size", 1);
+ int size = properties.getPropertyAsIntWithDefault(_prefix + ".Size", 1);
if(size < 1)
{
String s = _prefix + ".Size < 1; Size adjusted to 1";
@@ -58,8 +109,11 @@ public final class ThreadPool
size = 1;
}
- int sizeMax =
- _instance.initializationData().properties.getPropertyAsIntWithDefault(_prefix + ".SizeMax", size);
+ int sizeMax = properties.getPropertyAsIntWithDefault(_prefix + ".SizeMax", size);
+ if(sizeMax == -1)
+ {
+ sizeMax = nProcessors;
+ }
if(sizeMax < size)
{
String s = _prefix + ".SizeMax < " + _prefix + ".Size; SizeMax adjusted to Size (" + size + ")";
@@ -67,7 +121,7 @@ public final class ThreadPool
sizeMax = size;
}
- int sizeWarn = _instance.initializationData().properties.getPropertyAsInt( _prefix + ".SizeWarn");
+ int sizeWarn = properties.getPropertyAsIntWithDefault(_prefix + ".SizeWarn", sizeMax * 80 / 100);
if(sizeWarn != 0 && sizeWarn < size)
{
String s = _prefix + ".SizeWarn < " + _prefix + ".Size; adjusted SizeWarn to Size (" + size + ")";
@@ -81,28 +135,43 @@ public final class ThreadPool
sizeWarn = sizeMax;
}
+ int threadIdleTime = properties.getPropertyAsIntWithDefault(_prefix + ".ThreadIdleTime", 60);
+ if(threadIdleTime < 0)
+ {
+ String s = _prefix + ".ThreadIdleTime < 0; ThreadIdleTime adjusted to 0";
+ _instance.initializationData().logger.warning(s);
+ threadIdleTime = 0;
+ }
+
_size = size;
_sizeMax = sizeMax;
_sizeWarn = sizeWarn;
+ _sizeIO = Math.min(sizeMax, nProcessors);
+ _threadIdleTime = threadIdleTime;
- int stackSize = _instance.initializationData().properties.getPropertyAsInt( _prefix + ".StackSize");
+ int stackSize = properties.getPropertyAsInt( _prefix + ".StackSize");
if(stackSize < 0)
{
String s = _prefix + ".StackSize < 0; Size adjusted to JRE default";
_instance.initializationData().logger.warning(s);
stackSize = 0;
}
-
_stackSize = stackSize;
- _hasPriority = _instance.initializationData().properties.getProperty(_prefix + ".ThreadPriority") != "";
- _priority = Util.getThreadPriorityProperty(_instance.initializationData().properties, _prefix);
- if(!_hasPriority)
+ boolean hasPriority = properties.getProperty(_prefix + ".ThreadPriority") != "";
+ int priority = properties.getPropertyAsInt(_prefix + ".ThreadPriority");
+ if(!hasPriority)
{
- _hasPriority = _instance.initializationData().properties.getProperty("Ice.ThreadPriority") != "";
- _priority = Util.getThreadPriorityProperty(_instance.initializationData().properties, "Ice");
+ hasPriority = properties.getProperty("Ice.ThreadPriority") != "";
+ priority = properties.getPropertyAsInt("Ice.ThreadPriority");
}
+ _hasPriority = hasPriority;
+ _priority = priority;
+
+ _workQueue = new ThreadPoolWorkQueue(this, _instance, _selector);
+ _nextHandler = _handlers.iterator();
+
if(_instance.traceLevels().threadPool >= 1)
{
String s = "creating " + _prefix + ": Size = " + _size + ", SizeMax = " + _sizeMax + ", SizeWarn = " +
@@ -112,11 +181,9 @@ public final class ThreadPool
try
{
- _threads = new java.util.ArrayList<EventHandlerThread>();
for(int i = 0; i < _size; i++)
{
- EventHandlerThread thread = new EventHandlerThread(_programNamePrefix + _prefix + "-" +
- _threadIndex++);
+ EventHandlerThread thread = new EventHandlerThread(_threadPrefix + "-" + _threadIndex++);
_threads.add(thread);
if(_hasPriority)
{
@@ -126,16 +193,11 @@ public final class ThreadPool
{
thread.start(java.lang.Thread.NORM_PRIORITY);
}
- ++_running;
}
}
catch(RuntimeException ex)
{
- java.io.StringWriter sw = new java.io.StringWriter();
- java.io.PrintWriter pw = new java.io.PrintWriter(sw);
- ex.printStackTrace(pw);
- pw.flush();
- String s = "cannot create thread for `" + _prefix + "':\n" + sw.toString();
+ String s = "cannot create thread for `" + _prefix + "':\n" + Ex.toString(ex);
_instance.initializationData().logger.error(s);
destroy();
@@ -154,158 +216,49 @@ public final class ThreadPool
public synchronized void
destroy()
{
- if(TRACE_SHUTDOWN)
- {
- trace("destroy");
- }
-
assert(!_destroyed);
_destroyed = true;
- _selector.setInterrupt();
+ _workQueue.destroy();
}
public synchronized void
- _register(EventHandler handler)
+ initialize(EventHandler handler)
{
assert(!_destroyed);
+ _selector.initialize(handler);
+ }
- if(!handler._registered)
- {
- if(TRACE_REGISTRATION)
- {
- trace("adding handler of type " + handler.getClass().getName() + " for channel " + handler.fd());
- }
-
- if(!handler._serializing)
- {
- _selector.add(handler, SocketStatus.NeedRead);
- }
- handler._registered = true;
- }
+ public void
+ register(EventHandler handler, int op)
+ {
+ update(handler, SocketOperation.None, op);
}
public synchronized void
- unregister(EventHandler handler)
+ update(EventHandler handler, int remove, int add)
{
assert(!_destroyed);
- if(handler._registered)
- {
- if(TRACE_REGISTRATION)
- {
- trace("removing handler for channel " + handler.fd());
- }
+ _selector.update(handler, remove, add);
+ }
- if(!handler._serializing)
- {
- _selector.remove(handler);
- }
- handler._registered = false;
- }
+ public void
+ unregister(EventHandler handler, int op)
+ {
+ update(handler, op, SocketOperation.None);
}
public synchronized void
finish(EventHandler handler)
{
assert(!_destroyed);
-
- if(TRACE_REGISTRATION)
- {
- trace("finishing handler for channel " + handler.fd());
- }
-
- if(handler._registered)
- {
- if(!handler._serializing)
- {
- _selector.remove(handler);
- }
- handler._registered = false;
- }
-
- _finished.add(handler);
- _selector.setInterrupt();
- }
-
- public synchronized void
- execute(ThreadPoolWorkItem workItem)
- {
- if(_destroyed)
- {
- throw new Ice.CommunicatorDestroyedException();
- }
- _workItems.add(workItem);
- _selector.setInterrupt();
+ _selector.finish(handler);
+ _workQueue.queue(new FinishedWorkItem(handler));
}
public void
- promoteFollower(EventHandler handler)
+ execute(ThreadPoolWorkItem workItem)
{
- if(_sizeMax > 1)
- {
- synchronized(this)
- {
- if(_serialize && handler != null)
- {
- handler._serializing = true;
- if(handler._registered)
- {
- _selector.remove(handler);
- }
- }
-
- assert(!_promote);
- _promote = true;
- notify();
-
- if(!_destroyed)
- {
- assert(_inUse >= 0);
- ++_inUse;
-
- if(_inUse == _sizeWarn)
- {
- String s = "thread pool `" + _prefix + "' is running low on threads\n"
- + "Size=" + _size + ", " + "SizeMax=" + _sizeMax + ", " + "SizeWarn=" + _sizeWarn;
- _instance.initializationData().logger.warning(s);
- }
-
- assert(_inUse <= _running);
- if(_inUse < _sizeMax && _inUse == _running)
- {
- if(_instance.traceLevels().threadPool >= 1)
- {
- String s = "growing " + _prefix + ": Size = " + (_running + 1);
- _instance.initializationData().logger.trace(_instance.traceLevels().threadPoolCat, s);
- }
-
- try
- {
- EventHandlerThread thread = new EventHandlerThread(_programNamePrefix + _prefix + "-" +
- _threadIndex++);
- _threads.add(thread);
- if(_hasPriority)
- {
- thread.start(_priority);
- }
- else
- {
- thread.start(java.lang.Thread.NORM_PRIORITY);
- }
- ++_running;
- }
- catch(RuntimeException ex)
- {
- java.io.StringWriter sw = new java.io.StringWriter();
- java.io.PrintWriter pw = new java.io.PrintWriter(sw);
- ex.printStackTrace(pw);
- pw.flush();
- String s = "cannot create thread for `" + _prefix + "':\n" + sw.toString();
- _instance.initializationData().logger.error(s);
- }
- }
- }
- }
- }
+ _workQueue.queue(workItem);
}
public void
@@ -319,12 +272,13 @@ public final class ThreadPool
//
for(EventHandlerThread thread : _threads)
{
- thread.join(true);
+ thread.join();
}
//
// Destroy the selector
//
+ _workQueue.close();
_selector.destroy();
}
@@ -334,656 +288,308 @@ public final class ThreadPool
return _prefix;
}
- //
- // Each thread supplies a BasicStream, to avoid creating excessive
- // garbage (Java only).
- //
- private boolean
- run(BasicStream stream)
+ private void
+ run(EventHandlerThread thread)
{
- if(_sizeMax > 1)
+ ThreadPoolCurrent current = new ThreadPoolCurrent(_instance, this);
+ boolean select = false;
+ while(true)
{
- synchronized(this)
+ if(current._handler != null)
{
- while(!_promote)
+ try
{
- try
- {
- wait();
- }
- catch(InterruptedException ex)
- {
- }
+ current._handler.message(current);
+ }
+ catch(DestroyedException ex)
+ {
+ return;
+ }
+ catch(java.lang.Exception ex)
+ {
+ String s = "exception in `" + _prefix + "':\n" + Ex.toString(ex);
+ s += "\nevent handler: " + current._handler.toString();
+ _instance.initializationData().logger.error(s);
}
-
- _promote = false;
- }
-
- if(TRACE_THREAD)
- {
- trace("thread " + Thread.currentThread() + " has the lock");
- }
- }
-
- while(true)
- {
- try
- {
- _selector.select();
- }
- catch(java.io.IOException ex)
- {
- Ice.SocketException se = new Ice.SocketException();
- se.initCause(ex);
- //throw se;
- java.io.StringWriter sw = new java.io.StringWriter();
- java.io.PrintWriter pw = new java.io.PrintWriter(sw);
- se.printStackTrace(pw);
- pw.flush();
- String s = "exception in `" + _prefix + "':\n" + sw.toString();
- _instance.initializationData().logger.error(s);
- continue;
}
-
- EventHandler handler = null;
- ThreadPoolWorkItem workItem = null;
- boolean finished = false;
- boolean shutdown = false;
-
- synchronized(this)
+ else if(select)
{
- if(_selector.checkTimeout())
+ try
{
- assert(_timeout > 0);
- shutdown = true;
+ _selector.select(_serverIdleTime);
}
- else if(_selector.isInterrupted())
+ catch(Selector.TimeoutException ex)
{
- if(_selector.processInterrupt())
+ synchronized(this)
{
+ if(!_destroyed && _inUse == 0)
+ {
+ _workQueue.queue(new ShutdownWorkItem()); // Select timed-out.
+ }
continue;
}
-
- //
- // There are three possiblities for an interrupt:
- //
- // 1. The thread pool has been destroyed.
- //
- // 2. An event handler is being finished.
- //
- // 3. A work item has been scheduled.
- //
-
- if(!_finished.isEmpty())
+ }
+ }
+
+ synchronized(this)
+ {
+ if(current._handler == null)
+ {
+ if(select)
{
- _selector.clearInterrupt();
- handler = _finished.removeFirst();
- finished = true;
+ _selector.finishSelect(_handlers, _serverIdleTime);
+ _nextHandler = _handlers.iterator();
+ select = false;
}
- else if(!_workItems.isEmpty())
+ else if(!current._leader && followerWait(thread, current))
{
- //
- // Work items must be executed first even if the thread pool is destroyed.
- //
- _selector.clearInterrupt();
- workItem = _workItems.removeFirst();
+ return; // Wait timed-out.
}
- else if(_destroyed)
+ }
+ else if(_sizeMax > 1)
+ {
+ if(!current._ioCompleted)
{
//
- // Don't clear the interrupt if destroyed, so that the other threads exit as well.
+ // The handler didn't call ioCompleted() so we take care of decreasing
+ // the IO thread count now.
//
- return true;
+ --_inUseIO;
+
+ if((current.operation & SocketOperation.Read) != 0 && current._handler.hasMoreData())
+ {
+ _selector.hasMoreData(current._handler);
+ }
}
else
{
- assert(false);
+ //
+ // If the handler called ioCompleted(), we re-enable the handler in
+ // case it was disabled and we decrease the number of thread in use.
+ //
+ _selector.enable(current._handler, current.operation);
+ assert(_inUse > 0);
+ --_inUse;
}
- }
- else
- {
- handler = (EventHandler)_selector.getNextSelected();
- if(handler == null)
+
+ if(!current._leader && followerWait(thread, current))
{
- continue;
+ return; // Wait timed-out.
}
}
- }
-
- //
- // Now we are outside the thread synchronization.
- //
-
- if(shutdown)
- {
- //
- // Initiate server shutdown.
- //
- ObjectAdapterFactory factory;
- try
- {
- factory = _instance.objectAdapterFactory();
- }
- catch(Ice.CommunicatorDestroyedException e)
+ else if(!current._ioCompleted &&
+ (current.operation & SocketOperation.Read) != 0 && current._handler.hasMoreData())
{
- continue;
+ _selector.hasMoreData(current._handler);
}
- promoteFollower(null);
- factory.shutdown();
-
//
- // No "continue", because we want shutdown to be done in
- // its own thread from this pool. Therefore we called
- // promoteFollower().
+ // Get the next ready handler.
//
- }
- else if(workItem != null)
- {
- try
+ if(_nextHandler.hasNext())
{
- workItem.execute(this);
+ current._ioCompleted = false;
+ current._handler = _nextHandler.next();
+ current.operation = current._handler._ready;
}
- catch(Ice.LocalException ex)
+ else
{
- java.io.StringWriter sw = new java.io.StringWriter();
- java.io.PrintWriter pw = new java.io.PrintWriter(sw);
- ex.printStackTrace(pw);
- pw.flush();
- String s = "exception in `" + _prefix + "' while calling execute():\n" + sw.toString();
- _instance.initializationData().logger.error(s);
+ current._handler = null;
}
- //
- // No "continue", because we want execute() to
- // be called in its own thread from this
- // pool. Note that this means that execute()
- // must call promoteFollower().
- //
- }
- else
- {
- assert(handler != null);
-
- if(finished)
+ if(current._handler == null)
{
//
- // Notify a handler about its removal from the thread pool.
+ // If there are no more ready handlers and there are still threads busy performing
+ // IO, we give up leadership and promote another follower (which will perform the
+ // select() only once all the IOs are completed). Otherwise, if there's no more
+ // threads peforming IOs, it's time to do another select().
//
- try
+ if(_inUseIO > 0)
{
- handler.finished(this);
+ promoteFollower(current);
}
- catch(Ice.LocalException ex)
+ else
{
- java.io.StringWriter sw = new java.io.StringWriter();
- java.io.PrintWriter pw = new java.io.PrintWriter(sw);
- ex.printStackTrace(pw);
- pw.flush();
- String s = "exception in `" + _prefix + "' while calling finished():\n" +
- sw.toString() + "\n" + handler.toString();
- _instance.initializationData().logger.error(s);
+ _selector.startSelect();
+ select = true;
}
-
- //
- // No "continue", because we want finished() to be
- // called in its own thread from this pool. Note
- // that this means that finished() must call
- // promoteFollower().
- //
}
- else
+ else if(_sizeMax > 1)
{
//
- // If the handler is "readable", try to read a
- // message.
+ // Increment the IO thread count and if there's still threads available
+ // to perform IO and more handlers ready, we promote a follower.
//
- try
- {
- if(handler.readable())
- {
- try
- {
- if(!read(handler))
- {
- continue; // Can't read without blocking.
- }
-
- if(handler.hasMoreData())
- {
- _selector.hasMoreData(handler);
- }
- }
- catch(Ice.TimeoutException ex)
- {
- assert(false); // This shouldn't occur as we only perform non-blocking reads.
- continue;
- }
- catch(Ice.DatagramLimitException ex) // Expected.
- {
- handler._stream.resize(0, true);
- continue;
- }
- catch(Ice.SocketException ex)
- {
- if(TRACE_EXCEPTION)
- {
- trace("informing handler (" + handler.getClass().getName() +
- ") about exception " + ex);
- ex.printStackTrace();
- }
-
- handler.exception(ex);
- continue;
- }
- catch(Ice.LocalException ex)
- {
- if(handler.datagram())
- {
- if(_instance.initializationData().properties.getPropertyAsInt(
- "Ice.Warn.Connections") > 0)
- {
- _instance.initializationData().logger.warning(
- "datagram connection exception:\n" + ex + "\n" + handler.toString());
- }
- handler._stream.resize(0, true);
- }
- else
- {
- if(TRACE_EXCEPTION)
- {
- trace("informing handler (" + handler.getClass().getName() +
- ") about exception " + ex);
- ex.printStackTrace();
- }
-
- handler.exception(ex);
- }
- continue;
- }
-
- stream.swap(handler._stream);
- assert(stream.pos() == stream.size());
- }
-
- //
- // Provide a new message to the handler.
- //
- try
- {
- handler.message(stream, this);
- }
- catch(Ice.LocalException ex)
- {
- java.io.StringWriter sw = new java.io.StringWriter();
- java.io.PrintWriter pw = new java.io.PrintWriter(sw);
- ex.printStackTrace(pw);
- pw.flush();
- String s = "exception in `" + _prefix + "' while calling message():\n" +
- sw.toString() + "\n" + handler.toString();
- _instance.initializationData().logger.error(s);
- }
-
- //
- // No "continue", because we want message() to
- // be called in its own thread from this
- // pool. Note that this means that message()
- // must call promoteFollower().
- //
- }
- finally
+ ++_inUseIO;
+ if(_nextHandler.hasNext() && _inUseIO < _sizeIO)
{
- stream.reset();
+ promoteFollower(current);
}
}
}
+ }
+ }
+
+ void
+ ioCompleted(ThreadPoolCurrent current)
+ {
+ current._ioCompleted = true; // Set the IO completed flag to specify that ioCompleted() has been called.
- if(_sizeMax > 1)
+ if(_sizeMax > 1)
+ {
+ synchronized(this)
{
- synchronized(this)
+ --_inUseIO;
+
+ if((current.operation & SocketOperation.Read) != 0 && current._handler.hasMoreData())
{
- if(!_destroyed)
- {
- if(_serialize && handler != null && handler._serializing)
- {
- if(handler._registered)
- {
- _selector.add(handler, SocketStatus.NeedRead);
- }
- handler._serializing = false;
- }
+ _selector.hasMoreData(current._handler);
+ }
+ if(_serialize && !_destroyed)
+ {
+ _selector.disable(current._handler, current.operation);
+ }
+
+ if(current._leader)
+ {
+ //
+ // If this thread is still the leader, it's time to promote a new leader.
+ //
+ promoteFollower(current);
+ }
+ else if(_promote && (_nextHandler.hasNext() || _inUseIO == 0))
+ {
+ notify();
+ }
- if(_size < _sizeMax) // Dynamic thread pool
+ assert(_inUse >= 0);
+ ++_inUse;
+
+ if(_inUse == _sizeWarn)
+ {
+ String s = "thread pool `" + _prefix + "' is running low on threads\n"
+ + "Size=" + _size + ", " + "SizeMax=" + _sizeMax + ", " + "SizeWarn=" + _sizeWarn;
+ _instance.initializationData().logger.warning(s);
+ }
+
+ if(!_destroyed)
+ {
+ assert(_inUse <= _threads.size());
+ if(_inUse < _sizeMax && _inUse == _threads.size())
+ {
+ if(_instance.traceLevels().threadPool >= 1)
{
- //
- // First we reap threads that have been
- // destroyed before.
- //
- int sz = _threads.size();
- assert(_running <= sz);
- if(_running < sz)
- {
- java.util.Iterator<EventHandlerThread> i = _threads.iterator();
- while(i.hasNext())
- {
- EventHandlerThread thread = i.next();
-
- if(!thread.isAlive())
- {
- if(thread.join(false))
- {
- i.remove();
- }
- }
- }
- }
+ String s = "growing " + _prefix + ": Size=" + (_threads.size() + 1);
+ _instance.initializationData().logger.trace(_instance.traceLevels().threadPoolCat, s);
+ }
- //
- // Now we check if this thread can be destroyed, based
- // on a load factor.
- //
-
- //
- // The load factor jumps immediately to the number of
- // threads that are currently in use, but decays
- // exponentially if the number of threads in use is
- // smaller than the load factor. This reflects that we
- // create threads immediately when they are needed,
- // but want the number of threads to slowly decline to
- // the configured minimum.
- //
- double inUse = (double)_inUse;
- if(_load < inUse)
+ try
+ {
+ EventHandlerThread thread = new EventHandlerThread(_threadPrefix + "-" + _threadIndex++);
+ _threads.add(thread);
+ if(_hasPriority)
{
- _load = inUse;
+ thread.start(_priority);
}
else
{
- final double loadFactor = 0.05; // TODO: Configurable?
- final double oneMinusLoadFactor = 1 - loadFactor;
- _load = _load * oneMinusLoadFactor + _inUse * loadFactor;
- }
-
- if(_running > _size)
- {
- int load = (int)(_load + 0.5);
-
- //
- // We add one to the load factor because one
- // additional thread is needed for select().
- //
- if(load + 1 < _running)
- {
- if(_instance.traceLevels().threadPool >= 1)
- {
- String s = "shrinking " + _prefix + ": Size = " + (_running - 1);
- _instance.initializationData().logger.trace(
- _instance.traceLevels().threadPoolCat, s);
- }
-
- assert(_inUse > 0);
- --_inUse;
-
- assert(_running > 0);
- --_running;
-
- return false;
- }
+ thread.start(java.lang.Thread.NORM_PRIORITY);
}
}
-
- assert(_inUse > 0);
- --_inUse;
- }
-
- //
- // Do not wait to be promoted again to release these objects.
- //
- handler = null;
- workItem = null;
-
- while(!_promote)
- {
- try
- {
- wait();
- }
- catch(InterruptedException ex)
+ catch(RuntimeException ex)
{
+ String s = "cannot create thread for `" + _prefix + "':\n" + Ex.toString(ex);
+ _instance.initializationData().logger.error(s);
}
}
-
- _promote = false;
- }
-
- if(TRACE_THREAD)
- {
- trace("thread " + Thread.currentThread() + " has the lock");
}
}
}
- }
-
- private boolean
- read(EventHandler handler)
- {
- BasicStream stream = handler._stream;
-
- if(stream.pos() >= Protocol.headerSize)
- {
- if(!handler.read(stream))
- {
- return false;
- }
- assert(stream.pos() == stream.size());
- return true;
- }
-
- if(stream.size() == 0)
+ else if((current.operation & SocketOperation.Read) != 0 && current._handler.hasMoreData())
{
- stream.resize(Protocol.headerSize, true);
- stream.pos(0);
- }
-
- if(stream.pos() != stream.size())
- {
- if(!handler.read(stream))
+ synchronized(this)
{
- return false;
+ _selector.hasMoreData(current._handler);
}
- assert(stream.pos() == stream.size());
- }
-
- int pos = stream.pos();
- if(pos < Protocol.headerSize)
- {
- //
- // This situation is possible for small UDP packets.
- //
- throw new Ice.IllegalMessageSizeException();
- }
- stream.pos(0);
- byte[] m = new byte[4];
- m[0] = stream.readByte();
- m[1] = stream.readByte();
- m[2] = stream.readByte();
- m[3] = stream.readByte();
- if(m[0] != Protocol.magic[0] || m[1] != Protocol.magic[1]
- || m[2] != Protocol.magic[2] || m[3] != Protocol.magic[3])
- {
- Ice.BadMagicException ex = new Ice.BadMagicException();
- ex.badMagic = m;
- throw ex;
}
-
- byte pMajor = stream.readByte();
- byte pMinor = stream.readByte();
- if(pMajor != Protocol.protocolMajor || pMinor > Protocol.protocolMinor)
- {
- Ice.UnsupportedProtocolException e = new Ice.UnsupportedProtocolException();
- e.badMajor = pMajor < 0 ? pMajor + 255 : pMajor;
- e.badMinor = pMinor < 0 ? pMinor + 255 : pMinor;
- e.major = Protocol.protocolMajor;
- e.minor = Protocol.protocolMinor;
- throw e;
- }
-
- byte eMajor = stream.readByte();
- byte eMinor = stream.readByte();
- if(eMajor != Protocol.encodingMajor || eMinor > Protocol.encodingMinor)
- {
- Ice.UnsupportedEncodingException e = new Ice.UnsupportedEncodingException();
- e.badMajor = eMajor < 0 ? eMajor + 255 : eMajor;
- e.badMinor = eMinor < 0 ? eMinor + 255 : eMinor;
- e.major = Protocol.encodingMajor;
- e.minor = Protocol.encodingMinor;
- throw e;
- }
-
- stream.readByte(); // messageType
- stream.readByte(); // compress
- int size = stream.readInt();
- if(size < Protocol.headerSize)
- {
- throw new Ice.IllegalMessageSizeException();
- }
- if(size > _instance.messageSizeMax())
- {
- Ex.throwMemoryLimitException(size, _instance.messageSizeMax());
- }
- if(size > stream.size())
- {
- stream.resize(size, true);
- }
- stream.pos(pos);
-
- if(stream.pos() != stream.size())
+ }
+
+ private synchronized void
+ promoteFollower(ThreadPoolCurrent current)
+ {
+ assert(!_promote && current._leader);
+ _promote = true;
+ if(_inUseIO < _sizeIO && (_nextHandler.hasNext() || _inUseIO == 0))
{
- if(handler.datagram())
- {
- if(_warnUdp)
- {
- _instance.initializationData().logger.warning("DatagramLimitException: maximum size of "
- + stream.pos() + " exceeded");
- }
- throw new Ice.DatagramLimitException();
- }
- else
- {
- if(!handler.read(stream))
- {
- return false;
- }
- assert(stream.pos() == stream.size());
- }
+ notify();
}
-
- return true;
+ current._leader = false;
}
-/*
- * Commented out because it is unused.
- *
- private void
- selectNonBlocking()
+ private synchronized boolean
+ followerWait(EventHandlerThread thread, ThreadPoolCurrent current)
{
- while(true)
+ assert(!current._leader);
+
+ //
+ // It's important to clear the handler before waiting to make sure that
+ // resources for the handler are released now if it's finished. We also
+ // clear the per-thread stream.
+ //
+ current._handler = null;
+ current.stream.reset();
+
+ //
+ // Wait to be promoted and for all the IO threads to be done.
+ //
+ while(!_promote || _inUseIO == _sizeIO || _nextHandler.hasNext() && _inUseIO > 0)
{
- try
+ while(true)
{
- if(TRACE_SELECT)
- {
- trace("non-blocking select on " + _selector.keys().size() + " keys, thread id = " +
- Thread.currentThread());
- }
-
- _selector.selectNow();
-
- if(TRACE_SELECT)
+ try
{
- if(_keys.size() > 0)
+ if(_threadIdleTime > 0)
{
- trace("after selectNow, there are " + _keys.size() + " selected keys:");
- for(java.nio.channels.SelectionKey key : _keys)
+ long before = IceInternal.Time.currentMonotonicTimeMillis();
+ wait(_threadIdleTime * 1000);
+ if(IceInternal.Time.currentMonotonicTimeMillis() - before >= _threadIdleTime * 1000)
{
- trace(" " + keyToString(key));
+ if(_instance.traceLevels().threadPool >= 1)
+ {
+ String s = "shrinking " + _prefix + ": Size=" + (_threads.size() - 1);
+ _instance.initializationData().logger.trace(_instance.traceLevels().threadPoolCat, s);
+ }
+ assert(_threads.size() > 1); // Can only be called by a waiting follower thread.
+ _threads.remove(thread);
+ _workQueue.queue(new JoinThreadWorkItem(thread));
+ return true;
}
}
- }
+ else
+ {
+ wait();
+ }
- break;
- }
- catch(java.io.InterruptedIOException ex)
- {
- continue;
- }
- catch(java.io.IOException ex)
- {
- //
- // Pressing Ctrl-C causes select() to raise an
- // IOException, which seems like a JDK bug. We trap
- // for that special case here and ignore it.
- // Hopefully we're not masking something important!
- //
- if(ex.getMessage().indexOf("Interrupted system call") != -1)
+ break;
+ }
+ catch(InterruptedException ex)
{
- continue;
}
-
- Ice.SocketException se = new Ice.SocketException();
- se.initCause(ex);
- //throw se;
- java.io.StringWriter sw = new java.io.StringWriter();
- java.io.PrintWriter pw = new java.io.PrintWriter(sw);
- se.printStackTrace(pw);
- pw.flush();
- String s = "exception in `" + _prefix + "':\n" + sw.toString();
- _instance.initializationData().logger.error(s);
- continue;
}
}
- }
-*/
-
- private void
- trace(String msg)
- {
- System.err.println(_prefix + ": " + msg);
- }
-
- private String
- keyToString(java.nio.channels.SelectionKey key)
- {
- String ops = "[";
- if(key.isAcceptable())
- {
- ops += " OP_ACCEPT";
- }
- if(key.isReadable())
- {
- ops += " OP_READ";
- }
- if(key.isConnectable())
- {
- ops += " OP_CONNECT";
- }
- if(key.isWritable())
- {
- ops += " OP_WRITE";
- }
- ops += " ]";
- return key.channel() + " " + ops;
+ current._leader = true; // The current thread has become the leader.
+ _promote = false;
+ return false;
}
- private Instance _instance;
+ private final Instance _instance;
+ private final ThreadPoolWorkQueue _workQueue;
private boolean _destroyed;
private final String _prefix;
- private final String _programNamePrefix;
+ private final String _threadPrefix;
private final Selector _selector;
- private java.util.LinkedList<ThreadPoolWorkItem> _workItems = new java.util.LinkedList<ThreadPoolWorkItem>();
- private java.util.LinkedList<EventHandler> _finished = new java.util.LinkedList<EventHandler>();
- private int _timeout;
private final class EventHandlerThread implements Runnable
{
@@ -992,28 +598,18 @@ public final class ThreadPool
_name = name;
}
- public boolean
- isAlive()
- {
- return _thread.isAlive();
- }
-
- public boolean
- join(boolean wait)
+ public void
+ join()
{
while(true)
{
try
{
_thread.join();
- return true;
+ break;
}
catch(InterruptedException ex)
{
- if(!wait)
- {
- return false;
- }
}
}
}
@@ -1031,73 +627,65 @@ public final class ThreadPool
{
if(_instance.initializationData().threadHook != null)
{
- _instance.initializationData().threadHook.start();
+ try
+ {
+ _instance.initializationData().threadHook.start();
+ }
+ catch(java.lang.Exception ex)
+ {
+ String s = "thread hook start() method raised an unexpected exception in `";
+ s += _prefix + "' thread " + _name + ":\n" + Ex.toString(ex);
+ _instance.initializationData().logger.error(s);
+ }
}
- BasicStream stream = new BasicStream(_instance);
-
- boolean promote;
-
try
{
- promote = ThreadPool.this.run(stream);
+ ThreadPool.this.run(this);
}
catch(java.lang.Exception ex)
{
- java.io.StringWriter sw = new java.io.StringWriter();
- java.io.PrintWriter pw = new java.io.PrintWriter(sw);
- ex.printStackTrace(pw);
- pw.flush();
- String s = "exception in `" + _prefix + "' thread " + _name + ":\n" + sw.toString();
+ String s = "exception in `" + _prefix + "' thread " + _name + ":\n" + Ex.toString(ex);
_instance.initializationData().logger.error(s);
- promote = true;
}
- if(promote && _sizeMax > 1)
+ if(_instance.initializationData().threadHook != null)
{
- //
- // Promote a follower, but w/o modifying _inUse or
- // creating new threads.
- //
- synchronized(ThreadPool.this)
+ try
{
- assert(!_promote);
- _promote = true;
- ThreadPool.this.notify();
+ _instance.initializationData().threadHook.stop();
+ }
+ catch(java.lang.Exception ex)
+ {
+ String s = "thread hook stop() method raised an unexpected exception in `";
+ s += _prefix + "' thread " + _name + ":\n" + Ex.toString(ex);
+ _instance.initializationData().logger.error(s);
}
- }
-
- if(TRACE_THREAD)
- {
- trace("run() terminated");
- }
-
- if(_instance.initializationData().threadHook != null)
- {
- _instance.initializationData().threadHook.stop();
}
}
- private String _name;
+ final private String _name;
private Thread _thread;
}
private final int _size; // Number of threads that are pre-created.
+ private final int _sizeIO; // Number of threads that can concurrently perform IO.
private final int _sizeMax; // Maximum number of threads.
private final int _sizeWarn; // If _inUse reaches _sizeWarn, a "low on threads" warning will be printed.
private final boolean _serialize; // True if requests need to be serialized over the connection.
-
+ private final int _priority;
+ private final boolean _hasPriority;
+ private final long _serverIdleTime;
+ private final long _threadIdleTime;
private final int _stackSize;
- private java.util.List<EventHandlerThread> _threads; // All threads, running or not.
+ private java.util.List<EventHandlerThread> _threads = new java.util.ArrayList<EventHandlerThread>();
private int _threadIndex; // For assigning thread names.
- private int _running; // Number of running threads.
private int _inUse; // Number of threads that are currently in use.
- private double _load; // Current load in number of threads.
+ private int _inUseIO; // Number of threads that are currently performing IO.
- private boolean _promote;
+ private java.util.List<EventHandler> _handlers = new java.util.ArrayList<EventHandler>();
+ private java.util.Iterator<EventHandler> _nextHandler;
- private final boolean _warnUdp;
- private int _priority;
- private boolean _hasPriority = false;
+ private boolean _promote;
}