diff options
author | Mark Spruiell <mes@zeroc.com> | 2014-03-19 12:45:55 -0700 |
---|---|---|
committer | Mark Spruiell <mes@zeroc.com> | 2014-03-19 12:45:55 -0700 |
commit | cdcffbcc3c3c052afdeb772ff0167e7a90b525bb (patch) | |
tree | 4f16ee41ef7d33394c44e9db81e4d6cd89908250 /cpp/src/Ice/ThreadPool.cpp | |
parent | fixing testicedist.py for 5487 (diff) | |
download | ice-cdcffbcc3c3c052afdeb772ff0167e7a90b525bb.tar.bz2 ice-cdcffbcc3c3c052afdeb772ff0167e7a90b525bb.tar.xz ice-cdcffbcc3c3c052afdeb772ff0167e7a90b525bb.zip |
merging javascript branch
Diffstat (limited to 'cpp/src/Ice/ThreadPool.cpp')
-rw-r--r-- | cpp/src/Ice/ThreadPool.cpp | 138 |
1 files changed, 128 insertions, 10 deletions
diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp index 2234e8c7ceb..bdfe1a095ac 100644 --- a/cpp/src/Ice/ThreadPool.cpp +++ b/cpp/src/Ice/ThreadPool.cpp @@ -1,6 +1,6 @@ // ********************************************************************** // -// Copyright (c) 2003-2013 ZeroC, Inc. All rights reserved. +// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved. // // This copy of Ice is licensed to you under the terms described in the // ICE_LICENSE file included in this distribution. @@ -108,6 +108,29 @@ private: IceUtil::ThreadPtr _thread; }; +class InterruptWorkItem : public ThreadPoolWorkItem +{ +public: + + virtual void + execute(ThreadPoolCurrent& current) + { + // Nothing to do, this is just used to interrupt the thread pool selector. + } +}; +ThreadPoolWorkItemPtr interruptWorkItem; + +class InterruptWorkItemInit +{ +public: + + InterruptWorkItemInit() + { + interruptWorkItem = new InterruptWorkItem; + } +}; +InterruptWorkItemInit init; + // // Exception raised by the thread pool work queue when the thread pool // is destroyed. @@ -561,7 +584,33 @@ IceInternal::ThreadPool::update(const EventHandlerPtr& handler, SocketOperation { Lock sync(*this); assert(!_destroyed); + + // Don't remove what needs to be added + remove = static_cast<SocketOperation>(remove & ~add); + + // Don't remove/add if already un-registered or registered + remove = static_cast<SocketOperation>(handler->_registered & remove); + add = static_cast<SocketOperation>(~handler->_registered & add); + if(remove == add) + { + return; + } + _selector.update(handler.get(), remove, add); +#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT) + if(add & SocketOperationRead && handler->_hasMoreData && !(handler->_disabled & SocketOperationRead)) + { + if(_pendingHandlers.empty()) + { + _workQueue->queue(interruptWorkItem); // Interrupt select() + } + _pendingHandlers.insert(handler.get()); + } + else if(remove & SocketOperationRead) + { + _pendingHandlers.erase(handler.get()); + } +#endif } void @@ -577,8 +626,8 @@ IceInternal::ThreadPool::finish(const EventHandlerPtr& handler) // Clear the current ready handlers. The handlers from this vector can't be // reference counted and a handler might get destroyed once it's finished. // - _handlers.clear(); - _nextHandler = _handlers.end(); + //_handlers.clear(); + //_nextHandler = _handlers.end(); #else // If there are no pending asynchronous operations, we can call finish on the handler now. if(!handler->_pending) @@ -682,6 +731,19 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread) if(select) { _handlers.swap(handlers); + if(!_pendingHandlers.empty()) + { + for(_nextHandler = _handlers.begin(); _nextHandler != _handlers.end(); ++_nextHandler) + { + _pendingHandlers.erase(_nextHandler->first); + } + set<EventHandler*>::const_iterator p; + for(p = _pendingHandlers.begin(); p != _pendingHandlers.end(); ++p) + { + _handlers.push_back(make_pair(*p, SocketOperationRead)); + } + _pendingHandlers.clear(); + } _nextHandler = _handlers.begin(); _selector.finishSelect(); select = false; @@ -700,6 +762,14 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread) // the IO thread count now. // --_inUseIO; + if(current._handler->_hasMoreData && current._handler->_registered & SocketOperationRead) + { + if(_pendingHandlers.empty()) + { + _workQueue->queue(interruptWorkItem); // Interrupt select() + } + _pendingHandlers.insert(current._handler.get()); + } } else { @@ -707,7 +777,18 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread) // If the handler called ioCompleted(), we re-enable the handler in // case it was disabled and we decrease the number of thread in use. // - _selector.enable(current._handler.get(), current.operation); + if(_serialize) + { + _selector.enable(current._handler.get(), current.operation); + if(current._handler->_hasMoreData && current._handler->_registered & SocketOperationRead) + { + if(_pendingHandlers.empty()) + { + _workQueue->queue(interruptWorkItem); // Interrupt select() + } + _pendingHandlers.insert(current._handler.get()); + } + } assert(_inUse > 0); --_inUse; } @@ -717,10 +798,22 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread) return; // Wait timed-out. } } + else if(current._handler->_hasMoreData && current._handler->_registered & SocketOperationRead) + { + if(_pendingHandlers.empty()) + { + _workQueue->queue(interruptWorkItem); // Interrupt select() + } + _pendingHandlers.insert(current._handler.get()); + } // // Get the next ready handler. // + while(_nextHandler != _handlers.end() && !(_nextHandler->second & _nextHandler->first->_registered)) + { + ++_nextHandler; + } if(_nextHandler != _handlers.end()) { current._ioCompleted = false; @@ -748,6 +841,7 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread) } else { + _handlers.clear(); _selector.startSelect(); select = true; thread->setState(ThreadStateIdle); @@ -774,7 +868,8 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread) try { current._ioCompleted = false; - current._handler = _selector.getNextHandler(current.operation, _threadIdleTime); + current._handler = _selector.getNextHandler(current.operation, current._count, current._error, + _threadIdleTime); } catch(const SelectorTimeoutException&) { @@ -819,7 +914,8 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread) try { - current._handler = _selector.getNextHandler(current.operation, _serverIdleTime); + current._handler = _selector.getNextHandler(current.operation, current._count, current._error, + _serverIdleTime); } catch(const SelectorTimeoutException&) { @@ -889,13 +985,30 @@ IceInternal::ThreadPool::ioCompleted(ThreadPoolCurrent& current) if(_sizeMax > 1) { + #if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT) --_inUseIO; - - if(_serialize && !_destroyed) + + if(!_destroyed) { - _selector.disable(current._handler.get(), current.operation); - } + if(_serialize) + { + _selector.disable(current._handler.get(), current.operation); + + // Make sure the handler isn't in the set of pending handlers (this can + // for example occur if the handler is has more data and its added by + // ThreadPool::update while we were processing IO). + _pendingHandlers.erase(current._handler.get()); + } + else if(current._handler->_hasMoreData && current._handler->_registered & SocketOperationRead) + { + if(_pendingHandlers.empty()) + { + _workQueue->queue(interruptWorkItem); // Interrupt select() + } + _pendingHandlers.insert(current._handler.get()); + } + } if(current._leader) { @@ -967,6 +1080,11 @@ IceInternal::ThreadPool::startMessage(ThreadPoolCurrent& current) assert(!(current._handler->_ready & current.operation)); current._handler->_ready = static_cast<SocketOperation>(current._handler->_ready | current.operation); current._handler->_started = static_cast<SocketOperation>(current._handler->_started & ~current.operation); + + AsyncInfo* info = current._handler->getNativeInfo()->getAsyncInfo(current.operation); + info->count = current._count; + info->error = current._error; + if(!current._handler->finishAsync(current.operation)) // Returns false if the handler is finished. { current._handler->_pending = static_cast<SocketOperation>(current._handler->_pending & ~current.operation); |