summaryrefslogtreecommitdiff
path: root/java/src/IceInternal/ThreadPool.java
diff options
context:
space:
mode:
authorMatthew Newhook <matthew@zeroc.com>2014-10-20 11:40:05 -0230
committerMatthew Newhook <matthew@zeroc.com>2014-10-20 11:40:05 -0230
commitb51469b41167fb86ae2059a15cf0475c53fdda7b (patch)
treefc85d6ca2efd89c67e1e4e7438f437c3e08313f4 /java/src/IceInternal/ThreadPool.java
parentFixed (ICE-5695) - IceSSL: misleading exception (diff)
downloadice-b51469b41167fb86ae2059a15cf0475c53fdda7b.tar.bz2
ice-b51469b41167fb86ae2059a15cf0475c53fdda7b.tar.xz
ice-b51469b41167fb86ae2059a15cf0475c53fdda7b.zip
Down with ant. From the gradle to the grave.
Diffstat (limited to 'java/src/IceInternal/ThreadPool.java')
-rw-r--r--java/src/IceInternal/ThreadPool.java900
1 files changed, 0 insertions, 900 deletions
diff --git a/java/src/IceInternal/ThreadPool.java b/java/src/IceInternal/ThreadPool.java
deleted file mode 100644
index c21305d2395..00000000000
--- a/java/src/IceInternal/ThreadPool.java
+++ /dev/null
@@ -1,900 +0,0 @@
-// **********************************************************************
-//
-// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved.
-//
-// This copy of Ice is licensed to you under the terms described in the
-// ICE_LICENSE file included in this distribution.
-//
-// **********************************************************************
-
-package IceInternal;
-
-public final class ThreadPool
-{
- final class ShutdownWorkItem implements ThreadPoolWorkItem
- {
- @Override
- public void execute(ThreadPoolCurrent current)
- {
- current.ioCompleted();
- try
- {
- _instance.objectAdapterFactory().shutdown();
- }
- catch(Ice.CommunicatorDestroyedException ex)
- {
- }
- }
- }
-
- static final class FinishedWorkItem implements ThreadPoolWorkItem
- {
- public
- FinishedWorkItem(EventHandler handler, boolean close)
- {
- _handler = handler;
- _close = close;
- }
-
- @Override
- public void execute(ThreadPoolCurrent current)
- {
- _handler.finished(current, _close);
- }
-
- private final EventHandler _handler;
- private final boolean _close;
- }
-
- static final class JoinThreadWorkItem implements ThreadPoolWorkItem
- {
- public
- JoinThreadWorkItem(EventHandlerThread thread)
- {
- _thread = thread;
- }
-
- @Override
- public void execute(ThreadPoolCurrent current)
- {
- // No call to ioCompleted, this shouldn't block (and we don't want to cause
- // a new thread to be started).
- try
- {
- _thread.join();
- }
- catch (InterruptedException e)
- {
- // Ignore.
- }
- }
-
- private final EventHandlerThread _thread;
- }
-
- static final class InterruptWorkItem implements ThreadPoolWorkItem
- {
- @Override
- public void execute(ThreadPoolCurrent current)
- {
- // Nothing to do, this is just used to interrupt the thread pool selector.
- }
- }
-
- private static ThreadPoolWorkItem _interruptWorkItem = new InterruptWorkItem();
-
- //
- // Exception raised by the thread pool work queue when the thread pool
- // is destroyed.
- //
- static final class DestroyedException extends RuntimeException
- {
- }
-
- public
- ThreadPool(Instance instance, String prefix, int timeout)
- {
- _instance = instance;
- _dispatcher = instance.initializationData().dispatcher;
- _destroyed = false;
- _prefix = prefix;
- _selector = new Selector(instance);
- _threadIndex = 0;
- _inUse = 0;
- _inUseIO = 0;
- _promote = true;
- _serialize = _instance.initializationData().properties.getPropertyAsInt(_prefix + ".Serialize") > 0;
- _serverIdleTime = timeout;
-
- Ice.Properties properties = _instance.initializationData().properties;
-
- String programName = properties.getProperty("Ice.ProgramName");
- if(programName.length() > 0)
- {
- _threadPrefix = programName + "-" + _prefix;
- }
- else
- {
- _threadPrefix = _prefix;
- }
-
- int nProcessors = Runtime.getRuntime().availableProcessors();
-
- //
- // We use just one thread as the default. This is the fastest
- // possible setting, still allows one level of nesting, and
- // doesn't require to make the servants thread safe.
- //
- int size = properties.getPropertyAsIntWithDefault(_prefix + ".Size", 1);
- if(size < 1)
- {
- String s = _prefix + ".Size < 1; Size adjusted to 1";
- _instance.initializationData().logger.warning(s);
- size = 1;
- }
-
- int sizeMax = properties.getPropertyAsIntWithDefault(_prefix + ".SizeMax", size);
- if(sizeMax == -1)
- {
- sizeMax = nProcessors;
- }
- if(sizeMax < size)
- {
- String s = _prefix + ".SizeMax < " + _prefix + ".Size; SizeMax adjusted to Size (" + size + ")";
- _instance.initializationData().logger.warning(s);
- sizeMax = size;
- }
-
- int sizeWarn = properties.getPropertyAsInt(_prefix + ".SizeWarn");
- if(sizeWarn != 0 && sizeWarn < size)
- {
- String s = _prefix + ".SizeWarn < " + _prefix + ".Size; adjusted SizeWarn to Size (" + size + ")";
- _instance.initializationData().logger.warning(s);
- sizeWarn = size;
- }
- else if(sizeWarn > sizeMax)
- {
- String s = _prefix + ".SizeWarn > " + _prefix + ".SizeMax; adjusted SizeWarn to SizeMax (" + sizeMax + ")";
- _instance.initializationData().logger.warning(s);
- sizeWarn = sizeMax;
- }
-
- int threadIdleTime = properties.getPropertyAsIntWithDefault(_prefix + ".ThreadIdleTime", 60);
- if(threadIdleTime < 0)
- {
- String s = _prefix + ".ThreadIdleTime < 0; ThreadIdleTime adjusted to 0";
- _instance.initializationData().logger.warning(s);
- threadIdleTime = 0;
- }
-
- _size = size;
- _sizeMax = sizeMax;
- _sizeWarn = sizeWarn;
- _sizeIO = Math.min(sizeMax, nProcessors);
- _threadIdleTime = threadIdleTime;
-
- int stackSize = properties.getPropertyAsInt( _prefix + ".StackSize");
- if(stackSize < 0)
- {
- String s = _prefix + ".StackSize < 0; Size adjusted to JRE default";
- _instance.initializationData().logger.warning(s);
- stackSize = 0;
- }
- _stackSize = stackSize;
-
- boolean hasPriority = properties.getProperty(_prefix + ".ThreadPriority").length() > 0;
- int priority = properties.getPropertyAsInt(_prefix + ".ThreadPriority");
- if(!hasPriority)
- {
- hasPriority = properties.getProperty("Ice.ThreadPriority").length() > 0;
- priority = properties.getPropertyAsInt("Ice.ThreadPriority");
- }
- _hasPriority = hasPriority;
- _priority = priority;
-
- _workQueue = new ThreadPoolWorkQueue(_instance, this, _selector);
- _nextHandler = _handlers.iterator();
-
- if(_instance.traceLevels().threadPool >= 1)
- {
- String s = "creating " + _prefix + ": Size = " + _size + ", SizeMax = " + _sizeMax + ", SizeWarn = " +
- _sizeWarn;
- _instance.initializationData().logger.trace(_instance.traceLevels().threadPoolCat, s);
- }
-
- try
- {
- for(int i = 0; i < _size; i++)
- {
- EventHandlerThread thread = new EventHandlerThread(_threadPrefix + "-" + _threadIndex++);
- if(_hasPriority)
- {
- thread.start(_priority);
- }
- else
- {
- thread.start(java.lang.Thread.NORM_PRIORITY);
- }
- _threads.add(thread);
- }
- }
- catch(RuntimeException ex)
- {
- String s = "cannot create thread for `" + _prefix + "':\n" + Ex.toString(ex);
- _instance.initializationData().logger.error(s);
-
- destroy();
- try
- {
- joinWithAllThreads();
- }
- catch (InterruptedException e)
- {
- throw new Ice.OperationInterruptedException();
- }
- throw ex;
- }
- }
-
- @Override
- protected synchronized void
- finalize()
- throws Throwable
- {
- try
- {
- IceUtilInternal.Assert.FinalizerAssert(_destroyed);
- }
- catch(java.lang.Exception ex)
- {
- }
- finally
- {
- super.finalize();
- }
- }
-
- public synchronized void
- destroy()
- {
- if(_destroyed)
- {
- return;
- }
-
- _destroyed = true;
- _workQueue.destroy();
- }
-
- public synchronized void
- updateObservers()
- {
- for(EventHandlerThread thread : _threads)
- {
- thread.updateObserver();
- }
- }
-
- public synchronized void
- initialize(EventHandler handler)
- {
- assert(!_destroyed);
- _selector.initialize(handler);
- }
-
- public void
- register(EventHandler handler, int op)
- {
- update(handler, SocketOperation.None, op);
- }
-
- public synchronized void
- update(EventHandler handler, int remove, int add)
- {
- assert(!_destroyed);
-
- // Don't remove what needs to be added
- remove &= ~add;
-
- // Don't remove/add if already un-registered or registered
- remove = handler._registered & remove;
- add = ~handler._registered & add;
- if(remove == add)
- {
- return;
- }
- _selector.update(handler, remove, add);
-
- if((add & SocketOperation.Read) != 0 && handler._hasMoreData.value &&
- (handler._disabled & SocketOperation.Read) == 0)
- {
- if(_pendingHandlers.isEmpty())
- {
- _workQueue.queue(_interruptWorkItem); // Interrupt select()
- }
- _pendingHandlers.add(handler);
- }
- else if((remove & SocketOperation.Read) != 0)
- {
- _pendingHandlers.remove(handler);
- }
- }
-
- public void
- unregister(EventHandler handler, int op)
- {
- update(handler, op, SocketOperation.None);
- }
-
- public synchronized boolean
- finish(EventHandler handler, boolean closeNow)
- {
- assert(!_destroyed);
- closeNow = _selector.finish(handler, closeNow);
- _workQueue.queue(new FinishedWorkItem(handler, !closeNow));
- return closeNow;
- }
-
- public void
- dispatchFromThisThread(DispatchWorkItem workItem)
- {
- if(_dispatcher != null)
- {
- try
- {
- _dispatcher.dispatch(workItem, workItem.getConnection());
- }
- catch(java.lang.Exception ex)
- {
- if(_instance.initializationData().properties.getPropertyAsIntWithDefault("Ice.Warn.Dispatch", 1) > 1)
- {
- java.io.StringWriter sw = new java.io.StringWriter();
- java.io.PrintWriter pw = new java.io.PrintWriter(sw);
- ex.printStackTrace(pw);
- pw.flush();
- _instance.initializationData().logger.warning("dispatch exception:\n" + sw.toString());
- }
- }
- }
- else
- {
- workItem.run();
- }
- }
-
- public void
- dispatch(DispatchWorkItem workItem)
- {
- _workQueue.queue(workItem);
- }
-
- public void
- joinWithAllThreads()
- throws InterruptedException
- {
- //
- // _threads is immutable after destroy() has been called,
- // therefore no synchronization is needed. (Synchronization
- // wouldn't be possible here anyway, because otherwise the
- // other threads would never terminate.)
- //
- for(EventHandlerThread thread : _threads)
- {
- thread.join();
- }
-
- //
- // Destroy the selector
- //
- _selector.destroy();
- }
-
- private void
- run(EventHandlerThread thread)
- {
- ThreadPoolCurrent current = new ThreadPoolCurrent(_instance, this, thread);
- boolean select = false;
- java.util.List<EventHandlerOpPair> handlers = new java.util.ArrayList<EventHandlerOpPair>();
-
- while(true)
- {
- if(current._handler != null)
- {
- try
- {
- current._handler.message(current);
- }
- catch(DestroyedException ex)
- {
- return;
- }
- catch(java.lang.Exception ex)
- {
- String s = "exception in `" + _prefix + "':\n" + Ex.toString(ex);
- s += "\nevent handler: " + current._handler.toString();
- _instance.initializationData().logger.error(s);
- }
- }
- else if(select)
- {
- if(_workQueue.size() == 0)
- {
- try
- {
- _selector.select(handlers, _serverIdleTime);
- }
- catch(Selector.TimeoutException ex)
- {
- synchronized(this)
- {
- if(!_destroyed && _inUse == 0)
- {
- _workQueue.queue(new ShutdownWorkItem()); // Select timed-out.
- }
- continue;
- }
- }
- }
- _workQueue.update(handlers);
- }
-
- synchronized(this)
- {
- if(current._handler == null)
- {
- if(select)
- {
- java.util.List<EventHandlerOpPair> tmp = _handlers;
- _handlers = handlers;
- handlers = tmp;
-
- if(!_pendingHandlers.isEmpty())
- {
- for(EventHandlerOpPair pair : _handlers)
- {
- _pendingHandlers.remove(pair.handler);
- }
- for(EventHandler p : _pendingHandlers)
- {
- _handlers.add(new EventHandlerOpPair(p, SocketOperation.Read));
- }
- _pendingHandlers.clear();
- }
-
- _nextHandler = _handlers.iterator();
- _selector.finishSelect();
- select = false;
- }
- else if(!current._leader && followerWait(current))
- {
- return; // Wait timed-out.
- }
- }
- else if(_sizeMax > 1)
- {
- if(!current._ioCompleted)
- {
- //
- // The handler didn't call ioCompleted() so we take care of decreasing
- // the IO thread count now.
- //
- --_inUseIO;
-
- if(current._handler._hasMoreData.value &&
- (current._handler._registered & SocketOperation.Read) != 0)
- {
- if(_pendingHandlers.isEmpty())
- {
- _workQueue.queue(_interruptWorkItem);
- }
- _pendingHandlers.add(current._handler);
- }
- }
- else
- {
- //
- // If the handler called ioCompleted(), we re-enable the handler in
- // case it was disabled and we decrease the number of thread in use.
- //
- if(_serialize)
- {
- _selector.enable(current._handler, current.operation);
- if(current._handler._hasMoreData.value &&
- (current._handler._registered & SocketOperation.Read) != 0)
- {
- if(_pendingHandlers.isEmpty())
- {
- _workQueue.queue(_interruptWorkItem); // Interrupt select()
- }
- _pendingHandlers.add(current._handler);
- }
- }
- assert(_inUse > 0);
- --_inUse;
- }
-
- if(!current._leader && followerWait(current))
- {
- return; // Wait timed-out.
- }
- }
- else if(current._handler._hasMoreData.value &&
- (current._handler._registered & SocketOperation.Read) != 0)
- {
- if(_pendingHandlers.isEmpty())
- {
- _workQueue.queue(_interruptWorkItem); // Interrupt select()
- }
- _pendingHandlers.add(current._handler);
- }
-
- //
- // Get the next ready handler.
- //
- EventHandlerOpPair next = null;
- while(_nextHandler.hasNext())
- {
- EventHandlerOpPair n = _nextHandler.next();
- if((n.op & n.handler._registered) != 0)
- {
- next = n;
- break;
- }
- }
- if(next != null)
- {
- current._ioCompleted = false;
- current._handler = next.handler;
- current.operation = next.op;
- thread.setState(Ice.Instrumentation.ThreadState.ThreadStateInUseForIO);
- }
- else
- {
- current._handler = null;
- }
-
- if(current._handler == null)
- {
- //
- // If there are no more ready handlers and there are still threads busy performing
- // IO, we give up leadership and promote another follower (which will perform the
- // select() only once all the IOs are completed). Otherwise, if there's no more
- // threads peforming IOs, it's time to do another select().
- //
- if(_inUseIO > 0)
- {
- promoteFollower(current);
- }
- else
- {
- _handlers.clear();
- _selector.startSelect();
- select = true;
- thread.setState(Ice.Instrumentation.ThreadState.ThreadStateIdle);
- }
- }
- else if(_sizeMax > 1)
- {
- //
- // Increment the IO thread count and if there's still threads available
- // to perform IO and more handlers ready, we promote a follower.
- //
- ++_inUseIO;
- if(_nextHandler.hasNext() && _inUseIO < _sizeIO)
- {
- promoteFollower(current);
- }
- }
- }
- }
- }
-
- synchronized void
- ioCompleted(ThreadPoolCurrent current)
- {
- current._ioCompleted = true; // Set the IO completed flag to specify that ioCompleted() has been called.
-
- current._thread.setState(Ice.Instrumentation.ThreadState.ThreadStateInUseForUser);
-
- if(_sizeMax > 1)
- {
- --_inUseIO;
-
- if(!_destroyed)
- {
- if(_serialize)
- {
- _selector.disable(current._handler, current.operation);
-
- // Make sure the handler isn't in the set of pending handlers (this can
- // for example occur if the handler is has more data and its added by
- // ThreadPool::update while we were processing IO).
- _pendingHandlers.remove(current._handler);
- }
- else if(current._handler._hasMoreData.value &&
- (current._handler._registered & SocketOperation.Read) != 0)
- {
- if(_pendingHandlers.isEmpty())
- {
- _workQueue.queue(_interruptWorkItem); // Interrupt select()
- }
- _pendingHandlers.add(current._handler);
- }
- }
-
- if(current._leader)
- {
- //
- // If this thread is still the leader, it's time to promote a new leader.
- //
- promoteFollower(current);
- }
- else if(_promote && (_nextHandler.hasNext() || _inUseIO == 0))
- {
- notify();
- }
-
- assert(_inUse >= 0);
- ++_inUse;
-
- if(_inUse == _sizeWarn)
- {
- String s = "thread pool `" + _prefix + "' is running low on threads\n"
- + "Size=" + _size + ", " + "SizeMax=" + _sizeMax + ", " + "SizeWarn=" + _sizeWarn;
- _instance.initializationData().logger.warning(s);
- }
-
- if(!_destroyed)
- {
- assert(_inUse <= _threads.size());
- if(_inUse < _sizeMax && _inUse == _threads.size())
- {
- if(_instance.traceLevels().threadPool >= 1)
- {
- String s = "growing " + _prefix + ": Size=" + (_threads.size() + 1);
- _instance.initializationData().logger.trace(_instance.traceLevels().threadPoolCat, s);
- }
-
- try
- {
- EventHandlerThread thread = new EventHandlerThread(_threadPrefix + "-" + _threadIndex++);
- _threads.add(thread);
- if(_hasPriority)
- {
- thread.start(_priority);
- }
- else
- {
- thread.start(java.lang.Thread.NORM_PRIORITY);
- }
- }
- catch(RuntimeException ex)
- {
- String s = "cannot create thread for `" + _prefix + "':\n" + Ex.toString(ex);
- _instance.initializationData().logger.error(s);
- }
- }
- }
- }
- }
-
- private synchronized void
- promoteFollower(ThreadPoolCurrent current)
- {
- assert(!_promote && current._leader);
- _promote = true;
- if(_inUseIO < _sizeIO && (_nextHandler.hasNext() || _inUseIO == 0))
- {
- notify();
- }
- current._leader = false;
- }
-
- private synchronized boolean
- followerWait(ThreadPoolCurrent current)
- {
- assert(!current._leader);
-
- current._thread.setState(Ice.Instrumentation.ThreadState.ThreadStateIdle);
-
- //
- // It's important to clear the handler before waiting to make sure that
- // resources for the handler are released now if it's finished. We also
- // clear the per-thread stream.
- //
- current._handler = null;
- current.stream.reset();
-
- //
- // Wait to be promoted and for all the IO threads to be done.
- //
- while(!_promote || _inUseIO == _sizeIO || (!_nextHandler.hasNext() && _inUseIO > 0))
- {
- if(_threadIdleTime > 0)
- {
- long before = IceInternal.Time.currentMonotonicTimeMillis();
- boolean interrupted = false;
- try
- {
- //
- // If the wait is interrupted then we'll let the thread die as if it timed out.
- //
- wait(_threadIdleTime * 1000);
- }
- catch (InterruptedException e)
- {
- interrupted = true;
- }
- if(interrupted || IceInternal.Time.currentMonotonicTimeMillis() - before >= _threadIdleTime * 1000)
- {
- if(!_destroyed && (!_promote || _inUseIO == _sizeIO ||
- (!_nextHandler.hasNext() && _inUseIO > 0)))
- {
- if(_instance.traceLevels().threadPool >= 1)
- {
- String s = "shrinking " + _prefix + ": Size=" + (_threads.size() - 1);
- _instance.initializationData().logger.trace(_instance.traceLevels().threadPoolCat, s);
- }
- assert(_threads.size() > 1); // Can only be called by a waiting follower thread.
- _threads.remove(current._thread);
- _workQueue.queue(new JoinThreadWorkItem(current._thread));
- return true;
- }
- }
- }
- else
- {
- try
- {
- wait();
- }
- catch (InterruptedException e)
- {
- //
- // Eat the InterruptedException.
- //
- }
- }
- }
- current._leader = true; // The current thread has become the leader.
- _promote = false;
- return false;
- }
-
- private final Instance _instance;
- private final Ice.Dispatcher _dispatcher;
- private final ThreadPoolWorkQueue _workQueue;
- private boolean _destroyed;
- private final String _prefix;
- private final String _threadPrefix;
- private final Selector _selector;
-
- final class EventHandlerThread implements Runnable
- {
- EventHandlerThread(String name)
- {
- _name = name;
- _state = Ice.Instrumentation.ThreadState.ThreadStateIdle;
- updateObserver();
- }
-
- public void
- updateObserver()
- {
- // Must be called with the thread pool mutex locked
- Ice.Instrumentation.CommunicatorObserver obsv = _instance.initializationData().observer;
- if(obsv != null)
- {
- _observer = obsv.getThreadObserver(_prefix, _name, _state, _observer);
- if(_observer != null)
- {
- _observer.attach();
- }
- }
- }
-
- public void
- setState(Ice.Instrumentation.ThreadState s)
- {
- // Must be called with the thread pool mutex locked
- if(_observer != null)
- {
- if(_state != s)
- {
- _observer.stateChanged(_state, s);
- }
- }
- _state = s;
- }
-
- public void
- join()
- throws InterruptedException
- {
- _thread.join();
- }
-
- public void
- start(int priority)
- {
- _thread = new Thread(null, this, _name, _stackSize);
- _thread.setPriority(priority);
- _thread.start();
- }
-
- @Override
- public void
- run()
- {
- if(_instance.initializationData().threadHook != null)
- {
- try
- {
- _instance.initializationData().threadHook.start();
- }
- catch(java.lang.Exception ex)
- {
- String s = "thread hook start() method raised an unexpected exception in `";
- s += _prefix + "' thread " + _name + ":\n" + Ex.toString(ex);
- _instance.initializationData().logger.error(s);
- }
- }
-
- try
- {
- ThreadPool.this.run(this);
- }
- catch(java.lang.Exception ex)
- {
- String s = "exception in `" + _prefix + "' thread " + _name + ":\n" + Ex.toString(ex);
- _instance.initializationData().logger.error(s);
- }
-
- if(_observer != null)
- {
- _observer.detach();
- }
-
- if(_instance.initializationData().threadHook != null)
- {
- try
- {
- _instance.initializationData().threadHook.stop();
- }
- catch(java.lang.Exception ex)
- {
- String s = "thread hook stop() method raised an unexpected exception in `";
- s += _prefix + "' thread " + _name + ":\n" + Ex.toString(ex);
- _instance.initializationData().logger.error(s);
- }
- }
- }
-
- final private String _name;
- private Thread _thread;
- private Ice.Instrumentation.ThreadState _state;
- private Ice.Instrumentation.ThreadObserver _observer;
- }
-
- private final int _size; // Number of threads that are pre-created.
- private final int _sizeIO; // Number of threads that can concurrently perform IO.
- private final int _sizeMax; // Maximum number of threads.
- private final int _sizeWarn; // If _inUse reaches _sizeWarn, a "low on threads" warning will be printed.
- private final boolean _serialize; // True if requests need to be serialized over the connection.
- private final int _priority;
- private final boolean _hasPriority;
- private final long _serverIdleTime;
- private final long _threadIdleTime;
- private final int _stackSize;
-
- private java.util.List<EventHandlerThread> _threads = new java.util.ArrayList<EventHandlerThread>();
- private int _threadIndex; // For assigning thread names.
- private int _inUse; // Number of threads that are currently in use.
- private int _inUseIO; // Number of threads that are currently performing IO.
-
- private java.util.List<EventHandlerOpPair> _handlers = new java.util.ArrayList<EventHandlerOpPair>();
- private java.util.Iterator<EventHandlerOpPair> _nextHandler;
- private java.util.HashSet<EventHandler> _pendingHandlers = new java.util.HashSet<EventHandler>();
-
- private boolean _promote;
-}