summaryrefslogtreecommitdiff
path: root/csharp/src/Ice/ThreadPool.cs
diff options
context:
space:
mode:
Diffstat (limited to 'csharp/src/Ice/ThreadPool.cs')
-rw-r--r--csharp/src/Ice/ThreadPool.cs401
1 files changed, 181 insertions, 220 deletions
diff --git a/csharp/src/Ice/ThreadPool.cs b/csharp/src/Ice/ThreadPool.cs
index cad6db97da2..c113ec1ab57 100644
--- a/csharp/src/Ice/ThreadPool.cs
+++ b/csharp/src/Ice/ThreadPool.cs
@@ -4,12 +4,14 @@
namespace IceInternal
{
+ using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
- public delegate void ThreadPoolWorkItem();
+ public delegate void ThreadPoolWorkItem(ThreadPoolCurrent current);
+
public delegate void AsyncCallback(object state);
//
@@ -30,7 +32,7 @@ namespace IceInternal
// Dispatch the continuation on the thread pool if this isn't called
// already from a thread pool thread. We don't use the dispatcher
// for the continuations, the dispatcher is only used when the
- // call is initialy invoked (e.g.: a servant dispatch after being
+ // call is initially invoked (e.g.: a servant dispatch after being
// received is dispatched using the dispatcher which might dispatch
// the call on the UI thread which will then use its own synchronization
// context to execute continuations).
@@ -38,7 +40,7 @@ namespace IceInternal
var ctx = Current as ThreadPoolSynchronizationContext;
if(ctx != this)
{
- _threadPool.dispatch(() => { d(state); }, null, false);
+ _threadPool.dispatch(() => { d(state); }, null);
}
else
{
@@ -54,100 +56,98 @@ namespace IceInternal
private ThreadPool _threadPool;
}
- internal struct ThreadPoolMessage
+
+ internal class ThreadPoolMessage : IDisposable
{
- public ThreadPoolMessage(object mutex)
+ public ThreadPoolMessage(ThreadPoolCurrent current, object mutex)
{
+ _current = current;
_mutex = mutex;
_finish = false;
_finishWithIO = false;
}
- public bool startIOScope(ref ThreadPoolCurrent current)
+ public bool startIOScope()
{
// This must be called with the handler locked.
- _finishWithIO = current.startMessage();
+ _finishWithIO = _current.startMessage();
return _finishWithIO;
}
- public void finishIOScope(ref ThreadPoolCurrent current)
+ public void finishIOScope()
{
if(_finishWithIO)
{
- lock(_mutex)
- {
- current.finishMessage(true);
- }
+ // This must be called with the handler locked.
+ _current.finishMessage();
}
}
- public void completed(ref ThreadPoolCurrent current)
+ public void ioCompleted()
{
//
// Call finishMessage once IO is completed only if serialization is not enabled.
// Otherwise, finishMessage will be called when the event handler is done with
- // the message (it will be called from destroy below).
+ // the message (it will be called from Dispose below).
//
Debug.Assert(_finishWithIO);
- if(current.ioCompleted())
+ if(_current.ioCompleted())
{
_finishWithIO = false;
_finish = true;
}
}
- public void destroy(ref ThreadPoolCurrent current)
+ public void Dispose()
{
if(_finish)
{
//
- // A ThreadPoolMessage instance must be created outside the synchronization
- // of the event handler. We need to lock the event handler here to call
- // finishMessage.
+ // A ThreadPoolMessage instance must be created outside the synchronization of the event handler. We
+ // need to lock the event handler here to call finishMessage.
//
lock(_mutex)
{
- current.finishMessage(false);
- Debug.Assert(!current.completedSynchronously);
+ _current.finishMessage();
}
}
}
+ private ThreadPoolCurrent _current;
private object _mutex;
private bool _finish;
private bool _finishWithIO;
}
- public struct ThreadPoolCurrent
+ public class ThreadPoolCurrent
{
- public ThreadPoolCurrent(ThreadPool threadPool, EventHandler handler, int op)
+ internal ThreadPoolCurrent(ThreadPool threadPool, ThreadPool.WorkerThread thread)
{
_threadPool = threadPool;
- _handler = handler;
- operation = op;
- completedSynchronously = false;
+ _thread = thread;
}
- public readonly int operation;
- public bool completedSynchronously;
+ public int operation;
public bool ioCompleted()
{
- return _threadPool.serialize();
+ return _threadPool.ioCompleted(this);
}
public bool startMessage()
{
- return _threadPool.startMessage(ref this);
+ return _threadPool.startMessage(this);
}
- public void finishMessage(bool fromIOThread)
+ public void finishMessage()
{
- _threadPool.finishMessage(ref this, fromIOThread);
+ _threadPool.finishMessage(this);
}
internal readonly ThreadPool _threadPool;
- internal readonly EventHandler _handler;
+ internal readonly ThreadPool.WorkerThread _thread;
+ internal bool _ioCompleted;
+ internal EventHandler _handler;
}
public sealed class ThreadPool : System.Threading.Tasks.TaskScheduler
@@ -251,7 +251,7 @@ namespace IceInternal
_threads = new List<WorkerThread>();
for(int i = 0; i < _size; ++i)
{
- WorkerThread thread = new WorkerThread(this, _threadPrefix + "-" + _threadIndex++);
+ var thread = new WorkerThread(this, _threadPrefix + "-" + _threadIndex++);
thread.start(_priority);
_threads.Add(thread);
}
@@ -329,18 +329,12 @@ namespace IceInternal
if((add & SocketOperation.Read) != 0 && (handler._pending & SocketOperation.Read) == 0)
{
handler._pending |= SocketOperation.Read;
- executeNonBlocking(() =>
- {
- messageCallback(new ThreadPoolCurrent(this, handler, SocketOperation.Read));
- });
+ queueReadyForIOHandler(handler, SocketOperation.Read);
}
else if((add & SocketOperation.Write) != 0 && (handler._pending & SocketOperation.Write) == 0)
{
handler._pending |= SocketOperation.Write;
- executeNonBlocking(() =>
- {
- messageCallback(new ThreadPoolCurrent(this, handler, SocketOperation.Write));
- });
+ queueReadyForIOHandler(handler, SocketOperation.Write);
}
}
}
@@ -363,11 +357,13 @@ namespace IceInternal
//
if(handler._pending == 0)
{
- executeNonBlocking(() =>
- {
- ThreadPoolCurrent current = new ThreadPoolCurrent(this, handler, SocketOperation.None);
- handler.finished(ref current);
- });
+ _workItems.Enqueue(current =>
+ {
+ current.operation = SocketOperation.None;
+ current._handler = handler;
+ handler.finished(ref current);
+ });
+ Monitor.Pulse(this);
}
else
{
@@ -376,13 +372,13 @@ namespace IceInternal
}
}
- public void dispatchFromThisThread(System.Action call, Ice.Connection con)
+ public void dispatchFromThisThread(System.Action call, Ice.Connection connection)
{
if(_dispatcher != null)
{
try
{
- _dispatcher(call, con);
+ _dispatcher(call, connection);
}
catch(System.Exception ex)
{
@@ -399,7 +395,7 @@ namespace IceInternal
}
}
- public void dispatch(System.Action call, Ice.Connection con, bool useDispatcher = true)
+ public void dispatch(Action workItem, Ice.Connection connection)
{
lock(this)
{
@@ -407,63 +403,21 @@ namespace IceInternal
{
throw new Ice.CommunicatorDestroyedException();
}
-
- if(useDispatcher)
- {
- _workItems.Enqueue(() => { dispatchFromThisThread(call, con); });
- }
- else
- {
- _workItems.Enqueue(() => { call(); });
- }
- Monitor.Pulse(this);
-
- //
- // If this is a dynamic thread pool which can still grow and if all threads are
- // currently busy dispatching or about to dispatch, we spawn a new thread to
- // execute this new work item right away.
- //
- if(_threads.Count < _sizeMax &&
- (_inUse + _workItems.Count) > _threads.Count &&
- !_destroyed)
- {
- if(_instance.traceLevels().threadPool >= 1)
- {
- string s = "growing " + _prefix + ": Size = " + (_threads.Count + 1);
- _instance.initializationData().logger.trace(_instance.traceLevels().threadPoolCat, s);
- }
-
- try
+ _workItems.Enqueue(current =>
{
- WorkerThread t = new WorkerThread(this, _threadPrefix + "-" + _threadIndex++);
- t.start(_priority);
- _threads.Add(t);
- }
- catch(System.Exception ex)
- {
- string s = "cannot create thread for `" + _prefix + "':\n" + ex;
- _instance.initializationData().logger.error(s);
- }
- }
- }
- }
-
- public void executeNonBlocking(ThreadPoolWorkItem workItem)
- {
- lock(this)
- {
- Debug.Assert(!_destroyed);
- _instance.asyncIOThread().queue(workItem);
+ current.ioCompleted();
+ dispatchFromThisThread(workItem, connection);
+ });
+ Monitor.Pulse(this);
}
}
public void joinWithAllThreads()
{
//
- // _threads is immutable after destroy() has been called,
- // therefore no synchronization is needed. (Synchronization
- // wouldn't be possible here anyway, because otherwise the
- // other threads would never terminate.)
+ // _threads is immutable after destroy() has been called, therefore no synchronization is needed.
+ // (Synchronization wouldn't be possible here anyway, because otherwise the other threads would never
+ // terminate.)
//
Debug.Assert(_destroyed);
foreach(WorkerThread thread in _threads)
@@ -484,7 +438,7 @@ namespace IceInternal
protected sealed override void QueueTask(System.Threading.Tasks.Task task)
{
- dispatch(() => { TryExecuteTask(task); }, null, _dispatcher != null);
+ dispatch(() => { TryExecuteTask(task); }, null);
}
protected sealed override bool TryExecuteTaskInline(System.Threading.Tasks.Task task, bool taskWasPreviouslyQueued)
@@ -507,25 +461,38 @@ namespace IceInternal
return new System.Threading.Tasks.Task[0];
}
- private void run(WorkerThread thread)
+ private void queueReadyForIOHandler(EventHandler handler, int operation)
{
- ThreadPoolWorkItem workItem = null;
- while(true)
+ lock(this)
{
- lock(this)
- {
- if(workItem != null)
+ Debug.Assert(!_destroyed);
+ _workItems.Enqueue(current =>
{
- Debug.Assert(_inUse > 0);
- --_inUse;
- if(_workItems.Count == 0)
+ current._handler = handler;
+ current.operation = operation;
+ try
{
- thread.setState(Ice.Instrumentation.ThreadState.ThreadStateIdle);
+ current._handler.message(ref current);
}
- }
-
- workItem = null;
+ catch(System.Exception ex)
+ {
+ string s = "exception in `" + _prefix + "':\n" + ex + "\nevent handler: " +
+ current._handler.ToString();
+ _instance.initializationData().logger.error(s);
+ }
+ });
+ Monitor.Pulse(this);
+ }
+ }
+ private void run(WorkerThread thread)
+ {
+ var current = new ThreadPoolCurrent(this, thread);
+ while(true)
+ {
+ ThreadPoolWorkItem workItem = null;
+ lock(this)
+ {
while(_workItems.Count == 0)
{
if(_destroyed)
@@ -541,15 +508,8 @@ namespace IceInternal
{
return;
}
- else if(_serverIdleTime == 0 || _threads.Count > 1)
+ else if (_inUse < _threads.Count - 1)
{
- //
- // If not the last thread or if server idle time isn't configured,
- // we can exit. Unlike C++/Java, there's no need to have a thread
- // always spawned in the thread pool because all the IO is done
- // by the .NET thread pool threads. Instead, we'll just spawn a
- // new thread when needed (i.e.: when a new work item is queued).
- //
if(_instance.traceLevels().threadPool >= 1)
{
string s = "shrinking " + _prefix + ": Size=" + (_threads.Count - 1);
@@ -558,32 +518,39 @@ namespace IceInternal
}
_threads.Remove(thread);
- _instance.asyncIOThread().queue(() =>
+ _workItems.Enqueue(c =>
{
+ // No call to ioCompleted, this shouldn't block (and we don't want to cause
+ // a new thread to be started).
thread.join();
});
+ Monitor.Pulse(this);
return;
}
- else
+ else if (_inUse > 0)
{
- Debug.Assert(_serverIdleTime > 0 && _inUse == 0 && _threads.Count == 1);
- if(!Monitor.Wait(this, _serverIdleTime * 1000) &&
- _workItems.Count == 0)
- {
- if(!_destroyed)
+ //
+ // If this is the last idle thread but there are still other threads
+ // busy dispatching, we go back waiting with _threadIdleTime. We only
+ // wait with _serverIdleTime when there's only one thread left.
+ //
+ continue;
+ }
+
+ Debug.Assert(_threads.Count == 1);
+ if(!Monitor.Wait(this, _serverIdleTime * 1000) && !_destroyed)
+ {
+ _workItems.Enqueue(c =>
{
- _workItems.Enqueue(() =>
- {
- try
- {
- _instance.objectAdapterFactory().shutdown();
- }
- catch(Ice.CommunicatorDestroyedException)
- {
- }
- });
- }
- }
+ c.ioCompleted();
+ try
+ {
+ _instance.objectAdapterFactory().shutdown();
+ }
+ catch(Ice.CommunicatorDestroyedException)
+ {
+ }
+ });
}
}
}
@@ -596,32 +563,78 @@ namespace IceInternal
Debug.Assert(_workItems.Count > 0);
workItem = _workItems.Dequeue();
+ current._thread.setState(Ice.Instrumentation.ThreadState.ThreadStateInUseForIO);
+ current._ioCompleted = false;
+ }
+
+ try
+ {
+ workItem(current);
+ }
+ catch(System.Exception ex)
+ {
+ string s = "exception in `" + _prefix + "' while calling on work item:\n" + ex + '\n';
+ _instance.initializationData().logger.error(s);
+ }
+
+ lock (this)
+ {
+ if (_sizeMax > 1 && current._ioCompleted)
+ {
+ Debug.Assert(_inUse > 0);
+ --_inUse;
+ }
+ thread.setState(Ice.Instrumentation.ThreadState.ThreadStateIdle);
+ }
+ }
+ }
+
+ public bool ioCompleted(ThreadPoolCurrent current)
+ {
+ lock(this)
+ {
+ current._ioCompleted = true; // Set the IO completed flag to specify that ioCompleted() has been called.
+
+ current._thread.setState(Ice.Instrumentation.ThreadState.ThreadStateInUseForUser);
+
+ if(_sizeMax > 1)
+ {
Debug.Assert(_inUse >= 0);
++_inUse;
- thread.setState(Ice.Instrumentation.ThreadState.ThreadStateInUseForUser);
-
if(_sizeMax > 1 && _inUse == _sizeWarn)
{
string s = "thread pool `" + _prefix + "' is running low on threads\n"
+ "Size=" + _size + ", " + "SizeMax=" + _sizeMax + ", " + "SizeWarn=" + _sizeWarn;
_instance.initializationData().logger.warning(s);
}
- }
- try
- {
- workItem();
- }
- catch(System.Exception ex)
- {
- string s = "exception in `" + _prefix + "' while calling on work item:\n" + ex + '\n';
- _instance.initializationData().logger.error(s);
+ if(!_destroyed && _inUse < _sizeMax && _inUse == _threads.Count)
+ {
+ if(_instance.traceLevels().threadPool >= 1)
+ {
+ string s = "growing " + _prefix + ": Size = " + (_threads.Count + 1);
+ _instance.initializationData().logger.trace(_instance.traceLevels().threadPoolCat, s);
+ }
+
+ try
+ {
+ var t = new WorkerThread(this, _threadPrefix + "-" + _threadIndex++);
+ t.start(_priority);
+ _threads.Add(t);
+ }
+ catch(System.Exception ex)
+ {
+ string s = "cannot create thread for `" + _prefix + "':\n" + ex;
+ _instance.initializationData().logger.error(s);
+ }
+ }
}
}
+ return _serialize;
}
- public bool startMessage(ref ThreadPoolCurrent current)
+ public bool startMessage(ThreadPoolCurrent current)
{
Debug.Assert((current._handler._pending & current.operation) != 0);
@@ -644,8 +657,7 @@ namespace IceInternal
(current._handler._registered & current.operation) != 0)
{
Debug.Assert((current._handler._started & current.operation) == 0);
- bool completed = false;
- if(!current._handler.startAsync(current.operation, getCallback(current.operation), ref completed))
+ if(!current._handler.startAsync(current.operation, getCallback(current.operation)))
{
current._handler._pending &= ~current.operation;
if(current._handler._pending == 0 && current._handler._finish)
@@ -656,7 +668,6 @@ namespace IceInternal
}
else
{
- current.completedSynchronously = completed;
current._handler._started |= current.operation;
return false;
}
@@ -679,32 +690,19 @@ namespace IceInternal
}
}
- public void finishMessage(ref ThreadPoolCurrent current, bool fromIOThread)
+ public void finishMessage(ThreadPoolCurrent current)
{
if((current._handler._registered & current.operation) != 0)
{
- if(fromIOThread)
+ Debug.Assert((current._handler._ready & current.operation) == 0);
+ if(!current._handler.startAsync(current.operation, getCallback(current.operation)))
{
- Debug.Assert((current._handler._ready & current.operation) == 0);
- bool completed = false;
- if(!current._handler.startAsync(current.operation, getCallback(current.operation), ref completed))
- {
- current._handler._pending &= ~current.operation;
- }
- else
- {
- Debug.Assert((current._handler._pending & current.operation) != 0);
- current.completedSynchronously = completed;
- current._handler._started |= current.operation;
- }
+ current._handler._pending &= ~current.operation;
}
else
{
- ThreadPoolCurrent c = current;
- executeNonBlocking(() =>
- {
- messageCallback(c);
- });
+ Debug.Assert((current._handler._pending & current.operation) != 0);
+ current._handler._started |= current.operation;
}
}
else
@@ -719,55 +717,13 @@ namespace IceInternal
}
}
- public void asyncReadCallback(object state)
- {
- messageCallback(new ThreadPoolCurrent(this, (EventHandler)state, SocketOperation.Read));
- }
-
- public void asyncWriteCallback(object state)
- {
- messageCallback(new ThreadPoolCurrent(this, (EventHandler)state, SocketOperation.Write));
- }
-
- public void messageCallback(ThreadPoolCurrent current)
- {
- try
- {
- do
- {
- current.completedSynchronously = false;
- current._handler.message(ref current);
- }
- while(current.completedSynchronously);
- }
- catch(System.Exception ex)
- {
- string s = "exception in `" + _prefix + "':\n" + ex + "\nevent handler: " + current._handler.ToString();
- _instance.initializationData().logger.error(s);
- }
- }
-
private AsyncCallback getCallback(int operation)
{
- switch(operation)
- {
- case SocketOperation.Read:
- return asyncReadCallback;
- case SocketOperation.Write:
- return asyncWriteCallback;
- default:
- Debug.Assert(false);
- return null;
- }
+ Debug.Assert(operation == SocketOperation.Read || operation == SocketOperation.Write);
+ return state => queueReadyForIOHandler((EventHandler)state, operation);
}
- private Instance _instance;
- private System.Action<System.Action, Ice.Connection> _dispatcher;
- private bool _destroyed;
- private readonly string _prefix;
- private readonly string _threadPrefix;
-
- private sealed class WorkerThread
+ internal sealed class WorkerThread
{
private ThreadPool _threadPool;
private Ice.Instrumentation.ThreadObserver _observer;
@@ -890,6 +846,11 @@ namespace IceInternal
private Thread _thread;
}
+ private Instance _instance;
+ private System.Action<System.Action, Ice.Connection> _dispatcher;
+ private bool _destroyed;
+ private readonly string _prefix;
+ private readonly string _threadPrefix;
private readonly int _size; // Number of threads that are pre-created.
private readonly int _sizeMax; // Maximum number of threads.
private readonly int _sizeWarn; // If _inUse reaches _sizeWarn, a "low on threads" warning will be printed.