diff options
author | Benoit Foucher <benoit@zeroc.com> | 2015-10-09 15:00:57 +0200 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2015-10-09 15:00:57 +0200 |
commit | 20b6c0ccb95118ffc685826904a8edd06a38ac1b (patch) | |
tree | 1b389964fa35ca9de23c548120ecedcc9d82074c /cpp/src/Ice/ThreadPool.cpp | |
parent | Merge branch '3.6' (diff) | |
download | ice-20b6c0ccb95118ffc685826904a8edd06a38ac1b.tar.bz2 ice-20b6c0ccb95118ffc685826904a8edd06a38ac1b.tar.xz ice-20b6c0ccb95118ffc685826904a8edd06a38ac1b.zip |
Added ready callback to allow transports to signal readiness to the thread pool
Diffstat (limited to 'cpp/src/Ice/ThreadPool.cpp')
-rw-r--r-- | cpp/src/Ice/ThreadPool.cpp | 322 |
1 files changed, 89 insertions, 233 deletions
diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp index 7ff3f57db4b..0de8b971559 100644 --- a/cpp/src/Ice/ThreadPool.cpp +++ b/cpp/src/Ice/ThreadPool.cpp @@ -99,29 +99,6 @@ private: IceUtil::ThreadPtr _thread; }; -class InterruptWorkItem : public ThreadPoolWorkItem -{ -public: - - virtual void - execute(ThreadPoolCurrent& current) - { - // Nothing to do, this is just used to interrupt the thread pool selector. - } -}; -ThreadPoolWorkItemPtr interruptWorkItem; - -class InterruptWorkItemInit -{ -public: - - InterruptWorkItemInit() - { - interruptWorkItem = new InterruptWorkItem; - } -}; -InterruptWorkItemInit init; - // // Exception raised by the thread pool work queue when the thread pool // is destroyed. @@ -147,77 +124,38 @@ IceInternal::DispatchWorkItem::execute(ThreadPoolCurrent& current) current.dispatchFromThisThread(this); } -IceInternal::ThreadPoolWorkQueue::ThreadPoolWorkQueue(const InstancePtr& instance, Selector& selector) : - _instance(instance), - _selector(selector), +IceInternal::ThreadPoolWorkQueue::ThreadPoolWorkQueue(ThreadPool& threadPool) : + _threadPool(threadPool), _destroyed(false) -#ifdef ICE_USE_IOCP - , _info(SocketOperationRead) -#endif { -#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT) - SOCKET fds[2]; - createPipe(fds); - _fdIntrRead = fds[0]; - _fdIntrWrite = fds[1]; - - _selector.initialize(this); - _selector.update(this, SocketOperationNone, SocketOperationRead); -#endif -} - -IceInternal::ThreadPoolWorkQueue::~ThreadPoolWorkQueue() -{ - assert(_destroyed); - -#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT) - try - { - closeSocket(_fdIntrRead); - } - catch(const LocalException& ex) - { - Error out(_instance->initializationData().logger); - out << "exception in selector while calling closeSocket():\n" << ex; - } - - try - { - closeSocket(_fdIntrWrite); - } - catch(const LocalException& ex) - { - Error out(_instance->initializationData().logger); - out << "exception in selector while calling closeSocket():\n" << ex; - } -#endif + _registered = SocketOperationRead; } void IceInternal::ThreadPoolWorkQueue::destroy() { - Lock sync(*this); + //Lock sync(*this); Called with the thread pool locked assert(!_destroyed); _destroyed = true; - postMessage(); +#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) + _threadPool._selector.completed(this, SocketOperationRead); +#else + _threadPool._selector.ready(this, SocketOperationRead, true); +#endif } void IceInternal::ThreadPoolWorkQueue::queue(const ThreadPoolWorkItemPtr& item) { - Lock sync(*this); - if(_destroyed) - { - throw CommunicatorDestroyedException(__FILE__, __LINE__); - } + //Lock sync(*this); Called with the thread pool locked _workItems.push_back(item); -#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT) +#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) + _threadPool._selector.completed(this, SocketOperationRead); +#else if(_workItems.size() == 1) { - postMessage(); + _threadPool._selector.ready(this, SocketOperationRead, true); } -#else - postMessage(); #endif } @@ -242,47 +180,24 @@ IceInternal::ThreadPoolWorkQueue::message(ThreadPoolCurrent& current) { ThreadPoolWorkItemPtr workItem; { - Lock sync(*this); + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_threadPool); if(!_workItems.empty()) { workItem = _workItems.front(); _workItems.pop_front(); - -#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT) - if(_workItems.empty()) - { - char c; - while(true) - { - ssize_t ret; -# ifdef _WIN32 - ret = ::recv(_fdIntrRead, &c, 1, 0); -# else - ret = ::read(_fdIntrRead, &c, 1); -# endif - if(ret == SOCKET_ERROR) - { - if(interrupted()) - { - continue; - } - - SocketException ex(__FILE__, __LINE__); - ex.error = getSocketErrno(); - throw ex; - } - break; - } - } -#endif } +#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) else { assert(_destroyed); -#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) - postMessage(); -#endif + _threadPool._selector.completed(this, SocketOperationRead); } +#else + if(_workItems.empty() && !_destroyed) + { + _threadPool._selector.ready(this, SocketOperationRead, false); + } +#endif } if(workItem) @@ -291,6 +206,7 @@ IceInternal::ThreadPoolWorkQueue::message(ThreadPoolCurrent& current) } else { + assert(_destroyed); current.ioCompleted(); throw ThreadPoolDestroyedException(); } @@ -311,47 +227,7 @@ IceInternal::ThreadPoolWorkQueue::toString() const NativeInfoPtr IceInternal::ThreadPoolWorkQueue::getNativeInfo() { -#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT) - return new NativeInfo(_fdIntrRead); -#else return 0; -#endif -} - -void -IceInternal::ThreadPoolWorkQueue::postMessage() -{ -#if defined(ICE_USE_IOCP) - if(!PostQueuedCompletionStatus(_selector.getIOCPHandle(), 0, reinterpret_cast<ULONG_PTR>(this), &_info)) - { - SocketException ex(__FILE__, __LINE__); - ex.error = GetLastError(); - throw ex; - } -#elif defined(ICE_OS_WINRT) - _selector.completed(this, SocketOperationRead); -#else - char c = 0; - while(true) - { -# ifdef _WIN32 - if(::send(_fdIntrWrite, &c, 1, 0) == SOCKET_ERROR) -# else - if(::write(_fdIntrWrite, &c, 1) == SOCKET_ERROR) -# endif - { - if(interrupted()) - { - continue; - } - - SocketException ex(__FILE__, __LINE__); - ex.error = getSocketErrno(); - throw ex; - } - break; - } -#endif } IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& prefix, int timeout) : @@ -469,7 +345,8 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& p const_cast<int&>(_priority) = properties->getPropertyAsInt("Ice.ThreadPriority"); } - _workQueue = new ThreadPoolWorkQueue(_instance, _selector); + _workQueue = new ThreadPoolWorkQueue(*this); + _selector.initialize(_workQueue.get()); if(_instance->traceLevels()->threadPool >= 1) { @@ -528,7 +405,6 @@ IceInternal::ThreadPool::destroy() { return; } - _destroyed = true; _workQueue->destroy(); } @@ -549,6 +425,28 @@ IceInternal::ThreadPool::initialize(const EventHandlerPtr& handler) Lock sync(*this); assert(!_destroyed); _selector.initialize(handler.get()); + + class ReadyCallbackI : public ReadyCallback + { + public: + + ReadyCallbackI(const ThreadPoolPtr& threadPool, const EventHandlerPtr& handler) : + _threadPool(threadPool), _handler(handler) + { + } + + virtual void + ready(SocketOperation op, bool value) + { + _threadPool->ready(_handler, op, value); + } + + private: + + const ThreadPoolPtr _threadPool; + const EventHandlerPtr _handler; + }; + handler->getNativeInfo()->setReadyCallback(new ReadyCallbackI(this, handler)); } void @@ -569,20 +467,6 @@ IceInternal::ThreadPool::update(const EventHandlerPtr& handler, SocketOperation } _selector.update(handler.get(), remove, add); -#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT) - if(add & SocketOperationRead && handler->_hasMoreData && !(handler->_disabled & SocketOperationRead)) - { - if(_pendingHandlers.empty()) - { - _workQueue->queue(interruptWorkItem); // Interrupt select() - } - _pendingHandlers.insert(handler.get()); - } - else if(remove & SocketOperationRead) - { - _pendingHandlers.erase(handler.get()); - } -#endif } bool @@ -592,7 +476,6 @@ IceInternal::ThreadPool::finish(const EventHandlerPtr& handler, bool closeNow) assert(!_destroyed); #if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT) closeNow = _selector.finish(handler.get(), closeNow); // This must be called before! - _pendingHandlers.erase(handler.get()); _workQueue->queue(new FinishedWorkItem(handler, !closeNow)); return closeNow; #else @@ -611,6 +494,17 @@ IceInternal::ThreadPool::finish(const EventHandlerPtr& handler, bool closeNow) } void +IceInternal::ThreadPool::ready(const EventHandlerPtr& handler, SocketOperation op, bool value) +{ + Lock sync(*this); + if(_destroyed) + { + return; + } + _selector.ready(handler.get(), op, value); +} + +void IceInternal::ThreadPool::dispatchFromThisThread(const DispatchWorkItemPtr& workItem) { if(_dispatcher) @@ -645,6 +539,11 @@ IceInternal::ThreadPool::dispatchFromThisThread(const DispatchWorkItemPtr& workI void IceInternal::ThreadPool::dispatch(const DispatchWorkItemPtr& workItem) { + Lock sync(*this); + if(_destroyed) + { + throw CommunicatorDestroyedException(__FILE__, __LINE__); + } _workQueue->queue(workItem); } @@ -663,10 +562,6 @@ IceInternal::ThreadPool::joinWithAllThreads() { (*p)->getThreadControl().join(); } - -#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT) - _selector.finish(_workQueue.get(), true); -#endif _selector.destroy(); } @@ -682,7 +577,6 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread) #if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT) ThreadPoolCurrent current(_instance, this, thread); bool select = false; - vector<pair<EventHandler*, SocketOperation> > handlers; while(true) { if(current._handler) @@ -711,7 +605,7 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread) { try { - _selector.select(handlers, _serverIdleTime); + _selector.select(_serverIdleTime); } catch(const SelectorTimeoutException&) { @@ -730,22 +624,8 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread) { if(select) { - _handlers.swap(handlers); - if(!_pendingHandlers.empty()) - { - for(_nextHandler = _handlers.begin(); _nextHandler != _handlers.end(); ++_nextHandler) - { - _pendingHandlers.erase(_nextHandler->first); - } - set<EventHandler*>::const_iterator p; - for(p = _pendingHandlers.begin(); p != _pendingHandlers.end(); ++p) - { - _handlers.push_back(make_pair(*p, SocketOperationRead)); - } - _pendingHandlers.clear(); - } + _selector.finishSelect(_handlers); _nextHandler = _handlers.begin(); - _selector.finishSelect(); select = false; } else if(!current._leader && followerWait(current)) @@ -762,14 +642,6 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread) // the IO thread count now. // --_inUseIO; - if(current._handler->_hasMoreData && current._handler->_registered & SocketOperationRead) - { - if(_pendingHandlers.empty()) - { - _workQueue->queue(interruptWorkItem); // Interrupt select() - } - _pendingHandlers.insert(current._handler.get()); - } } else { @@ -780,14 +652,6 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread) if(_serialize) { _selector.enable(current._handler.get(), current.operation); - if(current._handler->_hasMoreData && current._handler->_registered & SocketOperationRead) - { - if(_pendingHandlers.empty()) - { - _workQueue->queue(interruptWorkItem); // Interrupt select() - } - _pendingHandlers.insert(current._handler.get()); - } } assert(_inUse > 0); --_inUse; @@ -798,19 +662,12 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread) return; // Wait timed-out. } } - else if(current._handler->_hasMoreData && current._handler->_registered & SocketOperationRead) - { - if(_pendingHandlers.empty()) - { - _workQueue->queue(interruptWorkItem); // Interrupt select() - } - _pendingHandlers.insert(current._handler.get()); - } // // Get the next ready handler. // - while(_nextHandler != _handlers.end() && !(_nextHandler->second & _nextHandler->first->_registered)) + while(_nextHandler != _handlers.end() && + !(_nextHandler->second & ~_nextHandler->first->_disabled & _nextHandler->first->_registered)) { ++_nextHandler; } @@ -1007,19 +864,6 @@ IceInternal::ThreadPool::ioCompleted(ThreadPoolCurrent& current) if(_serialize) { _selector.disable(current._handler.get(), current.operation); - - // Make sure the handler isn't in the set of pending handlers (this can - // for example occur if the handler is has more data and its added by - // ThreadPool::update while we were processing IO). - _pendingHandlers.erase(current._handler.get()); - } - else if(current._handler->_hasMoreData && current._handler->_registered & SocketOperationRead) - { - if(_pendingHandlers.empty()) - { - _workQueue->queue(interruptWorkItem); // Interrupt select() - } - _pendingHandlers.insert(current._handler.get()); } } @@ -1090,8 +934,8 @@ IceInternal::ThreadPool::startMessage(ThreadPoolCurrent& current) if(current._handler->_started & current.operation) { - assert(!(current._handler->_ready & current.operation)); - current._handler->_ready = static_cast<SocketOperation>(current._handler->_ready | current.operation); + assert(!(current._handler->_completed & current.operation)); + current._handler->_completed = static_cast<SocketOperation>(current._handler->_completed | current.operation); current._handler->_started = static_cast<SocketOperation>(current._handler->_started & ~current.operation); #ifndef ICE_OS_WINRT @@ -1105,20 +949,26 @@ IceInternal::ThreadPool::startMessage(ThreadPoolCurrent& current) current._handler->_pending = static_cast<SocketOperation>(current._handler->_pending & ~current.operation); if(!(current._handler->_pending & SocketOperationWaitForClose) && current._handler->_finish) { + Lock sync(*this); _workQueue->queue(new FinishedWorkItem(current._handler, false)); _selector.finish(current._handler.get()); } return false; } } - else if(!(current._handler->_ready & current.operation) && (current._handler->_registered & current.operation)) + else if(!(current._handler->_completed & current.operation) && (current._handler->_registered & current.operation)) { assert(!(current._handler->_started & current.operation)); - if(!current._handler->startAsync(current.operation)) + if(current._handler->_ready & current.operation) + { + return true; + } + else if(!current._handler->startAsync(current.operation)) { current._handler->_pending = static_cast<SocketOperation>(current._handler->_pending & ~current.operation); if(!(current._handler->_pending & SocketOperationWaitForClose) && current._handler->_finish) { + Lock sync(*this); _workQueue->queue(new FinishedWorkItem(current._handler, false)); _selector.finish(current._handler.get()); } @@ -1133,8 +983,8 @@ IceInternal::ThreadPool::startMessage(ThreadPoolCurrent& current) if(current._handler->_registered & current.operation) { - assert(current._handler->_ready & current.operation); - current._handler->_ready = static_cast<SocketOperation>(current._handler->_ready & ~current.operation); + assert(current._handler->_completed & current.operation); + current._handler->_completed = static_cast<SocketOperation>(current._handler->_completed & ~current.operation); return true; } else @@ -1142,6 +992,7 @@ IceInternal::ThreadPool::startMessage(ThreadPoolCurrent& current) current._handler->_pending = static_cast<SocketOperation>(current._handler->_pending & ~current.operation); if(!(current._handler->_pending & SocketOperationWaitForClose) && current._handler->_finish) { + Lock sync(*this); _workQueue->queue(new FinishedWorkItem(current._handler, false)); _selector.finish(current._handler.get()); } @@ -1152,10 +1003,14 @@ IceInternal::ThreadPool::startMessage(ThreadPoolCurrent& current) void IceInternal::ThreadPool::finishMessage(ThreadPoolCurrent& current) { - if(current._handler->_registered & current.operation) + if(current._handler->_registered & current.operation && !current._handler->_finish) { - assert(!(current._handler->_ready & current.operation)); - if(!current._handler->startAsync(current.operation)) + assert(!(current._handler->_completed & current.operation)); + if(current._handler->_ready & current.operation) + { + _selector.completed(current._handler.get(), current.operation); + } + else if(!current._handler->startAsync(current.operation)) { current._handler->_pending = static_cast<SocketOperation>(current._handler->_pending & ~current.operation); } @@ -1173,6 +1028,7 @@ IceInternal::ThreadPool::finishMessage(ThreadPoolCurrent& current) if(!(current._handler->_pending & SocketOperationWaitForClose) && current._handler->_finish) { // There are no more pending async operations, it's time to call finish. + Lock sync(*this); _workQueue->queue(new FinishedWorkItem(current._handler, false)); _selector.finish(current._handler.get()); } |