summary refs log tree commit diff
path: root/csharp/src/Ice/ThreadPool.cs
diff options
context:
space:
mode:
Diffstat (limited to 'csharp/src/Ice/ThreadPool.cs')
-rw-r--r--  csharp/src/Ice/ThreadPool.cs  876
1 files changed, 876 insertions, 0 deletions
diff --git a/csharp/src/Ice/ThreadPool.cs b/csharp/src/Ice/ThreadPool.cs
new file mode 100644
index 00000000000..1690d8ccb5c
--- /dev/null
+++ b/csharp/src/Ice/ThreadPool.cs
@@ -0,0 +1,876 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2015 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+namespace IceInternal
+{
+
+ using System;
+ using System.Collections.Generic;
+ using System.Diagnostics;
+ using System.Threading;
+
+    public delegate void ThreadPoolWorkItem();        // A unit of work executed by a thread-pool worker thread.
+    public delegate void AsyncCallback(object state); // Completion callback for an async socket operation; state is the EventHandler.
+
+    //
+    // ThreadPoolMessage brackets the processing of one IO message for an event
+    // handler: startIOScope/finishIOScope pair the startMessage/finishMessage
+    // calls on the thread pool, and destroy runs the deferred finishMessage
+    // when serialization postponed it (see completed below).
+    //
+    internal struct ThreadPoolMessage
+    {
+        // mutex is the event handler's lock; it is acquired whenever
+        // finishMessage has to be called outside the IO completion path.
+        public ThreadPoolMessage(object mutex)
+        {
+            _mutex = mutex;
+            _finish = false;
+            _finishWithIO = false;
+        }
+
+        // Begins the IO scope; returns false if the message could not be
+        // started (handler unregistered or an async operation was restarted).
+        public bool startIOScope(ref ThreadPoolCurrent current)
+        {
+            // This must be called with the handler locked.
+            _finishWithIO = current.startMessage();
+            return _finishWithIO;
+        }
+
+        // Closes the IO scope opened by startIOScope. No-op when startIOScope
+        // returned false or completed() already transferred the finish to destroy().
+        public void finishIOScope(ref ThreadPoolCurrent current)
+        {
+            if(_finishWithIO)
+            {
+                lock(_mutex)
+                {
+                    current.finishMessage(true);
+                }
+            }
+        }
+
+        public void completed(ref ThreadPoolCurrent current)
+        {
+            //
+            // Call finishMessage once IO is completed only if serialization is not enabled.
+            // Otherwise, finishMessage will be called when the event handler is done with
+            // the message (it will be called from destroy below).
+            //
+            Debug.Assert(_finishWithIO);
+            if(current.ioCompleted())
+            {
+                _finishWithIO = false;
+                _finish = true;
+            }
+        }
+
+        // Runs the deferred finishMessage (serialization enabled case).
+        public void destroy(ref ThreadPoolCurrent current)
+        {
+            if(_finish)
+            {
+                //
+                // A ThreadPoolMessage instance must be created outside the synchronization
+                // of the event handler. We need to lock the event handler here to call
+                // finishMessage.
+                //
+                lock(_mutex)
+                {
+                    current.finishMessage(false);
+                    Debug.Assert(!current.completedSynchronously);
+                }
+            }
+        }
+
+        private object _mutex;      // The event handler's lock object.
+        private bool _finish;       // True if finishMessage must be called from destroy().
+        private bool _finishWithIO; // True if finishMessage must be called from finishIOScope().
+    }
+
+    //
+    // Per-operation context passed (by ref) between the thread pool and an
+    // event handler while one socket operation is being processed.
+    //
+    public struct ThreadPoolCurrent
+    {
+        public ThreadPoolCurrent(ThreadPool threadPool, EventHandler handler, int op)
+        {
+            _threadPool = threadPool;
+            _handler = handler;
+            operation = op;
+            completedSynchronously = false;
+        }
+
+        public readonly int operation;      // The SocketOperation being processed (Read/Write/None).
+        public bool completedSynchronously; // Set when startAsync completed without going async; messageCallback loops on it.
+
+        // Returns true when the pool serializes messages over the connection
+        // (the <prefix>.Serialize property); see ThreadPoolMessage.completed.
+        public bool ioCompleted()
+        {
+            return _threadPool.serialize();
+        }
+
+        public bool startMessage()
+        {
+            return _threadPool.startMessage(ref this);
+        }
+
+        public void finishMessage(bool fromIOThread)
+        {
+            _threadPool.finishMessage(ref this, fromIOThread);
+        }
+
+        internal readonly ThreadPool _threadPool;
+        internal readonly EventHandler _handler;
+    }
+
+    //
+    // Thread pool executing queued work items and event-handler messages.
+    // Grows on demand up to SizeMax and shrinks back after ThreadIdleTime;
+    // per the comments in run(), the actual IO is performed by .NET thread
+    // pool threads, the workers here only execute queued work items.
+    //
+    public sealed class ThreadPool
+    {
+        // Reads the pool's configuration from the <prefix>.* properties
+        // (Size, SizeMax, SizeWarn, ThreadIdleTime, StackSize, Serialize,
+        // ThreadPriority) and pre-creates Size worker threads. timeout is
+        // the server idle time used by the last remaining thread in run().
+        public ThreadPool(Instance instance, string prefix, int timeout)
+        {
+            Ice.Properties properties = instance.initializationData().properties;
+
+            _instance = instance;
+            _dispatcher = instance.initializationData().dispatcher;
+            _destroyed = false;
+            _prefix = prefix;
+            _threadIndex = 0;
+            _inUse = 0;
+            _serialize = properties.getPropertyAsInt(_prefix + ".Serialize") > 0;
+            _serverIdleTime = timeout;
+
+            string programName = properties.getProperty("Ice.ProgramName");
+            if(programName.Length > 0)
+            {
+                _threadPrefix = programName + "-" + _prefix;
+            }
+            else
+            {
+                _threadPrefix = _prefix;
+            }
+
+            //
+            // We use just one thread as the default. This is the fastest
+            // possible setting, still allows one level of nesting, and
+            // doesn't require to make the servants thread safe.
+            //
+            int size = properties.getPropertyAsIntWithDefault(_prefix + ".Size", 1);
+            if(size < 1)
+            {
+                string s = _prefix + ".Size < 1; Size adjusted to 1";
+                _instance.initializationData().logger.warning(s);
+                size = 1;
+            }
+
+            int sizeMax = properties.getPropertyAsIntWithDefault(_prefix + ".SizeMax", size);
+            if(sizeMax < size)
+            {
+                string s = _prefix + ".SizeMax < " + _prefix + ".Size; SizeMax adjusted to Size (" + size + ")";
+                _instance.initializationData().logger.warning(s);
+                sizeMax = size;
+            }
+
+            // SizeWarn == 0 disables the "low on threads" warning (see run()).
+            int sizeWarn = properties.getPropertyAsInt(_prefix + ".SizeWarn");
+            if(sizeWarn != 0 && sizeWarn < size)
+            {
+                string s = _prefix + ".SizeWarn < " + _prefix + ".Size; adjusted SizeWarn to Size (" + size + ")";
+                _instance.initializationData().logger.warning(s);
+                sizeWarn = size;
+            }
+            else if(sizeWarn > sizeMax)
+            {
+                string s = _prefix + ".SizeWarn > " + _prefix + ".SizeMax; adjusted SizeWarn to SizeMax ("
+                    + sizeMax + ")";
+                _instance.initializationData().logger.warning(s);
+                sizeWarn = sizeMax;
+            }
+
+            int threadIdleTime = properties.getPropertyAsIntWithDefault(_prefix + ".ThreadIdleTime", 60);
+            if(threadIdleTime < 0)
+            {
+                string s = _prefix + ".ThreadIdleTime < 0; ThreadIdleTime adjusted to 0";
+                _instance.initializationData().logger.warning(s);
+                threadIdleTime = 0;
+            }
+
+            _size = size;
+            _sizeMax = sizeMax;
+            _sizeWarn = sizeWarn;
+            _threadIdleTime = threadIdleTime;
+
+            int stackSize = properties.getPropertyAsInt(_prefix + ".StackSize");
+            if(stackSize < 0)
+            {
+                string s = _prefix + ".StackSize < 0; Size adjusted to OS default";
+                _instance.initializationData().logger.warning(s);
+                stackSize = 0;
+            }
+            _stackSize = stackSize;
+
+#if !SILVERLIGHT
+            // Pool-specific priority takes precedence over the global Ice.ThreadPriority.
+            _hasPriority = properties.getProperty(_prefix + ".ThreadPriority").Length > 0;
+            _priority = IceInternal.Util.stringToThreadPriority(properties.getProperty(_prefix + ".ThreadPriority"));
+            if(!_hasPriority)
+            {
+                _hasPriority = properties.getProperty("Ice.ThreadPriority").Length > 0;
+                _priority = IceInternal.Util.stringToThreadPriority(properties.getProperty("Ice.ThreadPriority"));
+            }
+#endif
+
+            if(_instance.traceLevels().threadPool >= 1)
+            {
+                string s = "creating " + _prefix + ": Size = " + _size + ", SizeMax = " + _sizeMax + ", SizeWarn = " +
+                           _sizeWarn;
+                _instance.initializationData().logger.trace(_instance.traceLevels().threadPoolCat, s);
+            }
+
+            _workItems = new Queue<ThreadPoolWorkItem>();
+
+            try
+            {
+                _threads = new List<WorkerThread>();
+                for(int i = 0; i < _size; ++i)
+                {
+                    WorkerThread thread = new WorkerThread(this, _threadPrefix + "-" + _threadIndex++);
+#if !SILVERLIGHT
+                    if(_hasPriority)
+                    {
+                        thread.start(_priority);
+                    }
+                    else
+                    {
+                        thread.start(ThreadPriority.Normal);
+                    }
+#else
+                    thread.start();
+#endif
+                    _threads.Add(thread);
+                }
+            }
+            catch(System.Exception ex)
+            {
+                // Thread creation failed: tear down whatever was started and rethrow.
+                string s = "cannot create thread for `" + _prefix + "':\n" + ex;
+                _instance.initializationData().logger.error(s);
+
+                destroy();
+                joinWithAllThreads();
+                throw;
+            }
+        }
+
+        // Marks the pool as destroyed and wakes all idle worker threads so
+        // they can exit; idempotent. Threads are reaped by joinWithAllThreads.
+        public void destroy()
+        {
+            lock(this)
+            {
+                if(_destroyed)
+                {
+                    return;
+                }
+                _destroyed = true;
+                System.Threading.Monitor.PulseAll(this);
+            }
+        }
+
+        // Re-fetches the instrumentation observer of every worker thread.
+        public void updateObservers()
+        {
+            lock(this)
+            {
+                foreach(WorkerThread t in _threads)
+                {
+                    t.updateObserver();
+                }
+            }
+        }
+
+        public void initialize(EventHandler handler)
+        {
+            // Nothing to do.
+        }
+
+        // Registers the handler for the given operation(s).
+        public void register(EventHandler handler, int op)
+        {
+            update(handler, SocketOperation.None, op);
+        }
+
+        // Adds/removes socket operations for the handler and kicks off an
+        // async message callback for a newly added Read or Write that isn't
+        // already pending.
+        public void update(EventHandler handler, int remove, int add)
+        {
+            lock(this)
+            {
+                Debug.Assert(!_destroyed);
+
+                // Don't remove what needs to be added
+                remove &= ~add;
+
+                // Don't remove/add if already un-registered or registered
+                remove &= handler._registered;
+                add &= ~handler._registered;
+                if(remove == add)
+                {
+                    return;
+                }
+
+                handler._registered &= ~remove;
+                handler._registered |= add;
+
+                if((add & SocketOperation.Read) != 0 && (handler._pending & SocketOperation.Read) == 0)
+                {
+                    handler._pending |= SocketOperation.Read;
+                    executeNonBlocking(() =>
+                        {
+                            messageCallback(new ThreadPoolCurrent(this, handler, SocketOperation.Read));
+                        });
+                }
+                else if((add & SocketOperation.Write) != 0 && (handler._pending & SocketOperation.Write) == 0)
+                {
+                    handler._pending |= SocketOperation.Write;
+                    executeNonBlocking(() =>
+                        {
+                            messageCallback(new ThreadPoolCurrent(this, handler, SocketOperation.Write));
+                        });
+                }
+            }
+        }
+
+        // Unregisters the handler for the given operation(s).
+        public void unregister(EventHandler handler, int op)
+        {
+            update(handler, op, SocketOperation.None);
+        }
+
+        // Requests that handler.finished() be called once all pending async
+        // operations complete (immediately when none are pending).
+        public void finish(EventHandler handler)
+        {
+            lock(this)
+            {
+                Debug.Assert(!_destroyed);
+
+                //
+                // If there are no pending asynchronous operations, we can call finish on the handler now.
+                //
+                if(handler._pending == 0)
+                {
+                    handler._registered = SocketOperation.None;
+                    executeNonBlocking(() =>
+                        {
+                            ThreadPoolCurrent current = new ThreadPoolCurrent(this, handler, SocketOperation.None);
+                            handler.finished(ref current);
+                        });
+                }
+                else
+                {
+                    // finishMessage calls finish(handler) again once _pending drains.
+                    handler._finish = true;
+                }
+            }
+        }
+
+#if COMPACT
+        public void dispatchFromThisThread(Ice.VoidAction call, Ice.Connection con)
+#else
+        public void dispatchFromThisThread(System.Action call, Ice.Connection con)
+#endif
+        {
+            // Runs the call synchronously on the current thread, routing it
+            // through the application-supplied dispatcher when one is set.
+            if(_dispatcher != null)
+            {
+                try
+                {
+                    _dispatcher(call, con);
+                }
+                catch(System.Exception ex)
+                {
+                    // NOTE(review): warning only logged when Ice.Warn.Dispatch > 1,
+                    // although the default fetched here is 1 — confirm intended.
+                    if(_instance.initializationData().properties.getPropertyAsIntWithDefault(
+                           "Ice.Warn.Dispatch", 1) > 1)
+                    {
+                        _instance.initializationData().logger.warning("dispatch exception:\n" + ex);
+                    }
+                }
+            }
+            else
+            {
+                call();
+            }
+        }
+
+#if COMPACT
+        public void dispatch(Ice.VoidAction call, Ice.Connection con)
+#else
+        public void dispatch(System.Action call, Ice.Connection con)
+#endif
+        {
+            // Queues the call for execution by a worker thread; throws
+            // CommunicatorDestroyedException if the pool is destroyed.
+            lock(this)
+            {
+                if(_destroyed)
+                {
+                    throw new Ice.CommunicatorDestroyedException();
+                }
+
+                _workItems.Enqueue(() =>
+                    {
+                        dispatchFromThisThread(call, con);
+                    });
+                System.Threading.Monitor.Pulse(this);
+
+                //
+                // If this is a dynamic thread pool which can still grow and if all threads are
+                // currently busy dispatching or about to dispatch, we spawn a new thread to
+                // execute this new work item right away.
+                //
+                if(_threads.Count < _sizeMax &&
+                   (_inUse + _workItems.Count) > _threads.Count &&
+                   !_destroyed)
+                {
+                    if(_instance.traceLevels().threadPool >= 1)
+                    {
+                        string s = "growing " + _prefix + ": Size = " + (_threads.Count + 1);
+                        _instance.initializationData().logger.trace(_instance.traceLevels().threadPoolCat, s);
+                    }
+
+                    try
+                    {
+                        WorkerThread t = new WorkerThread(this, _threadPrefix + "-" + _threadIndex++);
+#if !SILVERLIGHT
+                        if(_hasPriority)
+                        {
+                            t.start(_priority);
+                        }
+                        else
+                        {
+                            t.start(ThreadPriority.Normal);
+                        }
+#else
+                        t.start();
+#endif
+                        _threads.Add(t);
+                    }
+                    catch(System.Exception ex)
+                    {
+                        // Growth failure is non-fatal: the item stays queued for existing threads.
+                        string s = "cannot create thread for `" + _prefix + "':\n" + ex;
+                        _instance.initializationData().logger.error(s);
+                    }
+                }
+            }
+        }
+
+        // Queues a work item on the async IO thread (never blocks the caller).
+        public void executeNonBlocking(ThreadPoolWorkItem workItem)
+        {
+            lock(this)
+            {
+                Debug.Assert(!_destroyed);
+                _instance.asyncIOThread().queue(workItem);
+            }
+        }
+
+        public void joinWithAllThreads()
+        {
+            //
+            // _threads is immutable after destroy() has been called,
+            // therefore no synchronization is needed. (Synchronization
+            // wouldn't be possible here anyway, because otherwise the
+            // other threads would never terminate.)
+            //
+            Debug.Assert(_destroyed);
+            foreach(WorkerThread thread in _threads)
+            {
+                thread.join();
+            }
+        }
+
+        // The configuration prefix (e.g. "Ice.ThreadPool.Server").
+        public string prefix()
+        {
+            return _prefix;
+        }
+
+        // True if requests must be serialized over the connection (<prefix>.Serialize).
+        public bool serialize()
+        {
+            return _serialize;
+        }
+
+        // Main loop of each worker thread: dequeues and runs work items,
+        // shrinking the pool after ThreadIdleTime, and — when this is the
+        // last thread and ServerIdleTime is set — shutting down the object
+        // adapters after ServerIdleTime of inactivity.
+        private void run(WorkerThread thread)
+        {
+            ThreadPoolWorkItem workItem = null;
+            while(true)
+            {
+                lock(this)
+                {
+                    // Account for the work item completed on the previous iteration.
+                    if(workItem != null)
+                    {
+                        Debug.Assert(_inUse > 0);
+                        --_inUse;
+                        if(_workItems.Count == 0)
+                        {
+                            thread.setState(Ice.Instrumentation.ThreadState.ThreadStateIdle);
+                        }
+                    }
+
+                    workItem = null;
+
+                    while(_workItems.Count == 0)
+                    {
+                        if(_destroyed)
+                        {
+                            return;
+                        }
+
+                        if(_threadIdleTime > 0)
+                        {
+                            if(!System.Threading.Monitor.Wait(this, _threadIdleTime * 1000) && _workItems.Count == 0) // If timeout
+                            {
+                                if(_destroyed)
+                                {
+                                    return;
+                                }
+                                else if(_serverIdleTime == 0 || _threads.Count > 1)
+                                {
+                                    //
+                                    // If not the last thread or if server idle time isn't configured,
+                                    // we can exit. Unlike C++/Java, there's no need to have a thread
+                                    // always spawned in the thread pool because all the IO is done
+                                    // by the .NET thread pool threads. Instead, we'll just spawn a
+                                    // new thread when needed (i.e.: when a new work item is queued).
+                                    //
+                                    if(_instance.traceLevels().threadPool >= 1)
+                                    {
+                                        string s = "shrinking " + _prefix + ": Size=" + (_threads.Count - 1);
+                                        _instance.initializationData().logger.trace(
+                                            _instance.traceLevels().threadPoolCat, s);
+                                    }
+
+                                    _threads.Remove(thread);
+                                    // Join on the async IO thread: a thread can't join itself.
+                                    _instance.asyncIOThread().queue(() =>
+                                        {
+                                            thread.join();
+                                        });
+                                    return;
+                                }
+                                else
+                                {
+                                    Debug.Assert(_serverIdleTime > 0 && _inUse == 0 && _threads.Count == 1);
+                                    if(!System.Threading.Monitor.Wait(this, _serverIdleTime * 1000) &&
+                                       _workItems.Count == 0)
+                                    {
+                                        if(!_destroyed)
+                                        {
+                                            // Server stayed idle for ServerIdleTime: request shutdown.
+                                            _workItems.Enqueue(() =>
+                                                {
+                                                    try
+                                                    {
+                                                        _instance.objectAdapterFactory().shutdown();
+                                                    }
+                                                    catch(Ice.CommunicatorDestroyedException)
+                                                    {
+                                                    }
+                                                });
+                                        }
+                                    }
+                                }
+                            }
+                        }
+                        else
+                        {
+                            System.Threading.Monitor.Wait(this);
+                        }
+                    }
+
+                    Debug.Assert(_workItems.Count > 0);
+                    workItem = _workItems.Dequeue();
+
+                    Debug.Assert(_inUse >= 0);
+                    ++_inUse;
+
+                    thread.setState(Ice.Instrumentation.ThreadState.ThreadStateInUseForUser);
+
+                    // Warn exactly when the in-use count first reaches SizeWarn (0 disables).
+                    if(_sizeMax > 1 && _inUse == _sizeWarn)
+                    {
+                        string s = "thread pool `" + _prefix + "' is running low on threads\n"
+                            + "Size=" + _size + ", " + "SizeMax=" + _sizeMax + ", " + "SizeWarn=" + _sizeWarn;
+                        _instance.initializationData().logger.warning(s);
+                    }
+                }
+
+                // Run the work item outside the pool's lock.
+                try
+                {
+                    workItem();
+                }
+                catch(System.Exception ex)
+                {
+                    string s = "exception in `" + _prefix + "' while calling on work item:\n" + ex + '\n';
+                    _instance.initializationData().logger.error(s);
+                }
+            }
+        }
+
+        // Called (with the handler locked) before a message is processed.
+        // Returns true if the caller may process the message now; returns
+        // false when the operation was (re)started asynchronously, the
+        // handler is finished, or it is no longer registered.
+        public bool startMessage(ref ThreadPoolCurrent current)
+        {
+            Debug.Assert((current._handler._pending & current.operation) != 0);
+
+            if((current._handler._started & current.operation) != 0)
+            {
+                // An async operation for this operation is in flight: complete it.
+                Debug.Assert((current._handler._ready & current.operation) == 0);
+                current._handler._ready |= current.operation;
+                current._handler._started &= ~current.operation;
+                if(!current._handler.finishAsync(current.operation)) // Returns false if the handler is finished.
+                {
+                    current._handler._pending &= ~current.operation;
+                    if(current._handler._pending == 0 && current._handler._finish)
+                    {
+                        finish(current._handler);
+                    }
+                    return false;
+                }
+            }
+            else if((current._handler._ready & current.operation) == 0 &&
+                    (current._handler._registered & current.operation) != 0)
+            {
+                // Not ready yet: start the async operation now.
+                Debug.Assert((current._handler._started & current.operation) == 0);
+                bool completed = false;
+                if(!current._handler.startAsync(current.operation, getCallback(current.operation), ref completed))
+                {
+                    current._handler._pending &= ~current.operation;
+                    if(current._handler._pending == 0 && current._handler._finish)
+                    {
+                        finish(current._handler);
+                    }
+                    return false;
+                }
+                else
+                {
+                    current.completedSynchronously = completed;
+                    current._handler._started |= current.operation;
+                    return false;
+                }
+            }
+
+            if((current._handler._registered & current.operation) != 0)
+            {
+                Debug.Assert((current._handler._ready & current.operation) != 0);
+                current._handler._ready &= ~current.operation;
+                return true;
+            }
+            else
+            {
+                // Handler was unregistered for this operation in the meantime.
+                current._handler._pending &= ~current.operation;
+                if(current._handler._pending == 0 && current._handler._finish)
+                {
+                    finish(current._handler);
+                }
+                return false;
+            }
+        }
+
+        // Called after a message has been processed. If the handler is still
+        // registered, restarts the async operation (directly when called from
+        // the IO completion path, otherwise via the async IO thread), and
+        // calls finish once no async operations remain pending.
+        public void finishMessage(ref ThreadPoolCurrent current, bool fromIOThread)
+        {
+            if((current._handler._registered & current.operation) != 0)
+            {
+                if(fromIOThread)
+                {
+                    Debug.Assert((current._handler._ready & current.operation) == 0);
+                    bool completed = false;
+                    if(!current._handler.startAsync(current.operation, getCallback(current.operation), ref completed))
+                    {
+                        current._handler._pending &= ~current.operation;
+                    }
+                    else
+                    {
+                        Debug.Assert((current._handler._pending & current.operation) != 0);
+                        current.completedSynchronously = completed;
+                        current._handler._started |= current.operation;
+                    }
+                }
+                else
+                {
+                    // Copy current: the closure must not capture the ref parameter.
+                    ThreadPoolCurrent c = current;
+                    executeNonBlocking(() =>
+                        {
+                            messageCallback(c);
+                        });
+                }
+            }
+            else
+            {
+                current._handler._pending &= ~current.operation;
+            }
+
+            if(current._handler._pending == 0 && current._handler._finish)
+            {
+                // There are no more pending async operations, it's time to call finish.
+                finish(current._handler);
+            }
+        }
+
+        // Completion callback for async reads; state is the EventHandler.
+        public void asyncReadCallback(object state)
+        {
+            messageCallback(new ThreadPoolCurrent(this, (EventHandler)state, SocketOperation.Read));
+        }
+
+        // Completion callback for async writes; state is the EventHandler.
+        public void asyncWriteCallback(object state)
+        {
+            messageCallback(new ThreadPoolCurrent(this, (EventHandler)state, SocketOperation.Write));
+        }
+
+        // Delivers the message to the handler, looping as long as the handler's
+        // async operations complete synchronously.
+        public void messageCallback(ThreadPoolCurrent current)
+        {
+            try
+            {
+                do
+                {
+                    current.completedSynchronously = false;
+                    current._handler.message(ref current);
+                }
+                while(current.completedSynchronously);
+            }
+            catch(System.Exception ex)
+            {
+                string s = "exception in `" + _prefix + "':\n" + ex + "\nevent handler: " + current._handler.ToString();
+                _instance.initializationData().logger.error(s);
+            }
+        }
+
+        // Maps a socket operation to its completion callback; only Read and
+        // Write are valid here.
+        private AsyncCallback getCallback(int operation)
+        {
+            switch(operation)
+            {
+            case SocketOperation.Read:
+                return asyncReadCallback;
+            case SocketOperation.Write:
+                return asyncWriteCallback;
+            default:
+                Debug.Assert(false);
+                return null;
+            }
+        }
+
+        private Instance _instance;
+        private Ice.Dispatcher _dispatcher;
+        private bool _destroyed;
+        private readonly string _prefix;
+        private readonly string _threadPrefix;
+
+        // A single worker thread of the pool, with optional instrumentation.
+        private sealed class WorkerThread
+        {
+            private ThreadPool _threadPool;
+            private Ice.Instrumentation.ThreadObserver _observer;
+            private Ice.Instrumentation.ThreadState _state;
+
+            internal WorkerThread(ThreadPool threadPool, string name) : base()
+            {
+                _threadPool = threadPool;
+                _name = name;
+                _state = Ice.Instrumentation.ThreadState.ThreadStateIdle;
+                updateObserver();
+            }
+
+            public void updateObserver()
+            {
+                // Must be called with the thread pool mutex locked
+                Ice.Instrumentation.CommunicatorObserver obsv = _threadPool._instance.initializationData().observer;
+                if(obsv != null)
+                {
+                    _observer = obsv.getThreadObserver(_threadPool._prefix, _name, _state, _observer);
+                    if(_observer != null)
+                    {
+                        _observer.attach();
+                    }
+                }
+            }
+
+            public void setState(Ice.Instrumentation.ThreadState s)
+            {
+                // Must be called with the thread pool mutex locked
+                if(_observer != null)
+                {
+                    if(_state != s)
+                    {
+                        _observer.stateChanged(_state, s);
+                    }
+                }
+                _state = s;
+            }
+
+            // Blocks until the underlying thread terminates.
+            public void join()
+            {
+                _thread.Join();
+            }
+
+#if !SILVERLIGHT
+            // Starts the thread with the configured stack size and priority.
+            public void start(ThreadPriority priority)
+            {
+                if(_threadPool._stackSize == 0)
+                {
+                    _thread = new Thread(new ThreadStart(Run));
+                }
+                else
+                {
+                    _thread = new Thread(new ThreadStart(Run), _threadPool._stackSize);
+                }
+                _thread.IsBackground = true;
+                _thread.Name = _name;
+                _thread.Priority = priority;
+                _thread.Start();
+            }
+#else
+            public void start()
+            {
+                _thread = new Thread(new ThreadStart(Run));
+                _thread.IsBackground = true;
+                _thread.Name = _name;
+                _thread.Start();
+            }
+#endif
+
+            // Thread entry point: invokes the thread hooks around the pool's run loop.
+            public void Run()
+            {
+                if(_threadPool._instance.initializationData().threadHook != null)
+                {
+                    try
+                    {
+                        _threadPool._instance.initializationData().threadHook.start();
+                    }
+                    catch(System.Exception ex)
+                    {
+                        string s = "thread hook start() method raised an unexpected exception in `";
+                        s += _threadPool._prefix + "' thread " + _thread.Name + ":\n" + ex;
+                        _threadPool._instance.initializationData().logger.error(s);
+                    }
+                }
+
+                try
+                {
+                    _threadPool.run(this);
+                }
+                catch(System.Exception ex)
+                {
+                    string s = "exception in `" + _threadPool._prefix + "' thread " + _thread.Name + ":\n" + ex;
+                    _threadPool._instance.initializationData().logger.error(s);
+                }
+
+                if(_observer != null)
+                {
+                    _observer.detach();
+                }
+
+                if(_threadPool._instance.initializationData().threadHook != null)
+                {
+                    try
+                    {
+                        _threadPool._instance.initializationData().threadHook.stop();
+                    }
+                    catch(System.Exception ex)
+                    {
+                        string s = "thread hook stop() method raised an unexpected exception in `";
+                        s += _threadPool._prefix + "' thread " + _thread.Name + ":\n" + ex;
+                        _threadPool._instance.initializationData().logger.error(s);
+                    }
+                }
+            }
+
+            private readonly string _name;
+            private Thread _thread;
+        }
+
+        private readonly int _size; // Number of threads that are pre-created.
+        private readonly int _sizeMax; // Maximum number of threads.
+        private readonly int _sizeWarn; // If _inUse reaches _sizeWarn, a "low on threads" warning will be printed.
+        private readonly bool _serialize; // True if requests need to be serialized over the connection.
+#if !SILVERLIGHT
+        private readonly ThreadPriority _priority;
+        private readonly bool _hasPriority = false;
+#endif
+        private readonly int _serverIdleTime; // Seconds before the last idle thread triggers adapter shutdown (0 = disabled).
+        private readonly int _threadIdleTime; // Seconds before an idle thread exits (0 = never).
+        private readonly int _stackSize; // Thread stack size in bytes (0 = OS default).
+
+        private List<WorkerThread> _threads; // All threads, running or not.
+        private int _threadIndex; // For assigning thread names.
+        private int _inUse; // Number of threads that are currently in use.
+
+        private Queue<ThreadPoolWorkItem> _workItems; // Pending work items; guarded by lock(this).
+    }
+
+}