summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/ThreadPool.h
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2009-08-21 15:55:01 +0200
committerBenoit Foucher <benoit@zeroc.com>2009-08-21 15:55:01 +0200
commitb9f2fa14fb3f222a6ec5e0a93bf25fe5ad12b56a (patch)
tree183215e2dbeadfbc871b800ce09726e58af38b91 /cpp/src/Ice/ThreadPool.h
parentadding compression cookbook demo (diff)
downloadice-b9f2fa14fb3f222a6ec5e0a93bf25fe5ad12b56a.tar.bz2
ice-b9f2fa14fb3f222a6ec5e0a93bf25fe5ad12b56a.tar.xz
ice-b9f2fa14fb3f222a6ec5e0a93bf25fe5ad12b56a.zip
IOCP changes, bug 3501, 4200, 4156, 3101
Diffstat (limited to 'cpp/src/Ice/ThreadPool.h')
-rw-r--r--cpp/src/Ice/ThreadPool.h253
1 files changed, 228 insertions, 25 deletions
diff --git a/cpp/src/Ice/ThreadPool.h b/cpp/src/Ice/ThreadPool.h
index 567e6143468..1b52124ea1c 100644
--- a/cpp/src/Ice/ThreadPool.h
+++ b/cpp/src/Ice/ThreadPool.h
@@ -22,12 +22,17 @@
#include <Ice/PropertiesF.h>
#include <Ice/EventHandlerF.h>
#include <Ice/Selector.h>
-#include <list>
+#include <Ice/BasicStream.h>
+
+#include <set>
namespace IceInternal
{
-class BasicStream;
+class ThreadPoolCurrent;
+
+class ThreadPoolWorkQueue;
+typedef IceUtil::Handle<ThreadPoolWorkQueue> ThreadPoolWorkQueuePtr;
class ThreadPool : public IceUtil::Shared, public IceUtil::Monitor<IceUtil::Mutex>
{
@@ -38,32 +43,42 @@ public:
void destroy();
- void incFdsInUse();
- void decFdsInUse();
-
- void _register(const EventHandlerPtr&);
- void unregister(const EventHandlerPtr&);
+ void initialize(const EventHandlerPtr&);
+ void _register(const EventHandlerPtr& handler, SocketOperation status)
+ {
+ update(handler, SocketOperationNone, status);
+ }
+ void update(const EventHandlerPtr&, SocketOperation, SocketOperation);
+ void unregister(const EventHandlerPtr& handler, SocketOperation status)
+ {
+ update(handler, status, SocketOperationNone);
+ }
void finish(const EventHandlerPtr&);
void execute(const ThreadPoolWorkItemPtr&);
- void promoteFollower(EventHandler* = 0);
void joinWithAllThreads();
std::string prefix() const;
-
+
private:
- bool run(); // Returns true if a follower should be promoted.
- bool read(const EventHandlerPtr&);
+ void run(const IceUtil::ThreadPtr&);
- InstancePtr _instance;
- bool _destroyed;
- const std::string _prefix;
+ bool ioCompleted(ThreadPoolCurrent&);
- Selector<EventHandler> _selector;
+#ifdef ICE_USE_IOCP
+ bool startMessage(ThreadPoolCurrent&);
+ void finishMessage(ThreadPoolCurrent&);
+#else
+ void promoteFollower(ThreadPoolCurrent&);
+ bool followerWait(const IceUtil::ThreadPtr&, ThreadPoolCurrent&);
+#endif
- std::list<ThreadPoolWorkItemPtr> _workItems;
- std::list<EventHandlerPtr> _finished;
+ const InstancePtr _instance;
+ ThreadPoolWorkQueuePtr _workQueue;
+ bool _destroyed;
+ const std::string _prefix;
+ Selector _selector;
class EventHandlerThread : public IceUtil::Thread
{
@@ -76,28 +91,216 @@ private:
ThreadPoolPtr _pool;
};
+
friend class EventHandlerThread;
+ friend class ThreadPoolCurrent;
const int _size; // Number of threads that are pre-created.
+ const int _sizeIO; // Maximum number of threads that can concurrently perform IO.
const int _sizeMax; // Maximum number of threads.
const int _sizeWarn; // If _inUse reaches _sizeWarn, a "low on threads" warning will be printed.
const bool _serialize; // True if requests need to be serialized over the connection.
-
+ const bool _hasPriority;
+ const int _priority;
+ const int _serverIdleTime;
+ const int _threadIdleTime;
const size_t _stackSize;
- std::vector<IceUtil::ThreadPtr> _threads; // All threads, running or not.
- int _running; // Number of running threads.
+ std::set<IceUtil::ThreadPtr> _threads; // All threads, running or not.
int _inUse; // Number of threads that are currently in use.
- double _load; // Current load in number of threads.
+#ifndef ICE_USE_IOCP
+ int _inUseIO; // Number of threads that are currently performing IO.
+ std::vector<std::pair<EventHandler*, SocketOperation> > _handlers;
+ std::vector<std::pair<EventHandler*, SocketOperation> >::const_iterator _nextHandler;
+#endif
bool _promote;
+};
- const bool _warnUdp;
+class ThreadPoolCurrent
+{
+public:
+
+ ThreadPoolCurrent(const InstancePtr&, const ThreadPoolPtr&);
+
+ SocketOperation operation;
+ BasicStream stream; // A per-thread stream to be used by event handlers for optimization.
+
+ bool ioCompleted() const
+ {
+ return _threadPool->ioCompleted(const_cast<ThreadPoolCurrent&>(*this));
+ }
+
+#ifdef ICE_USE_IOCP
+ bool startMessage()
+ {
+ return _threadPool->startMessage(const_cast<ThreadPoolCurrent&>(*this));
+ }
+
+ void finishMessage()
+ {
+ _threadPool->finishMessage(const_cast<ThreadPoolCurrent&>(*this));
+ }
+#endif
+
+private:
+
+ ThreadPool* _threadPool;
+ EventHandlerPtr _handler;
+ bool _ioCompleted;
+#ifndef ICE_USE_IOCP
+ bool _leader;
+#endif
+ friend class ThreadPool;
+};
+
+class ThreadPoolWorkItem : virtual public IceUtil::Shared
+{
+public:
+
+ virtual void execute(ThreadPoolCurrent&) = 0;
+};
+
+//
+// The ThreadPoolMessage class below hides the IOCP implementation details from
+// the event handler implementations. Only event handler implementation that
+// require IO need to use this class.
+//
+// An instance of the IOScope subclass must be created within the synchronization
+// of the event handler. It takes care of calling startMessage/finishMessage for
+// the IOCP implementation and ensures that finishMessage isn't called multiple
+// times.
+//
+#ifndef ICE_USE_IOCP
+template<class T> class ThreadPoolMessage
+{
+public:
+
+ class IOScope
+ {
+ public:
+
+ IOScope(ThreadPoolMessage<T>& message) : _message(message)
+ {
+ // Nothing to do.
+ }
+
+ ~IOScope()
+ {
+ // Nothing to do.
+ }
+
+ operator bool()
+ {
+ return true;
+ }
+
+ void completed()
+ {
+ _message._current.ioCompleted();
+ }
+
+ private:
+
+ ThreadPoolMessage<T>& _message;
+ };
+ friend class IOScope;
+
+ ThreadPoolMessage(ThreadPoolCurrent& current, const T&) : _current(current)
+ {
+ }
+
+ ~ThreadPoolMessage()
+ {
+ // Nothing to do.
+ }
+
+private:
+
+ ThreadPoolCurrent& _current;
+};
+
+#else
+
+template<class T> class ThreadPoolMessage
+{
+public:
+
+ class IOScope
+ {
+ public:
+
+ IOScope(ThreadPoolMessage& message) : _message(message)
+ {
+ // This must be called with the handler locked.
+ _finish = _message._current.startMessage();
+ }
+
+ ~IOScope()
+ {
+ if(_finish)
+ {
+ // This must be called with the handler locked.
+ _message._current.finishMessage();
+ }
+ }
+
+ operator bool()
+ {
+ return _finish;
+ }
+
+ void
+ completed()
+ {
+ //
+ // Call finishMessage once IO is completed only if serialization is not enabled.
+ // Otherwise, finishMessage will be called when the event handler is done with
+ // the message (it will be called from ~ThreadPoolMessage below).
+ //
+ assert(_finish);
+ if(_message._current.ioCompleted())
+ {
+ _finish = false;
+ _message._finish = true;
+ }
+ }
+
+ private:
+
+ ThreadPoolMessage& _message;
+ bool _finish;
+ };
+ friend class IOScope;
+
+ ThreadPoolMessage(ThreadPoolCurrent& current, const T& mutex) :
+ _current(current), _mutex(mutex), _finish(false)
+ {
+ }
+
+ ~ThreadPoolMessage()
+ {
+ if(_finish)
+ {
+ //
+ // A ThreadPoolMessage instance must be created outside the synchronization
+ // of the event handler. We need to lock the event handler here to call
+ // finishMessage.
+ //
+ IceUtil::LockT<typename T> sync(_mutex);
+ _current.finishMessage();
+ }
+ }
+
+private:
+
+ ThreadPoolCurrent& _current;
+ const T& _mutex;
+ bool _finish;
+};
+#endif
- const bool _hasPriority;
- const int _priority;
};
-}
#endif