summaryrefslogtreecommitdiff
path: root/java/src/IceInternal/ThreadPool.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/src/IceInternal/ThreadPool.java')
-rw-r--r--java/src/IceInternal/ThreadPool.java690
1 files changed, 690 insertions, 0 deletions
diff --git a/java/src/IceInternal/ThreadPool.java b/java/src/IceInternal/ThreadPool.java
new file mode 100644
index 00000000000..30cbf463e31
--- /dev/null
+++ b/java/src/IceInternal/ThreadPool.java
@@ -0,0 +1,690 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2011 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+package IceInternal;
+
+public final class ThreadPool
+{
+ final class ShutdownWorkItem implements ThreadPoolWorkItem
+ {
+ public void execute(ThreadPoolCurrent current)
+ {
+ current.ioCompleted();
+ try
+ {
+ _instance.objectAdapterFactory().shutdown();
+ }
+ catch(Ice.CommunicatorDestroyedException ex)
+ {
+ }
+ }
+ }
+
+ static final class FinishedWorkItem implements ThreadPoolWorkItem
+ {
+ public
+ FinishedWorkItem(EventHandler handler)
+ {
+ _handler = handler;
+ }
+
+ public void execute(ThreadPoolCurrent current)
+ {
+ _handler.finished(current);
+ }
+
+ private final EventHandler _handler;
+ }
+
+ static final class JoinThreadWorkItem implements ThreadPoolWorkItem
+ {
+ public
+ JoinThreadWorkItem(EventHandlerThread thread)
+ {
+ _thread = thread;
+ }
+
+ public void execute(ThreadPoolCurrent current)
+ {
+ // No call to ioCompleted, this shouldn't block (and we don't want to cause
+ // a new thread to be started).
+ _thread.join();
+ }
+
+ private final EventHandlerThread _thread;
+ }
+
+ //
+ // Exception raised by the thread pool work queue when the thread pool
+ // is destroyed.
+ //
+ static final class DestroyedException extends RuntimeException
+ {
+ }
+
+ public
+ ThreadPool(Instance instance, String prefix, int timeout)
+ {
+ _instance = instance;
+ _destroyed = false;
+ _prefix = prefix;
+ _selector = new Selector(instance);
+ _threadIndex = 0;
+ _inUse = 0;
+ _inUseIO = 0;
+ _promote = true;
+ _serialize = _instance.initializationData().properties.getPropertyAsInt(_prefix + ".Serialize") > 0;
+ _serverIdleTime = timeout;
+
+ Ice.Properties properties = _instance.initializationData().properties;
+
+ String programName = properties.getProperty("Ice.ProgramName");
+ if(programName.length() > 0)
+ {
+ _threadPrefix = programName + "-" + _prefix;
+ }
+ else
+ {
+ _threadPrefix = _prefix;
+ }
+
+ int nProcessors = Runtime.getRuntime().availableProcessors();
+
+ //
+ // We use just one thread as the default. This is the fastest
+ // possible setting, still allows one level of nesting, and
+ // doesn't require to make the servants thread safe.
+ //
+ int size = properties.getPropertyAsIntWithDefault(_prefix + ".Size", 1);
+ if(size < 1)
+ {
+ String s = _prefix + ".Size < 1; Size adjusted to 1";
+ _instance.initializationData().logger.warning(s);
+ size = 1;
+ }
+
+ int sizeMax = properties.getPropertyAsIntWithDefault(_prefix + ".SizeMax", size);
+ if(sizeMax == -1)
+ {
+ sizeMax = nProcessors;
+ }
+ if(sizeMax < size)
+ {
+ String s = _prefix + ".SizeMax < " + _prefix + ".Size; SizeMax adjusted to Size (" + size + ")";
+ _instance.initializationData().logger.warning(s);
+ sizeMax = size;
+ }
+
+ int sizeWarn = properties.getPropertyAsInt(_prefix + ".SizeWarn");
+ if(sizeWarn != 0 && sizeWarn < size)
+ {
+ String s = _prefix + ".SizeWarn < " + _prefix + ".Size; adjusted SizeWarn to Size (" + size + ")";
+ _instance.initializationData().logger.warning(s);
+ sizeWarn = size;
+ }
+ else if(sizeWarn > sizeMax)
+ {
+ String s = _prefix + ".SizeWarn > " + _prefix + ".SizeMax; adjusted SizeWarn to SizeMax (" + sizeMax + ")";
+ _instance.initializationData().logger.warning(s);
+ sizeWarn = sizeMax;
+ }
+
+ int threadIdleTime = properties.getPropertyAsIntWithDefault(_prefix + ".ThreadIdleTime", 60);
+ if(threadIdleTime < 0)
+ {
+ String s = _prefix + ".ThreadIdleTime < 0; ThreadIdleTime adjusted to 0";
+ _instance.initializationData().logger.warning(s);
+ threadIdleTime = 0;
+ }
+
+ _size = size;
+ _sizeMax = sizeMax;
+ _sizeWarn = sizeWarn;
+ _sizeIO = Math.min(sizeMax, nProcessors);
+ _threadIdleTime = threadIdleTime;
+
+ int stackSize = properties.getPropertyAsInt( _prefix + ".StackSize");
+ if(stackSize < 0)
+ {
+ String s = _prefix + ".StackSize < 0; Size adjusted to JRE default";
+ _instance.initializationData().logger.warning(s);
+ stackSize = 0;
+ }
+ _stackSize = stackSize;
+
+ boolean hasPriority = properties.getProperty(_prefix + ".ThreadPriority").length() > 0;
+ int priority = properties.getPropertyAsInt(_prefix + ".ThreadPriority");
+ if(!hasPriority)
+ {
+ hasPriority = properties.getProperty("Ice.ThreadPriority").length() > 0;
+ priority = properties.getPropertyAsInt("Ice.ThreadPriority");
+ }
+ _hasPriority = hasPriority;
+ _priority = priority;
+
+ _workQueue = new ThreadPoolWorkQueue(this, _instance, _selector);
+
+ _nextHandler = _handlers.iterator();
+
+ if(_instance.traceLevels().threadPool >= 1)
+ {
+ String s = "creating " + _prefix + ": Size = " + _size + ", SizeMax = " + _sizeMax + ", SizeWarn = " +
+ _sizeWarn;
+ _instance.initializationData().logger.trace(_instance.traceLevels().threadPoolCat, s);
+ }
+
+ try
+ {
+ for(int i = 0; i < _size; i++)
+ {
+ EventHandlerThread thread = new EventHandlerThread(_threadPrefix + "-" + _threadIndex++);
+ _threads.add(thread);
+ if(_hasPriority)
+ {
+ thread.start(_priority);
+ }
+ else
+ {
+ thread.start(java.lang.Thread.NORM_PRIORITY);
+ }
+ }
+ }
+ catch(RuntimeException ex)
+ {
+ String s = "cannot create thread for `" + _prefix + "':\n" + Ex.toString(ex);
+ _instance.initializationData().logger.error(s);
+
+ destroy();
+ joinWithAllThreads();
+ throw ex;
+ }
+ }
+
+ protected synchronized void
+ finalize()
+ throws Throwable
+ {
+ IceUtilInternal.Assert.FinalizerAssert(_destroyed);
+ }
+
+ public synchronized void
+ destroy()
+ {
+ assert(!_destroyed);
+ _destroyed = true;
+ _workQueue.destroy();
+ }
+
+ public synchronized void
+ initialize(EventHandler handler)
+ {
+ assert(!_destroyed);
+ _selector.initialize(handler);
+ }
+
+ public void
+ register(EventHandler handler, int op)
+ {
+ update(handler, SocketOperation.None, op);
+ }
+
+ public synchronized void
+ update(EventHandler handler, int remove, int add)
+ {
+ assert(!_destroyed);
+ _selector.update(handler, remove, add);
+ }
+
+ public void
+ unregister(EventHandler handler, int op)
+ {
+ update(handler, op, SocketOperation.None);
+ }
+
+ public synchronized void
+ finish(EventHandler handler)
+ {
+ assert(!_destroyed);
+ _selector.finish(handler);
+ _workQueue.queue(new FinishedWorkItem(handler));
+ }
+
+ public void
+ execute(ThreadPoolWorkItem workItem)
+ {
+ _workQueue.queue(workItem);
+ }
+
+ public void
+ joinWithAllThreads()
+ {
+ //
+ // _threads is immutable after destroy() has been called,
+ // therefore no synchronization is needed. (Synchronization
+ // wouldn't be possible here anyway, because otherwise the
+ // other threads would never terminate.)
+ //
+ for(EventHandlerThread thread : _threads)
+ {
+ thread.join();
+ }
+
+ //
+ // Destroy the selector
+ //
+ _workQueue.close();
+ _selector.destroy();
+ }
+
+ public String
+ prefix()
+ {
+ return _prefix;
+ }
+
+ private void
+ run(EventHandlerThread thread)
+ {
+ ThreadPoolCurrent current = new ThreadPoolCurrent(_instance, this);
+ boolean select = false;
+ while(true)
+ {
+ if(current._handler != null)
+ {
+ try
+ {
+ current._handler.message(current);
+ }
+ catch(DestroyedException ex)
+ {
+ return;
+ }
+ catch(java.lang.Exception ex)
+ {
+ String s = "exception in `" + _prefix + "':\n" + Ex.toString(ex);
+ s += "\nevent handler: " + current._handler.toString();
+ _instance.initializationData().logger.error(s);
+ }
+ }
+ else if(select)
+ {
+ try
+ {
+ _selector.select(_serverIdleTime);
+ }
+ catch(Selector.TimeoutException ex)
+ {
+ synchronized(this)
+ {
+ if(!_destroyed && _inUse == 0)
+ {
+ _workQueue.queue(new ShutdownWorkItem()); // Select timed-out.
+ }
+ continue;
+ }
+ }
+ }
+
+ synchronized(this)
+ {
+ if(current._handler == null)
+ {
+ if(select)
+ {
+ _selector.finishSelect(_handlers, _serverIdleTime);
+ _nextHandler = _handlers.iterator();
+ select = false;
+ }
+ else if(!current._leader && followerWait(thread, current))
+ {
+ return; // Wait timed-out.
+ }
+ }
+ else if(_sizeMax > 1)
+ {
+ if(!current._ioCompleted)
+ {
+ //
+ // The handler didn't call ioCompleted() so we take care of decreasing
+ // the IO thread count now.
+ //
+ --_inUseIO;
+
+ if((current.operation & SocketOperation.Read) != 0 && current._handler.hasMoreData())
+ {
+ _selector.hasMoreData(current._handler);
+ }
+ }
+ else
+ {
+ //
+ // If the handler called ioCompleted(), we re-enable the handler in
+ // case it was disabled and we decrease the number of thread in use.
+ //
+ _selector.enable(current._handler, current.operation);
+ assert(_inUse > 0);
+ --_inUse;
+ }
+
+ if(!current._leader && followerWait(thread, current))
+ {
+ return; // Wait timed-out.
+ }
+ }
+ else if(!current._ioCompleted &&
+ (current.operation & SocketOperation.Read) != 0 && current._handler.hasMoreData())
+ {
+ _selector.hasMoreData(current._handler);
+ }
+
+ //
+ // Get the next ready handler.
+ //
+ if(_nextHandler.hasNext())
+ {
+ current._ioCompleted = false;
+ current._handler = _nextHandler.next();
+ current.operation = current._handler._ready;
+ }
+ else
+ {
+ current._handler = null;
+ }
+
+ if(current._handler == null)
+ {
+ //
+ // If there are no more ready handlers and there are still threads busy performing
+ // IO, we give up leadership and promote another follower (which will perform the
+ // select() only once all the IOs are completed). Otherwise, if there's no more
+ // threads peforming IOs, it's time to do another select().
+ //
+ if(_inUseIO > 0)
+ {
+ promoteFollower(current);
+ }
+ else
+ {
+ _selector.startSelect();
+ select = true;
+ }
+ }
+ else if(_sizeMax > 1)
+ {
+ //
+ // Increment the IO thread count and if there's still threads available
+ // to perform IO and more handlers ready, we promote a follower.
+ //
+ ++_inUseIO;
+ if(_nextHandler.hasNext() && _inUseIO < _sizeIO)
+ {
+ promoteFollower(current);
+ }
+ }
+ }
+ }
+ }
+
+ void
+ ioCompleted(ThreadPoolCurrent current)
+ {
+ current._ioCompleted = true; // Set the IO completed flag to specify that ioCompleted() has been called.
+
+ if(_sizeMax > 1)
+ {
+ synchronized(this)
+ {
+ --_inUseIO;
+
+ if((current.operation & SocketOperation.Read) != 0 && current._handler.hasMoreData())
+ {
+ _selector.hasMoreData(current._handler);
+ }
+
+ if(_serialize && !_destroyed)
+ {
+ _selector.disable(current._handler, current.operation);
+ }
+
+ if(current._leader)
+ {
+ //
+ // If this thread is still the leader, it's time to promote a new leader.
+ //
+ promoteFollower(current);
+ }
+ else if(_promote && (_nextHandler.hasNext() || _inUseIO == 0))
+ {
+ notify();
+ }
+
+ assert(_inUse >= 0);
+ ++_inUse;
+
+ if(_inUse == _sizeWarn)
+ {
+ String s = "thread pool `" + _prefix + "' is running low on threads\n"
+ + "Size=" + _size + ", " + "SizeMax=" + _sizeMax + ", " + "SizeWarn=" + _sizeWarn;
+ _instance.initializationData().logger.warning(s);
+ }
+
+ if(!_destroyed)
+ {
+ assert(_inUse <= _threads.size());
+ if(_inUse < _sizeMax && _inUse == _threads.size())
+ {
+ if(_instance.traceLevels().threadPool >= 1)
+ {
+ String s = "growing " + _prefix + ": Size=" + (_threads.size() + 1);
+ _instance.initializationData().logger.trace(_instance.traceLevels().threadPoolCat, s);
+ }
+
+ try
+ {
+ EventHandlerThread thread = new EventHandlerThread(_threadPrefix + "-" + _threadIndex++);
+ _threads.add(thread);
+ if(_hasPriority)
+ {
+ thread.start(_priority);
+ }
+ else
+ {
+ thread.start(java.lang.Thread.NORM_PRIORITY);
+ }
+ }
+ catch(RuntimeException ex)
+ {
+ String s = "cannot create thread for `" + _prefix + "':\n" + Ex.toString(ex);
+ _instance.initializationData().logger.error(s);
+ }
+ }
+ }
+ }
+ }
+ else if((current.operation & SocketOperation.Read) != 0 && current._handler.hasMoreData())
+ {
+ synchronized(this)
+ {
+ _selector.hasMoreData(current._handler);
+ }
+ }
+ }
+
+ private synchronized void
+ promoteFollower(ThreadPoolCurrent current)
+ {
+ assert(!_promote && current._leader);
+ _promote = true;
+ if(_inUseIO < _sizeIO && (_nextHandler.hasNext() || _inUseIO == 0))
+ {
+ notify();
+ }
+ current._leader = false;
+ }
+
+ private synchronized boolean
+ followerWait(EventHandlerThread thread, ThreadPoolCurrent current)
+ {
+ assert(!current._leader);
+
+ //
+ // It's important to clear the handler before waiting to make sure that
+ // resources for the handler are released now if it's finished. We also
+ // clear the per-thread stream.
+ //
+ current._handler = null;
+ current.stream.reset();
+
+ //
+ // Wait to be promoted and for all the IO threads to be done.
+ //
+ while(!_promote || _inUseIO == _sizeIO || !_nextHandler.hasNext() && _inUseIO > 0)
+ {
+ try
+ {
+ if(_threadIdleTime > 0)
+ {
+ long before = IceInternal.Time.currentMonotonicTimeMillis();
+ wait(_threadIdleTime * 1000);
+ if(IceInternal.Time.currentMonotonicTimeMillis() - before >= _threadIdleTime * 1000)
+ {
+ if(!_destroyed && (!_promote || _inUseIO == _sizeIO ||
+ (!_nextHandler.hasNext() && _inUseIO > 0)))
+ {
+ if(_instance.traceLevels().threadPool >= 1)
+ {
+ String s = "shrinking " + _prefix + ": Size=" + (_threads.size() - 1);
+ _instance.initializationData().logger.trace(_instance.traceLevels().threadPoolCat, s);
+ }
+ assert(_threads.size() > 1); // Can only be called by a waiting follower thread.
+ _threads.remove(thread);
+ _workQueue.queue(new JoinThreadWorkItem(thread));
+ return true;
+ }
+ }
+ }
+ else
+ {
+ wait();
+ }
+ }
+ catch(InterruptedException ex)
+ {
+ }
+ }
+ current._leader = true; // The current thread has become the leader.
+ _promote = false;
+ return false;
+ }
+
+ private final Instance _instance;
+ private final ThreadPoolWorkQueue _workQueue;
+ private boolean _destroyed;
+ private final String _prefix;
+ private final String _threadPrefix;
+ private final Selector _selector;
+
+ private final class EventHandlerThread implements Runnable
+ {
+ EventHandlerThread(String name)
+ {
+ _name = name;
+ }
+
+ public void
+ join()
+ {
+ while(true)
+ {
+ try
+ {
+ _thread.join();
+ break;
+ }
+ catch(InterruptedException ex)
+ {
+ }
+ }
+ }
+
+ public void
+ start(int priority)
+ {
+ _thread = new Thread(null, this, _name, _stackSize);
+ _thread.setPriority(priority);
+ _thread.start();
+ }
+
+ public void
+ run()
+ {
+ if(_instance.initializationData().threadHook != null)
+ {
+ try
+ {
+ _instance.initializationData().threadHook.start();
+ }
+ catch(java.lang.Exception ex)
+ {
+ String s = "thread hook start() method raised an unexpected exception in `";
+ s += _prefix + "' thread " + _name + ":\n" + Ex.toString(ex);
+ _instance.initializationData().logger.error(s);
+ }
+ }
+
+ try
+ {
+ ThreadPool.this.run(this);
+ }
+ catch(java.lang.Exception ex)
+ {
+ String s = "exception in `" + _prefix + "' thread " + _name + ":\n" + Ex.toString(ex);
+ _instance.initializationData().logger.error(s);
+ }
+
+ if(_instance.initializationData().threadHook != null)
+ {
+ try
+ {
+ _instance.initializationData().threadHook.stop();
+ }
+ catch(java.lang.Exception ex)
+ {
+ String s = "thread hook stop() method raised an unexpected exception in `";
+ s += _prefix + "' thread " + _name + ":\n" + Ex.toString(ex);
+ _instance.initializationData().logger.error(s);
+ }
+ }
+ }
+
+ final private String _name;
+ private Thread _thread;
+ }
+
+ private final int _size; // Number of threads that are pre-created.
+ private final int _sizeIO; // Number of threads that can concurrently perform IO.
+ private final int _sizeMax; // Maximum number of threads.
+ private final int _sizeWarn; // If _inUse reaches _sizeWarn, a "low on threads" warning will be printed.
+ private final boolean _serialize; // True if requests need to be serialized over the connection.
+ private final int _priority;
+ private final boolean _hasPriority;
+ private final long _serverIdleTime;
+ private final long _threadIdleTime;
+ private final int _stackSize;
+
+ private java.util.List<EventHandlerThread> _threads = new java.util.ArrayList<EventHandlerThread>();
+ private int _threadIndex; // For assigning thread names.
+ private int _inUse; // Number of threads that are currently in use.
+ private int _inUseIO; // Number of threads that are currently performing IO.
+
+ private java.util.List<EventHandler> _handlers = new java.util.ArrayList<EventHandler>();
+ private java.util.Iterator<EventHandler> _nextHandler;
+
+ private boolean _promote;
+}