diff options
author | Benoit Foucher <benoit@zeroc.com> | 2009-08-21 15:55:01 +0200 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2009-08-21 15:55:01 +0200 |
commit | b9f2fa14fb3f222a6ec5e0a93bf25fe5ad12b56a (patch) | |
tree | 183215e2dbeadfbc871b800ce09726e58af38b91 /cpp/src/Ice/ThreadPool.h | |
parent | adding compression cookbook demo (diff) | |
download | ice-b9f2fa14fb3f222a6ec5e0a93bf25fe5ad12b56a.tar.bz2 ice-b9f2fa14fb3f222a6ec5e0a93bf25fe5ad12b56a.tar.xz ice-b9f2fa14fb3f222a6ec5e0a93bf25fe5ad12b56a.zip |
IOCP changes, bug 3501, 4200, 4156, 3101
Diffstat (limited to 'cpp/src/Ice/ThreadPool.h')
-rw-r--r-- | cpp/src/Ice/ThreadPool.h | 253 |
1 files changed, 228 insertions, 25 deletions
diff --git a/cpp/src/Ice/ThreadPool.h b/cpp/src/Ice/ThreadPool.h index 567e6143468..1b52124ea1c 100644 --- a/cpp/src/Ice/ThreadPool.h +++ b/cpp/src/Ice/ThreadPool.h @@ -22,12 +22,17 @@ #include <Ice/PropertiesF.h> #include <Ice/EventHandlerF.h> #include <Ice/Selector.h> -#include <list> +#include <Ice/BasicStream.h> + +#include <set> namespace IceInternal { -class BasicStream; +class ThreadPoolCurrent; + +class ThreadPoolWorkQueue; +typedef IceUtil::Handle<ThreadPoolWorkQueue> ThreadPoolWorkQueuePtr; class ThreadPool : public IceUtil::Shared, public IceUtil::Monitor<IceUtil::Mutex> { @@ -38,32 +43,42 @@ public: void destroy(); - void incFdsInUse(); - void decFdsInUse(); - - void _register(const EventHandlerPtr&); - void unregister(const EventHandlerPtr&); + void initialize(const EventHandlerPtr&); + void _register(const EventHandlerPtr& handler, SocketOperation status) + { + update(handler, SocketOperationNone, status); + } + void update(const EventHandlerPtr&, SocketOperation, SocketOperation); + void unregister(const EventHandlerPtr& handler, SocketOperation status) + { + update(handler, status, SocketOperationNone); + } void finish(const EventHandlerPtr&); void execute(const ThreadPoolWorkItemPtr&); - void promoteFollower(EventHandler* = 0); void joinWithAllThreads(); std::string prefix() const; - + private: - bool run(); // Returns true if a follower should be promoted. - bool read(const EventHandlerPtr&); + void run(const IceUtil::ThreadPtr&); - InstancePtr _instance; - bool _destroyed; - const std::string _prefix; + bool ioCompleted(ThreadPoolCurrent&); - Selector<EventHandler> _selector; +#ifdef ICE_USE_IOCP + bool startMessage(ThreadPoolCurrent&); + void finishMessage(ThreadPoolCurrent&); +#else + void promoteFollower(ThreadPoolCurrent&); + bool followerWait(const IceUtil::ThreadPtr&, ThreadPoolCurrent&); +#endif - std::list<ThreadPoolWorkItemPtr> _workItems; - std::list<EventHandlerPtr> _finished; + const InstancePtr _instance; + ThreadPoolWorkQueuePtr _workQueue; + bool _destroyed; + const std::string _prefix; + Selector _selector; class EventHandlerThread : public IceUtil::Thread { @@ -76,28 +91,216 @@ private: ThreadPoolPtr _pool; }; + friend class EventHandlerThread; + friend class ThreadPoolCurrent; const int _size; // Number of threads that are pre-created. + const int _sizeIO; // Maximum number of threads that can concurrently perform IO. const int _sizeMax; // Maximum number of threads. const int _sizeWarn; // If _inUse reaches _sizeWarn, a "low on threads" warning will be printed. const bool _serialize; // True if requests need to be serialized over the connection. - + const bool _hasPriority; + const int _priority; + const int _serverIdleTime; + const int _threadIdleTime; const size_t _stackSize; - std::vector<IceUtil::ThreadPtr> _threads; // All threads, running or not. - int _running; // Number of running threads. + std::set<IceUtil::ThreadPtr> _threads; // All threads, running or not. int _inUse; // Number of threads that are currently in use. - double _load; // Current load in number of threads. +#ifndef ICE_USE_IOCP + int _inUseIO; // Number of threads that are currently performing IO. + std::vector<std::pair<EventHandler*, SocketOperation> > _handlers; + std::vector<std::pair<EventHandler*, SocketOperation> >::const_iterator _nextHandler; +#endif bool _promote; +}; - const bool _warnUdp; +class ThreadPoolCurrent +{ +public: + + ThreadPoolCurrent(const InstancePtr&, const ThreadPoolPtr&); + + SocketOperation operation; + BasicStream stream; // A per-thread stream to be used by event handlers for optimization. + + bool ioCompleted() const + { + return _threadPool->ioCompleted(const_cast<ThreadPoolCurrent&>(*this)); + } + +#ifdef ICE_USE_IOCP + bool startMessage() + { + return _threadPool->startMessage(const_cast<ThreadPoolCurrent&>(*this)); + } + + void finishMessage() + { + _threadPool->finishMessage(const_cast<ThreadPoolCurrent&>(*this)); + } +#endif + +private: + + ThreadPool* _threadPool; + EventHandlerPtr _handler; + bool _ioCompleted; +#ifndef ICE_USE_IOCP + bool _leader; +#endif + friend class ThreadPool; +}; + +class ThreadPoolWorkItem : virtual public IceUtil::Shared +{ +public: + + virtual void execute(ThreadPoolCurrent&) = 0; +}; + +// +// The ThreadPoolMessage class below hides the IOCP implementation details from +// the event handler implementations. Only event handler implementation that +// require IO need to use this class. +// +// An instance of the IOScope subclass must be created within the synchronization +// of the event handler. It takes care of calling startMessage/finishMessage for +// the IOCP implementation and ensures that finishMessage isn't called multiple +// times. +// +#ifndef ICE_USE_IOCP +template<class T> class ThreadPoolMessage +{ +public: + + class IOScope + { + public: + + IOScope(ThreadPoolMessage<T>& message) : _message(message) + { + // Nothing to do. + } + + ~IOScope() + { + // Nothing to do. + } + + operator bool() + { + return true; + } + + void completed() + { + _message._current.ioCompleted(); + } + + private: + + ThreadPoolMessage<T>& _message; + }; + friend class IOScope; + + ThreadPoolMessage(ThreadPoolCurrent& current, const T&) : _current(current) + { + } + + ~ThreadPoolMessage() + { + // Nothing to do. + } + +private: + + ThreadPoolCurrent& _current; +}; + +#else + +template<class T> class ThreadPoolMessage +{ +public: + + class IOScope + { + public: + + IOScope(ThreadPoolMessage& message) : _message(message) + { + // This must be called with the handler locked. + _finish = _message._current.startMessage(); + } + + ~IOScope() + { + if(_finish) + { + // This must be called with the handler locked. + _message._current.finishMessage(); + } + } + + operator bool() + { + return _finish; + } + + void + completed() + { + // + // Call finishMessage once IO is completed only if serialization is not enabled. + // Otherwise, finishMessage will be called when the event handler is done with + // the message (it will be called from ~ThreadPoolMessage below). + // + assert(_finish); + if(_message._current.ioCompleted()) + { + _finish = false; + _message._finish = true; + } + } + + private: + + ThreadPoolMessage& _message; + bool _finish; + }; + friend class IOScope; + + ThreadPoolMessage(ThreadPoolCurrent& current, const T& mutex) : + _current(current), _mutex(mutex), _finish(false) + { + } + + ~ThreadPoolMessage() + { + if(_finish) + { + // + // A ThreadPoolMessage instance must be created outside the synchronization + // of the event handler. We need to lock the event handler here to call + // finishMessage. + // + IceUtil::LockT<typename T> sync(_mutex); + _current.finishMessage(); + } + } + +private: + + ThreadPoolCurrent& _current; + const T& _mutex; + bool _finish; +}; +#endif - const bool _hasPriority; - const int _priority; }; -} #endif |