diff options
Diffstat (limited to 'csharp/src/Ice/ThreadPool.cs')
-rw-r--r-- | csharp/src/Ice/ThreadPool.cs | 401 |
1 files changed, 181 insertions, 220 deletions
diff --git a/csharp/src/Ice/ThreadPool.cs b/csharp/src/Ice/ThreadPool.cs index cad6db97da2..c113ec1ab57 100644 --- a/csharp/src/Ice/ThreadPool.cs +++ b/csharp/src/Ice/ThreadPool.cs @@ -4,12 +4,14 @@ namespace IceInternal { + using System; using System.Collections.Generic; using System.Diagnostics; using System.Threading; using System.Threading.Tasks; - public delegate void ThreadPoolWorkItem(); + public delegate void ThreadPoolWorkItem(ThreadPoolCurrent current); + public delegate void AsyncCallback(object state); // @@ -30,7 +32,7 @@ namespace IceInternal // Dispatch the continuation on the thread pool if this isn't called // already from a thread pool thread. We don't use the dispatcher // for the continuations, the dispatcher is only used when the - // call is initialy invoked (e.g.: a servant dispatch after being + // call is initially invoked (e.g.: a servant dispatch after being // received is dispatched using the dispatcher which might dispatch // the call on the UI thread which will then use its own synchronization // context to execute continuations). @@ -38,7 +40,7 @@ namespace IceInternal var ctx = Current as ThreadPoolSynchronizationContext; if(ctx != this) { - _threadPool.dispatch(() => { d(state); }, null, false); + _threadPool.dispatch(() => { d(state); }, null); } else { @@ -54,100 +56,98 @@ namespace IceInternal private ThreadPool _threadPool; } - internal struct ThreadPoolMessage + + internal class ThreadPoolMessage : IDisposable { - public ThreadPoolMessage(object mutex) + public ThreadPoolMessage(ThreadPoolCurrent current, object mutex) { + _current = current; _mutex = mutex; _finish = false; _finishWithIO = false; } - public bool startIOScope(ref ThreadPoolCurrent current) + public bool startIOScope() { // This must be called with the handler locked. - _finishWithIO = current.startMessage(); + _finishWithIO = _current.startMessage(); return _finishWithIO; } - public void finishIOScope(ref ThreadPoolCurrent current) + public void finishIOScope() { if(_finishWithIO) { - lock(_mutex) - { - current.finishMessage(true); - } + // This must be called with the handler locked. + _current.finishMessage(); } } - public void completed(ref ThreadPoolCurrent current) + public void ioCompleted() { // // Call finishMessage once IO is completed only if serialization is not enabled. // Otherwise, finishMessage will be called when the event handler is done with - // the message (it will be called from destroy below). + // the message (it will be called from Dispose below). // Debug.Assert(_finishWithIO); - if(current.ioCompleted()) + if(_current.ioCompleted()) { _finishWithIO = false; _finish = true; } } - public void destroy(ref ThreadPoolCurrent current) + public void Dispose() { if(_finish) { // - // A ThreadPoolMessage instance must be created outside the synchronization - // of the event handler. We need to lock the event handler here to call - // finishMessage. + // A ThreadPoolMessage instance must be created outside the synchronization of the event handler. We + // need to lock the event handler here to call finishMessage. // lock(_mutex) { - current.finishMessage(false); - Debug.Assert(!current.completedSynchronously); + _current.finishMessage(); } } } + private ThreadPoolCurrent _current; private object _mutex; private bool _finish; private bool _finishWithIO; } - public struct ThreadPoolCurrent + public class ThreadPoolCurrent { - public ThreadPoolCurrent(ThreadPool threadPool, EventHandler handler, int op) + internal ThreadPoolCurrent(ThreadPool threadPool, ThreadPool.WorkerThread thread) { _threadPool = threadPool; - _handler = handler; - operation = op; - completedSynchronously = false; + _thread = thread; } - public readonly int operation; - public bool completedSynchronously; + public int operation; public bool ioCompleted() { - return _threadPool.serialize(); + return _threadPool.ioCompleted(this); } public bool startMessage() { - return _threadPool.startMessage(ref this); + return _threadPool.startMessage(this); } - public void finishMessage(bool fromIOThread) + public void finishMessage() { - _threadPool.finishMessage(ref this, fromIOThread); + _threadPool.finishMessage(this); } internal readonly ThreadPool _threadPool; - internal readonly EventHandler _handler; + internal readonly ThreadPool.WorkerThread _thread; + internal bool _ioCompleted; + internal EventHandler _handler; } public sealed class ThreadPool : System.Threading.Tasks.TaskScheduler @@ -251,7 +251,7 @@ namespace IceInternal _threads = new List<WorkerThread>(); for(int i = 0; i < _size; ++i) { - WorkerThread thread = new WorkerThread(this, _threadPrefix + "-" + _threadIndex++); + var thread = new WorkerThread(this, _threadPrefix + "-" + _threadIndex++); thread.start(_priority); _threads.Add(thread); } @@ -329,18 +329,12 @@ namespace IceInternal if((add & SocketOperation.Read) != 0 && (handler._pending & SocketOperation.Read) == 0) { handler._pending |= SocketOperation.Read; - executeNonBlocking(() => - { - messageCallback(new ThreadPoolCurrent(this, handler, SocketOperation.Read)); - }); + queueReadyForIOHandler(handler, SocketOperation.Read); } else if((add & SocketOperation.Write) != 0 && (handler._pending & SocketOperation.Write) == 0) { handler._pending |= SocketOperation.Write; - executeNonBlocking(() => - { - messageCallback(new ThreadPoolCurrent(this, handler, SocketOperation.Write)); - }); + queueReadyForIOHandler(handler, SocketOperation.Write); } } } @@ -363,11 +357,13 @@ namespace IceInternal // if(handler._pending == 0) { - executeNonBlocking(() => - { - ThreadPoolCurrent current = new ThreadPoolCurrent(this, handler, SocketOperation.None); - handler.finished(ref current); - }); + _workItems.Enqueue(current => + { + current.operation = SocketOperation.None; + current._handler = handler; + handler.finished(ref current); + }); + Monitor.Pulse(this); } else { @@ -376,13 +372,13 @@ namespace IceInternal } } - public void dispatchFromThisThread(System.Action call, Ice.Connection con) + public void dispatchFromThisThread(System.Action call, Ice.Connection connection) { if(_dispatcher != null) { try { - _dispatcher(call, con); + _dispatcher(call, connection); } catch(System.Exception ex) { @@ -399,7 +395,7 @@ namespace IceInternal } } - public void dispatch(System.Action call, Ice.Connection con, bool useDispatcher = true) + public void dispatch(Action workItem, Ice.Connection connection) { lock(this) { @@ -407,63 +403,21 @@ namespace IceInternal { throw new Ice.CommunicatorDestroyedException(); } - - if(useDispatcher) - { - _workItems.Enqueue(() => { dispatchFromThisThread(call, con); }); - } - else - { - _workItems.Enqueue(() => { call(); }); - } - Monitor.Pulse(this); - - // - // If this is a dynamic thread pool which can still grow and if all threads are - // currently busy dispatching or about to dispatch, we spawn a new thread to - // execute this new work item right away. - // - if(_threads.Count < _sizeMax && - (_inUse + _workItems.Count) > _threads.Count && - !_destroyed) - { - if(_instance.traceLevels().threadPool >= 1) - { - string s = "growing " + _prefix + ": Size = " + (_threads.Count + 1); - _instance.initializationData().logger.trace(_instance.traceLevels().threadPoolCat, s); - } - - try + _workItems.Enqueue(current => { - WorkerThread t = new WorkerThread(this, _threadPrefix + "-" + _threadIndex++); - t.start(_priority); - _threads.Add(t); - } - catch(System.Exception ex) - { - string s = "cannot create thread for `" + _prefix + "':\n" + ex; - _instance.initializationData().logger.error(s); - } - } - } - } - - public void executeNonBlocking(ThreadPoolWorkItem workItem) - { - lock(this) - { - Debug.Assert(!_destroyed); - _instance.asyncIOThread().queue(workItem); + current.ioCompleted(); + dispatchFromThisThread(workItem, connection); + }); + Monitor.Pulse(this); } } public void joinWithAllThreads() { // - // _threads is immutable after destroy() has been called, - // therefore no synchronization is needed. (Synchronization - // wouldn't be possible here anyway, because otherwise the - // other threads would never terminate.) + // _threads is immutable after destroy() has been called, therefore no synchronization is needed. + // (Synchronization wouldn't be possible here anyway, because otherwise the other threads would never + // terminate.) // Debug.Assert(_destroyed); foreach(WorkerThread thread in _threads) @@ -484,7 +438,7 @@ namespace IceInternal protected sealed override void QueueTask(System.Threading.Tasks.Task task) { - dispatch(() => { TryExecuteTask(task); }, null, _dispatcher != null); + dispatch(() => { TryExecuteTask(task); }, null); } protected sealed override bool TryExecuteTaskInline(System.Threading.Tasks.Task task, bool taskWasPreviouslyQueued) @@ -507,25 +461,38 @@ namespace IceInternal return new System.Threading.Tasks.Task[0]; } - private void run(WorkerThread thread) + private void queueReadyForIOHandler(EventHandler handler, int operation) { - ThreadPoolWorkItem workItem = null; - while(true) + lock(this) { - lock(this) - { - if(workItem != null) + Debug.Assert(!_destroyed); + _workItems.Enqueue(current => { - Debug.Assert(_inUse > 0); - --_inUse; - if(_workItems.Count == 0) + current._handler = handler; + current.operation = operation; + try { - thread.setState(Ice.Instrumentation.ThreadState.ThreadStateIdle); + current._handler.message(ref current); } - } - - workItem = null; + catch(System.Exception ex) + { + string s = "exception in `" + _prefix + "':\n" + ex + "\nevent handler: " + + current._handler.ToString(); + _instance.initializationData().logger.error(s); + } + }); + Monitor.Pulse(this); + } + } + private void run(WorkerThread thread) + { + var current = new ThreadPoolCurrent(this, thread); + while(true) + { + ThreadPoolWorkItem workItem = null; + lock(this) + { while(_workItems.Count == 0) { if(_destroyed) @@ -541,15 +508,8 @@ namespace IceInternal { return; } - else if(_serverIdleTime == 0 || _threads.Count > 1) + else if (_inUse < _threads.Count - 1) { - // - // If not the last thread or if server idle time isn't configured, - // we can exit. Unlike C++/Java, there's no need to have a thread - // always spawned in the thread pool because all the IO is done - // by the .NET thread pool threads. Instead, we'll just spawn a - // new thread when needed (i.e.: when a new work item is queued). - // if(_instance.traceLevels().threadPool >= 1) { string s = "shrinking " + _prefix + ": Size=" + (_threads.Count - 1); @@ -558,32 +518,39 @@ namespace IceInternal } _threads.Remove(thread); - _instance.asyncIOThread().queue(() => + _workItems.Enqueue(c => { + // No call to ioCompleted, this shouldn't block (and we don't want to cause + // a new thread to be started). thread.join(); }); + Monitor.Pulse(this); return; } - else + else if (_inUse > 0) { - Debug.Assert(_serverIdleTime > 0 && _inUse == 0 && _threads.Count == 1); - if(!Monitor.Wait(this, _serverIdleTime * 1000) && - _workItems.Count == 0) - { - if(!_destroyed) + // + // If this is the last idle thread but there are still other threads + // busy dispatching, we go back waiting with _threadIdleTime. We only + // wait with _serverIdleTime when there's only one thread left. + // + continue; + } + + Debug.Assert(_threads.Count == 1); + if(!Monitor.Wait(this, _serverIdleTime * 1000) && !_destroyed) + { + _workItems.Enqueue(c => { - _workItems.Enqueue(() => - { - try - { - _instance.objectAdapterFactory().shutdown(); - } - catch(Ice.CommunicatorDestroyedException) - { - } - }); - } - } + c.ioCompleted(); + try + { + _instance.objectAdapterFactory().shutdown(); + } + catch(Ice.CommunicatorDestroyedException) + { + } + }); } } } @@ -596,32 +563,78 @@ namespace IceInternal Debug.Assert(_workItems.Count > 0); workItem = _workItems.Dequeue(); + current._thread.setState(Ice.Instrumentation.ThreadState.ThreadStateInUseForIO); + current._ioCompleted = false; + } + + try + { + workItem(current); + } + catch(System.Exception ex) + { + string s = "exception in `" + _prefix + "' while calling on work item:\n" + ex + '\n'; + _instance.initializationData().logger.error(s); + } + + lock (this) + { + if (_sizeMax > 1 && current._ioCompleted) + { + Debug.Assert(_inUse > 0); + --_inUse; + } + thread.setState(Ice.Instrumentation.ThreadState.ThreadStateIdle); + } + } + } + + public bool ioCompleted(ThreadPoolCurrent current) + { + lock(this) + { + current._ioCompleted = true; // Set the IO completed flag to specify that ioCompleted() has been called. + + current._thread.setState(Ice.Instrumentation.ThreadState.ThreadStateInUseForUser); + + if(_sizeMax > 1) + { Debug.Assert(_inUse >= 0); ++_inUse; - thread.setState(Ice.Instrumentation.ThreadState.ThreadStateInUseForUser); - if(_sizeMax > 1 && _inUse == _sizeWarn) { string s = "thread pool `" + _prefix + "' is running low on threads\n" + "Size=" + _size + ", " + "SizeMax=" + _sizeMax + ", " + "SizeWarn=" + _sizeWarn; _instance.initializationData().logger.warning(s); } - } - try - { - workItem(); - } - catch(System.Exception ex) - { - string s = "exception in `" + _prefix + "' while calling on work item:\n" + ex + '\n'; - _instance.initializationData().logger.error(s); + if(!_destroyed && _inUse < _sizeMax && _inUse == _threads.Count) + { + if(_instance.traceLevels().threadPool >= 1) + { + string s = "growing " + _prefix + ": Size = " + (_threads.Count + 1); + _instance.initializationData().logger.trace(_instance.traceLevels().threadPoolCat, s); + } + + try + { + var t = new WorkerThread(this, _threadPrefix + "-" + _threadIndex++); + t.start(_priority); + _threads.Add(t); + } + catch(System.Exception ex) + { + string s = "cannot create thread for `" + _prefix + "':\n" + ex; + _instance.initializationData().logger.error(s); + } + } } } + return _serialize; } - public bool startMessage(ref ThreadPoolCurrent current) + public bool startMessage(ThreadPoolCurrent current) { Debug.Assert((current._handler._pending & current.operation) != 0); @@ -644,8 +657,7 @@ namespace IceInternal (current._handler._registered & current.operation) != 0) { Debug.Assert((current._handler._started & current.operation) == 0); - bool completed = false; - if(!current._handler.startAsync(current.operation, getCallback(current.operation), ref completed)) + if(!current._handler.startAsync(current.operation, getCallback(current.operation))) { current._handler._pending &= ~current.operation; if(current._handler._pending == 0 && current._handler._finish) @@ -656,7 +668,6 @@ namespace IceInternal } else { - current.completedSynchronously = completed; current._handler._started |= current.operation; return false; } @@ -679,32 +690,19 @@ namespace IceInternal } } - public void finishMessage(ref ThreadPoolCurrent current, bool fromIOThread) + public void finishMessage(ThreadPoolCurrent current) { if((current._handler._registered & current.operation) != 0) { - if(fromIOThread) + Debug.Assert((current._handler._ready & current.operation) == 0); + if(!current._handler.startAsync(current.operation, getCallback(current.operation))) { - Debug.Assert((current._handler._ready & current.operation) == 0); - bool completed = false; - if(!current._handler.startAsync(current.operation, getCallback(current.operation), ref completed)) - { - current._handler._pending &= ~current.operation; - } - else - { - Debug.Assert((current._handler._pending & current.operation) != 0); - current.completedSynchronously = completed; - current._handler._started |= current.operation; - } + current._handler._pending &= ~current.operation; } else { - ThreadPoolCurrent c = current; - executeNonBlocking(() => - { - messageCallback(c); - }); + Debug.Assert((current._handler._pending & current.operation) != 0); + current._handler._started |= current.operation; } } else @@ -719,55 +717,13 @@ namespace IceInternal } } - public void asyncReadCallback(object state) - { - messageCallback(new ThreadPoolCurrent(this, (EventHandler)state, SocketOperation.Read)); - } - - public void asyncWriteCallback(object state) - { - messageCallback(new ThreadPoolCurrent(this, (EventHandler)state, SocketOperation.Write)); - } - - public void messageCallback(ThreadPoolCurrent current) - { - try - { - do - { - current.completedSynchronously = false; - current._handler.message(ref current); - } - while(current.completedSynchronously); - } - catch(System.Exception ex) - { - string s = "exception in `" + _prefix + "':\n" + ex + "\nevent handler: " + current._handler.ToString(); - _instance.initializationData().logger.error(s); - } - } - private AsyncCallback getCallback(int operation) { - switch(operation) - { - case SocketOperation.Read: - return asyncReadCallback; - case SocketOperation.Write: - return asyncWriteCallback; - default: - Debug.Assert(false); - return null; - } + Debug.Assert(operation == SocketOperation.Read || operation == SocketOperation.Write); + return state => queueReadyForIOHandler((EventHandler)state, operation); } - private Instance _instance; - private System.Action<System.Action, Ice.Connection> _dispatcher; - private bool _destroyed; - private readonly string _prefix; - private readonly string _threadPrefix; - - private sealed class WorkerThread + internal sealed class WorkerThread { private ThreadPool _threadPool; private Ice.Instrumentation.ThreadObserver _observer; @@ -890,6 +846,11 @@ namespace IceInternal private Thread _thread; } + private Instance _instance; + private System.Action<System.Action, Ice.Connection> _dispatcher; + private bool _destroyed; + private readonly string _prefix; + private readonly string _threadPrefix; private readonly int _size; // Number of threads that are pre-created. private readonly int _sizeMax; // Maximum number of threads. private readonly int _sizeWarn; // If _inUse reaches _sizeWarn, a "low on threads" warning will be printed. |