diff options
Diffstat (limited to 'cpp/src/Ice/ThreadPool.h')
-rw-r--r-- | cpp/src/Ice/ThreadPool.h | 66 |
1 files changed, 31 insertions, 35 deletions
diff --git a/cpp/src/Ice/ThreadPool.h b/cpp/src/Ice/ThreadPool.h index 0fc1bcd2617..9cccedca63e 100644 --- a/cpp/src/Ice/ThreadPool.h +++ b/cpp/src/Ice/ThreadPool.h @@ -23,7 +23,7 @@ #include <Ice/PropertiesF.h> #include <Ice/EventHandler.h> #include <Ice/Selector.h> -#include <Ice/BasicStream.h> +#include <Ice/InputStream.h> #include <Ice/ObserverHelper.h> #include <set> @@ -35,12 +35,12 @@ namespace IceInternal class ThreadPoolCurrent; class ThreadPoolWorkQueue; -typedef IceUtil::Handle<ThreadPoolWorkQueue> ThreadPoolWorkQueuePtr; +ICE_DEFINE_PTR(ThreadPoolWorkQueuePtr, ThreadPoolWorkQueue); -class ThreadPoolWorkItem : virtual public IceUtil::Shared +class ThreadPoolWorkItem : public virtual IceUtil::Shared { public: - + virtual void execute(ThreadPoolCurrent&) = 0; }; typedef IceUtil::Handle<ThreadPoolWorkItem> ThreadPoolWorkItemPtr; @@ -51,8 +51,8 @@ public: DispatchWorkItem(); DispatchWorkItem(const Ice::ConnectionPtr& connection); - - const Ice::ConnectionPtr& + + const Ice::ConnectionPtr& getConnection() { return _connection; @@ -66,18 +66,18 @@ private: }; typedef IceUtil::Handle<DispatchWorkItem> DispatchWorkItemPtr; -class ThreadPool : public IceUtil::Shared, public IceUtil::Monitor<IceUtil::Mutex> +class ThreadPool : public IceUtil::Shared, private IceUtil::Monitor<IceUtil::Mutex> { class EventHandlerThread : public IceUtil::Thread { public: - + EventHandlerThread(const ThreadPoolPtr&, const std::string&); virtual void run(); void updateObserver(); void setState(Ice::Instrumentation::ThreadState); - + private: ThreadPoolPtr _pool; @@ -106,6 +106,7 @@ public: update(handler, status, SocketOperationNone); } bool finish(const EventHandlerPtr&, bool); + void ready(const EventHandlerPtr&, SocketOperation, bool); void dispatchFromThisThread(const DispatchWorkItemPtr&); void dispatch(const DispatchWorkItemPtr&); @@ -131,7 +132,11 @@ private: std::string nextThreadId(); const InstancePtr _instance; +#ifdef ICE_CPP11_MAPPING + std::function<void (std::function<void ()>, const std::shared_ptr<Ice::Connection>&)> _dispatcher; +#else const Ice::DispatcherPtr _dispatcher; +#endif ThreadPoolWorkQueuePtr _workQueue; bool _destroyed; const std::string _prefix; @@ -140,6 +145,7 @@ private: friend class EventHandlerThread; friend class ThreadPoolCurrent; + friend class ThreadPoolWorkQueue; const int _size; // Number of threads that are pre-created. const int _sizeIO; // Maximum number of threads that can concurrently perform IO. @@ -158,7 +164,6 @@ private: int _inUseIO; // Number of threads that are currently performing IO. std::vector<std::pair<EventHandler*, SocketOperation> > _handlers; std::vector<std::pair<EventHandler*, SocketOperation> >::const_iterator _nextHandler; - std::set<EventHandler*> _pendingHandlers; #endif bool _promote; @@ -171,7 +176,7 @@ public: ThreadPoolCurrent(const InstancePtr&, const ThreadPoolPtr&, const ThreadPool::EventHandlerThreadPtr&); SocketOperation operation; - BasicStream stream; // A per-thread stream to be used by event handlers for optimization. + Ice::InputStream stream; // A per-thread stream to be used by event handlers for optimization. bool ioCompleted() const { @@ -215,12 +220,11 @@ private: friend class ThreadPool; }; -class ThreadPoolWorkQueue : public EventHandler, public IceUtil::Mutex +class ThreadPoolWorkQueue : public EventHandler { public: - ThreadPoolWorkQueue(const InstancePtr&, Selector&); - ~ThreadPoolWorkQueue(); + ThreadPoolWorkQueue(ThreadPool&); void destroy(); void queue(const ThreadPoolWorkItemPtr&); @@ -234,19 +238,11 @@ public: virtual void finished(ThreadPoolCurrent&, bool); virtual std::string toString() const; virtual NativeInfoPtr getNativeInfo(); - virtual void postMessage(); private: - const InstancePtr _instance; - Selector& _selector; + ThreadPool& _threadPool; bool _destroyed; -#ifdef ICE_USE_IOCP - AsyncInfo _info; -#elif !defined(ICE_OS_WINRT) - SOCKET _fdIntrRead; - SOCKET _fdIntrWrite; -#endif std::list<ThreadPoolWorkItemPtr> _workItems; }; @@ -257,7 +253,7 @@ private: // // An instance of the IOScope subclass must be created within the synchronization // of the event handler. It takes care of calling startMessage/finishMessage for -// the IOCP implementation and ensures that finishMessage isn't called multiple +// the IOCP implementation and ensures that finishMessage isn't called multiple // times. // #if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT) @@ -290,7 +286,7 @@ public: } private: - + ThreadPoolMessage<T>& _message; }; friend class IOScope; @@ -309,7 +305,7 @@ private: ThreadPoolCurrent& _current; }; -#else +#else template<class T> class ThreadPoolMessage { @@ -321,7 +317,7 @@ public: IOScope(ThreadPoolMessage& message) : _message(message) { - // This must be called with the handler locked. + // This must be called with the handler locked. _finish = _message._current.startMessage(); } @@ -329,7 +325,7 @@ public: { if(_finish) { - // This must be called with the handler locked. + // This must be called with the handler locked. _message._current.finishMessage(); } } @@ -344,7 +340,7 @@ public: { // // Call finishMessage once IO is completed only if serialization is not enabled. - // Otherwise, finishMessage will be called when the event handler is done with + // Otherwise, finishMessage will be called when the event handler is done with // the message (it will be called from ~ThreadPoolMessage below). // assert(_finish); @@ -358,22 +354,22 @@ public: private: ThreadPoolMessage& _message; - bool _finish; + bool _finish; }; friend class IOScope; - - ThreadPoolMessage(ThreadPoolCurrent& current, const T& mutex) : + + ThreadPoolMessage(ThreadPoolCurrent& current, const T& mutex) : _current(current), _mutex(mutex), _finish(false) { } - + ~ThreadPoolMessage() { if(_finish) { // // A ThreadPoolMessage instance must be created outside the synchronization - // of the event handler. We need to lock the event handler here to call + // of the event handler. We need to lock the event handler here to call // finishMessage. // #if defined(__MINGW32__) @@ -386,7 +382,7 @@ public: } private: - + ThreadPoolCurrent& _current; const T& _mutex; bool _finish; |