diff options
Diffstat (limited to 'cpp/src/Ice/ThreadPool.cpp')
-rw-r--r-- | cpp/src/Ice/ThreadPool.cpp | 1140 |
1 files changed, 1140 insertions, 0 deletions
diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp new file mode 100644 index 00000000000..1f9a3342576 --- /dev/null +++ b/cpp/src/Ice/ThreadPool.cpp @@ -0,0 +1,1140 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2011 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +// +// The following is required for GetThreadIOPendingFlag +// +#if defined(_WIN32) && !defined(_WIN32_WINNT) +# define _WIN32_WINNT 0x0501 +#endif + +#include <IceUtil/DisableWarnings.h> +#include <Ice/ThreadPool.h> +#include <Ice/EventHandler.h> +#include <Ice/Network.h> +#include <Ice/LocalException.h> +#include <Ice/Instance.h> +#include <Ice/LoggerUtil.h> +#include <Ice/Protocol.h> +#include <Ice/ObjectAdapterFactory.h> +#include <Ice/Properties.h> +#include <Ice/TraceLevels.h> + +using namespace std; +using namespace Ice; +using namespace IceInternal; + +ICE_DECLSPEC_EXPORT IceUtil::Shared* IceInternal::upCast(ThreadPool* p) { return p; } +ICE_DECLSPEC_EXPORT IceUtil::Shared* IceInternal::upCast(ThreadPoolWorkItem* p) { return p; } + +namespace +{ + +class ShutdownWorkItem : public ThreadPoolWorkItem +{ +public: + + ShutdownWorkItem(const InstancePtr& instance) : _instance(instance) + { + } + + virtual void + execute(ThreadPoolCurrent& current) + { + current.ioCompleted(); + try + { + _instance->objectAdapterFactory()->shutdown(); + } + catch(const CommunicatorDestroyedException&) + { + } + } + +private: + + const InstancePtr _instance; +}; + +class FinishedWorkItem : public ThreadPoolWorkItem +{ +public: + + FinishedWorkItem(const EventHandlerPtr& handler) : _handler(handler) + { + } + + virtual void + execute(ThreadPoolCurrent& current) + { + _handler->finished(current); + } + +private: + + const EventHandlerPtr _handler; +}; + +class JoinThreadWorkItem : public ThreadPoolWorkItem +{ +public: + + JoinThreadWorkItem(const IceUtil::ThreadPtr& thread) : _thread(thread) + { + } + + virtual void + execute(ThreadPoolCurrent&) + { + // No call to ioCompleted, this shouldn't block (and we don't want to cause + // a new thread to be started). + _thread->getThreadControl().join(); + } + +private: + + IceUtil::ThreadPtr _thread; +}; + +// +// Exception raised by the thread pool work queue when the thread pool +// is destroyed. +// +class ThreadPoolDestroyedException +{ +}; + +} + +IceInternal::DispatchWorkItem::DispatchWorkItem(const InstancePtr& instance) : _instance(instance) +{ +} + +void +IceInternal::DispatchWorkItem::execute(ThreadPoolCurrent& current) +{ + Ice::DispatcherPtr dispatcher = _instance->initializationData().dispatcher; + if(dispatcher) + { + try + { + dispatcher->dispatch(this, 0); + } + catch(const std::exception& ex) + { + if(_instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.Dispatch", 1) > 1) + { + Warning out(_instance->initializationData().logger); + out << "dispatch exception:\n" << ex; + } + } + catch(...) + { + if(_instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.Dispatch", 1) > 1) + { + Warning out(_instance->initializationData().logger); + out << "dispatch exception:\nunknown c++ exception"; + } + } + } + else + { + current.ioCompleted(); // Promote a follower. + run(); + } +} + +IceInternal::ThreadPoolWorkQueue::ThreadPoolWorkQueue(ThreadPool* threadPool, + const InstancePtr& instance, + Selector& selector) : + _threadPool(threadPool), + _instance(instance), + _selector(selector), + _destroyed(false) +#ifdef ICE_USE_IOCP + , _info(SocketOperationRead) +#endif +{ +#ifndef ICE_USE_IOCP + SOCKET fds[2]; + createPipe(fds); + _fdIntrRead = fds[0]; + _fdIntrWrite = fds[1]; + + _selector.initialize(this); + _selector.update(this, SocketOperationNone, SocketOperationRead); +#endif +} + +IceInternal::ThreadPoolWorkQueue::~ThreadPoolWorkQueue() +{ + assert(_destroyed); + +#ifndef ICE_USE_IOCP + try + { + closeSocket(_fdIntrRead); + } + catch(const LocalException& ex) + { + Error out(_instance->initializationData().logger); + out << "exception in selector while calling closeSocket():\n" << ex; + } + + try + { + closeSocket(_fdIntrWrite); + } + catch(const LocalException& ex) + { + Error out(_instance->initializationData().logger); + out << "exception in selector while calling closeSocket():\n" << ex; + } +#endif +} + +void +IceInternal::ThreadPoolWorkQueue::destroy() +{ + Lock sync(*this); + assert(!_destroyed); + _destroyed = true; + postMessage(); +} + +void +IceInternal::ThreadPoolWorkQueue::queue(const ThreadPoolWorkItemPtr& item) +{ + Lock sync(*this); + if(_destroyed) + { + throw CommunicatorDestroyedException(__FILE__, __LINE__); + } + _workItems.push_back(item); +#ifndef ICE_USE_IOCP + if(_workItems.size() == 1) + { + postMessage(); + } +#else + postMessage(); +#endif +} + +#ifdef ICE_USE_IOCP +bool +IceInternal::ThreadPoolWorkQueue::startAsync(SocketOperation) +{ + assert(false); + return false; +} + +bool +IceInternal::ThreadPoolWorkQueue::finishAsync(SocketOperation) +{ + assert(false); + return false; +} +#endif + +void +IceInternal::ThreadPoolWorkQueue::message(ThreadPoolCurrent& current) +{ + ThreadPoolWorkItemPtr workItem; + { + Lock sync(*this); + if(!_workItems.empty()) + { + workItem = _workItems.front(); + _workItems.pop_front(); + +#ifndef ICE_USE_IOCP + if(_workItems.empty()) + { + char c; + while(true) + { + ssize_t ret; +#ifdef _WIN32 + ret = ::recv(_fdIntrRead, &c, 1, 0); +#else + ret = ::read(_fdIntrRead, &c, 1); +#endif + if(ret == SOCKET_ERROR) + { + if(interrupted()) + { + continue; + } + + SocketException ex(__FILE__, __LINE__); + ex.error = getSocketErrno(); + throw ex; + } + break; + } + } +#endif + } + else + { + assert(_destroyed); +#ifdef ICE_USE_IOCP + postMessage(); +#endif + } + } + + if(workItem) + { + workItem->execute(current); + } + else + { + current.ioCompleted(); + throw ThreadPoolDestroyedException(); + } +} + +void +IceInternal::ThreadPoolWorkQueue::finished(ThreadPoolCurrent&) +{ + assert(false); +} + +string +IceInternal::ThreadPoolWorkQueue::toString() const +{ + return "work queue"; +} + +NativeInfoPtr +IceInternal::ThreadPoolWorkQueue::getNativeInfo() +{ +#ifndef ICE_USE_IOCP + return new NativeInfo(_fdIntrRead); +#endif + return 0; +} + +void +IceInternal::ThreadPoolWorkQueue::postMessage() +{ +#ifndef ICE_USE_IOCP + char c = 0; + while(true) + { +#ifdef _WIN32 + if(::send(_fdIntrWrite, &c, 1, 0) == SOCKET_ERROR) +#else + if(::write(_fdIntrWrite, &c, 1) == SOCKET_ERROR) +#endif + { + if(interrupted()) + { + continue; + } + + SocketException ex(__FILE__, __LINE__); + ex.error = getSocketErrno(); + throw ex; + } + break; + } +#else + if(!PostQueuedCompletionStatus(_selector.getIOCPHandle(), 0, reinterpret_cast<ULONG_PTR>(this), +#if defined(_MSC_VER) && (_MSC_VER < 1300) // COMPILER FIX: VC60 + reinterpret_cast<LPOVERLAPPED>(&_info) +#else + &_info +#endif + )) + { + SocketException ex(__FILE__, __LINE__); + ex.error = GetLastError(); + throw ex; + } +#endif +} + +IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& prefix, int timeout) : + _instance(instance), + _destroyed(false), + _prefix(prefix), + _selector(instance), + _size(0), + _sizeIO(0), + _sizeMax(0), + _sizeWarn(0), + _serialize(_instance->initializationData().properties->getPropertyAsInt(_prefix + ".Serialize") > 0), + _hasPriority(false), + _priority(0), + _serverIdleTime(timeout), + _threadIdleTime(0), + _stackSize(0), + _inUse(0), +#ifndef ICE_USE_IOCP + _inUseIO(0), + _nextHandler(_handlers.end()), +#endif + _promote(true) +{ + PropertiesPtr properties = _instance->initializationData().properties; + +#ifdef _WIN32 + SYSTEM_INFO sysInfo; + GetSystemInfo(&sysInfo); + int nProcessors = sysInfo.dwNumberOfProcessors; +#else + int nProcessors = static_cast<int>(sysconf(_SC_NPROCESSORS_ONLN)); +#endif + + // + // We use just one thread as the default. This is the fastest + // possible setting, still allows one level of nesting, and + // doesn't require to make the servants thread safe. + // + int size = properties->getPropertyAsIntWithDefault(_prefix + ".Size", 1); + if(size < 1) + { + Warning out(_instance->initializationData().logger); + out << _prefix << ".Size < 1; Size adjusted to 1"; + size = 1; + } + + int sizeMax = properties->getPropertyAsIntWithDefault(_prefix + ".SizeMax", size); + if(sizeMax == -1) + { + sizeMax = nProcessors; + } + if(sizeMax < size) + { + Warning out(_instance->initializationData().logger); + out << _prefix << ".SizeMax < " << _prefix << ".Size; SizeMax adjusted to Size (" << size << ")"; + sizeMax = size; + } + + int sizeWarn = properties->getPropertyAsInt(_prefix + ".SizeWarn"); + if(sizeWarn != 0 && sizeWarn < size) + { + Warning out(_instance->initializationData().logger); + out << _prefix << ".SizeWarn < " << _prefix << ".Size; adjusted SizeWarn to Size (" << size << ")"; + sizeWarn = size; + } + else if(sizeWarn > sizeMax) + { + Warning out(_instance->initializationData().logger); + out << _prefix << ".SizeWarn > " << _prefix << ".SizeMax; adjusted SizeWarn to SizeMax (" << sizeMax << ")"; + sizeWarn = sizeMax; + } + + int threadIdleTime = properties->getPropertyAsIntWithDefault(_prefix + ".ThreadIdleTime", 60); + if(threadIdleTime < 0) + { + Warning out(_instance->initializationData().logger); + out << _prefix << ".ThreadIdleTime < 0; ThreadIdleTime adjusted to 0"; + threadIdleTime = 0; + } + + const_cast<int&>(_size) = size; + const_cast<int&>(_sizeMax) = sizeMax; + const_cast<int&>(_sizeWarn) = sizeWarn; + const_cast<int&>(_sizeIO) = min(sizeMax, nProcessors); + const_cast<int&>(_threadIdleTime) = threadIdleTime; + +#ifdef ICE_USE_IOCP + _selector.setup(_sizeIO); +#endif + + int stackSize = properties->getPropertyAsInt(_prefix + ".StackSize"); + if(stackSize < 0) + { + Warning out(_instance->initializationData().logger); + out << _prefix << ".StackSize < 0; Size adjusted to OS default"; + stackSize = 0; + } + const_cast<size_t&>(_stackSize) = static_cast<size_t>(stackSize); + + const_cast<bool&>(_hasPriority) = properties->getProperty(_prefix + ".ThreadPriority") != ""; + const_cast<int&>(_priority) = properties->getPropertyAsInt(_prefix + ".ThreadPriority"); + if(!_hasPriority) + { + const_cast<bool&>(_hasPriority) = properties->getProperty("Ice.ThreadPriority") != ""; + const_cast<int&>(_priority) = properties->getPropertyAsInt("Ice.ThreadPriority"); + } + + _workQueue = new ThreadPoolWorkQueue(this, _instance, _selector); + + if(_instance->traceLevels()->threadPool >= 1) + { + Trace out(_instance->initializationData().logger, _instance->traceLevels()->threadPoolCat); + out << "creating " << _prefix << ": Size = " << _size << ", SizeMax = " << _sizeMax << ", SizeWarn = " + << _sizeWarn; + } + + __setNoDelete(true); + try + { + for(int i = 0 ; i < _size ; ++i) + { + IceUtil::ThreadPtr thread = new EventHandlerThread(this); + if(_hasPriority) + { + thread->start(_stackSize, _priority); + } + else + { + thread->start(_stackSize); + } + _threads.insert(thread); + } + } + catch(const IceUtil::Exception& ex) + { + { + Error out(_instance->initializationData().logger); + out << "cannot create thread for `" << _prefix << "':\n" << ex; + } + + destroy(); + joinWithAllThreads(); + __setNoDelete(false); + throw; + } + catch(...) + { + __setNoDelete(false); + throw; + } + __setNoDelete(false); +} + +IceInternal::ThreadPool::~ThreadPool() +{ + assert(_destroyed); +} + +void +IceInternal::ThreadPool::destroy() +{ + Lock sync(*this); + assert(!_destroyed); + _destroyed = true; + _workQueue->destroy(); +} + +void +IceInternal::ThreadPool::initialize(const EventHandlerPtr& handler) +{ + Lock sync(*this); + assert(!_destroyed); + _selector.initialize(handler.get()); +} + +void +IceInternal::ThreadPool::update(const EventHandlerPtr& handler, SocketOperation remove, SocketOperation add) +{ + Lock sync(*this); + assert(!_destroyed); + _selector.update(handler.get(), remove, add); +} + +void +IceInternal::ThreadPool::finish(const EventHandlerPtr& handler) +{ + Lock sync(*this); + assert(!_destroyed); +#ifndef ICE_USE_IOCP + _selector.finish(handler.get()); // This must be called before! + _workQueue->queue(new FinishedWorkItem(handler)); + + // + // Clear the current ready handlers. The handlers from this vector can't be + // reference counted and a handler might get destroyed once it's finished. + // + _handlers.clear(); + _nextHandler = _handlers.end(); +#else + // If there are no pending asynchronous operations, we can call finish on the handler now. + if(!handler->_pending) + { + _workQueue->queue(new FinishedWorkItem(handler)); + _selector.finish(handler.get()); + } + else + { + handler->_finish = true; + } +#endif +} + +void +IceInternal::ThreadPool::execute(const ThreadPoolWorkItemPtr& workItem) +{ + _workQueue->queue(workItem); +} + +void +IceInternal::ThreadPool::joinWithAllThreads() +{ + assert(_destroyed); + + // + // _threads is immutable after destroy() has been called, + // therefore no synchronization is needed. (Synchronization + // wouldn't be possible here anyway, because otherwise the other + // threads would never terminate.) + // + for(set<IceUtil::ThreadPtr>::iterator p = _threads.begin(); p != _threads.end(); ++p) + { + (*p)->getThreadControl().join(); + } + +#ifndef ICE_USE_IOCP + _selector.finish(_workQueue.get()); +#endif + _selector.destroy(); +} + +string +IceInternal::ThreadPool::prefix() const +{ + return _prefix; +} + +void +IceInternal::ThreadPool::run(const IceUtil::ThreadPtr& thread) +{ +#ifndef ICE_USE_IOCP + ThreadPoolCurrent current(_instance, this); + bool select = false; + vector<pair<EventHandler*, SocketOperation> > handlers; + while(true) + { + if(current._handler) + { + try + { + current._handler->message(current); + } + catch(ThreadPoolDestroyedException&) + { + return; + } + catch(const exception& ex) + { + Error out(_instance->initializationData().logger); + out << "exception in `" << _prefix << "':\n" << ex << "\nevent handler: " + << current._handler->toString(); + } + catch(...) + { + Error out(_instance->initializationData().logger); + out << "exception in `" << _prefix << "':\nevent handler: " << current._handler->toString(); + } + } + else if(select) + { + try + { + _selector.select(handlers, _serverIdleTime); + } + catch(SelectorTimeoutException&) + { + Lock sync(*this); + if(!_destroyed && _inUse == 0) + { + _workQueue->queue(new ShutdownWorkItem(_instance)); // Select timed-out. + } + continue; + } + } + + { + Lock sync(*this); + if(!current._handler) + { + if(select) + { + _handlers.swap(handlers); + _nextHandler = _handlers.begin(); + _selector.finishSelect(); + select = false; + } + else if(!current._leader && followerWait(thread, current)) + { + return; // Wait timed-out. + } + } + else if(_sizeMax > 1) + { + if(!current._ioCompleted) + { + // + // The handler didn't call ioCompleted() so we take care of decreasing + // the IO thread count now. + // + --_inUseIO; + } + else + { + // + // If the handler called ioCompleted(), we re-enable the handler in + // case it was disabled and we decrease the number of thread in use. + // + _selector.enable(current._handler.get(), current.operation); + assert(_inUse > 0); + --_inUse; + } + + if(!current._leader && followerWait(thread, current)) + { + return; // Wait timed-out. + } + } + + // + // Get the next ready handler. + // + if(_nextHandler != _handlers.end()) + { + current._ioCompleted = false; + current._handler = _nextHandler->first; + current.operation = _nextHandler->second; + ++_nextHandler; + } + else + { + current._handler = 0; + } + + if(!current._handler) + { + // + // If there are no more ready handlers and there are still threads busy performing + // IO, we give up leadership and promote another follower (which will perform the + // select() only once all the IOs are completed). Otherwise, if there are no more + // threads peforming IOs, it's time to do another select(). + // + if(_inUseIO > 0) + { + promoteFollower(current); + } + else + { + _selector.startSelect(); + select = true; + } + } + else if(_sizeMax > 1) + { + // + // Increment the IO thread count and if there are still threads available + // to perform IO and more handlers ready, we promote a follower. + // + ++_inUseIO; + if(_nextHandler != _handlers.end() && _inUseIO < _sizeIO) + { + promoteFollower(current); + } + } + } + } +#else + ThreadPoolCurrent current(_instance, this); + while(true) + { + try + { + current._ioCompleted = false; + current._handler = _selector.getNextHandler(current.operation, _threadIdleTime); + } + catch(const SelectorTimeoutException&) + { + if(_sizeMax > 1) + { + Lock sync(*this); + + if(_destroyed) + { + continue; + } + else if(_inUse < static_cast<int>(_threads.size() - 1)) // If not the last idle thread, we can exit. + { + BOOL hasIO = false; + GetThreadIOPendingFlag(GetCurrentThread(), &hasIO); + if(hasIO) + { + continue; + } + + if(_instance->traceLevels()->threadPool >= 1) + { + Trace out(_instance->initializationData().logger, _instance->traceLevels()->threadPoolCat); + out << "shrinking " << _prefix << ": Size = " << (_threads.size() - 1); + } + _threads.erase(thread); + _workQueue->queue(new JoinThreadWorkItem(thread)); + return; + } + else if(_inUse > 0) + { + // + // If this is the last idle thread but there are still other threads + // busy dispatching, we go back waiting with _threadIdleTime. We only + // wait with _serverIdleTime when there's only one thread left. + // + continue; + } + assert(_threads.size() == 1); + } + + try + { + current._handler = _selector.getNextHandler(current.operation, _serverIdleTime); + } + catch(const SelectorTimeoutException&) + { + Lock sync(*this); + if(!_destroyed) + { + _workQueue->queue(new ShutdownWorkItem(_instance)); + } + continue; + } + } + + assert(current._handler); + try + { + current._handler->message(current); + + if(_sizeMax > 1 && current._ioCompleted) + { + Lock sync(*this); + assert(_inUse > 0); + --_inUse; + } + } + catch(ThreadPoolDestroyedException&) + { + return; + } + catch(const exception& ex) + { + Error out(_instance->initializationData().logger); + out << "exception in `" << _prefix << "':\n" << ex << "\nevent handler: " << current._handler->toString(); + } + catch(...) + { + Error out(_instance->initializationData().logger); + out << "exception in `" << _prefix << "':\nevent handler: " << current._handler->toString(); + } + } +#endif +} + +bool +IceInternal::ThreadPool::ioCompleted(ThreadPoolCurrent& current) +{ + current._ioCompleted = true; // Set the IO completed flag to specifiy that ioCompleted() has been called. + + if(_sizeMax > 1) + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); +#ifndef ICE_USE_IOCP + --_inUseIO; + + if(_serialize && !_destroyed) + { + _selector.disable(current._handler.get(), current.operation); + } + + if(current._leader) + { + // + // If this thread is still the leader, it's time to promote a new leader. + // + promoteFollower(current); + } + else if(_promote && (_nextHandler != _handlers.end() || _inUseIO == 0)) + { + notify(); + } +#endif + + assert(_inUse >= 0); + ++_inUse; + + if(_inUse == _sizeWarn) + { + Warning out(_instance->initializationData().logger); + out << "thread pool `" << _prefix << "' is running low on threads\n" + << "Size=" << _size << ", " << "SizeMax=" << _sizeMax << ", " << "SizeWarn=" << _sizeWarn; + } + + if(!_destroyed) + { + assert(_inUse <= static_cast<int>(_threads.size())); + if(_inUse < _sizeMax && _inUse == static_cast<int>(_threads.size())) + { + if(_instance->traceLevels()->threadPool >= 1) + { + Trace out(_instance->initializationData().logger, _instance->traceLevels()->threadPoolCat); + out << "growing " << _prefix << ": Size=" << _threads.size() + 1; + } + + try + { + IceUtil::ThreadPtr thread = new EventHandlerThread(this); + if(_hasPriority) + { + thread->start(_stackSize, _priority); + } + else + { + thread->start(_stackSize); + } + _threads.insert(thread); + } + catch(const IceUtil::Exception& ex) + { + Error out(_instance->initializationData().logger); + out << "cannot create thread for `" << _prefix << "':\n" << ex; + } + } + } + } + + return _serialize; +} + +#ifdef ICE_USE_IOCP +bool +IceInternal::ThreadPool::startMessage(ThreadPoolCurrent& current) +{ + assert(current._handler->_pending & current.operation); + + if(current._handler->_started & current.operation) + { + assert(!(current._handler->_ready & current.operation)); + current._handler->_ready = static_cast<SocketOperation>(current._handler->_ready | current.operation); + current._handler->_started = static_cast<SocketOperation>(current._handler->_started & ~current.operation); + if(!current._handler->finishAsync(current.operation)) // Returns false if the handler is finished. + { + current._handler->_pending = static_cast<SocketOperation>(current._handler->_pending & ~current.operation); + if(!current._handler->_pending && current._handler->_finish) + { + _workQueue->queue(new FinishedWorkItem(current._handler)); + _selector.finish(current._handler.get()); + } + return false; + } + } + else if(!(current._handler->_ready & current.operation) && (current._handler->_registered & current.operation)) + { + assert(!(current._handler->_started & current.operation)); + if(!current._handler->startAsync(current.operation)) + { + current._handler->_pending = static_cast<SocketOperation>(current._handler->_pending & ~current.operation); + if(!current._handler->_pending && current._handler->_finish) + { + _workQueue->queue(new FinishedWorkItem(current._handler)); + _selector.finish(current._handler.get()); + } + return false; + } + else + { + current._handler->_started = static_cast<SocketOperation>(current._handler->_started | current.operation); + return false; + } + } + + if(current._handler->_registered & current.operation) + { + assert(current._handler->_ready & current.operation); + current._handler->_ready = static_cast<SocketOperation>(current._handler->_ready & ~current.operation); + return true; + } + else + { + current._handler->_pending = static_cast<SocketOperation>(current._handler->_pending & ~current.operation); + if(!current._handler->_pending && current._handler->_finish) + { + _workQueue->queue(new FinishedWorkItem(current._handler)); + _selector.finish(current._handler.get()); + } + return false; + } +} + +void +IceInternal::ThreadPool::finishMessage(ThreadPoolCurrent& current) +{ + if(current._handler->_registered & current.operation) + { + assert(!(current._handler->_ready & current.operation)); + if(!current._handler->startAsync(current.operation)) + { + current._handler->_pending = static_cast<SocketOperation>(current._handler->_pending & ~current.operation); + } + else + { + assert(current._handler->_pending & current.operation); + current._handler->_started = static_cast<SocketOperation>(current._handler->_started | current.operation); + } + } + else + { + current._handler->_pending = static_cast<SocketOperation>(current._handler->_pending & ~current.operation); + } + + if(!current._handler->_pending && current._handler->_finish) + { + // There are no more pending async operations, it's time to call finish. + _workQueue->queue(new FinishedWorkItem(current._handler)); + _selector.finish(current._handler.get()); + } +} +#else +void +IceInternal::ThreadPool::promoteFollower(ThreadPoolCurrent& current) +{ + assert(!_promote && current._leader); + _promote = true; + if(_inUseIO < _sizeIO && (_nextHandler != _handlers.end() || _inUseIO == 0)) + { + notify(); + } + current._leader = false; +} + +bool +IceInternal::ThreadPool::followerWait(const IceUtil::ThreadPtr& thread, ThreadPoolCurrent& current) +{ + assert(!current._leader); + + // + // It's important to clear the handler before waiting to make sure that + // resources for the handler are released now if it's finished. We also + // clear the per-thread stream. + // + current._handler = 0; + current.stream.clear(); + current.stream.b.clear(); + + // + // Wait to be promoted and for all the IO threads to be done. + // + while(!_promote || _inUseIO == _sizeIO || (_nextHandler == _handlers.end() && _inUseIO > 0)) + { + if(_threadIdleTime) + { + if(!timedWait(IceUtil::Time::seconds(_threadIdleTime))) + { + if(!_destroyed && (!_promote || _inUseIO == _sizeIO || + (_nextHandler == _handlers.end() && _inUseIO > 0))) + { + if(_instance->traceLevels()->threadPool >= 1) + { + Trace out(_instance->initializationData().logger, _instance->traceLevels()->threadPoolCat); + out << "shrinking " << _prefix << ": Size=" << (_threads.size() - 1); + } + assert(_threads.size() > 1); // Can only be called by a waiting follower thread. + _threads.erase(thread); + _workQueue->queue(new JoinThreadWorkItem(thread)); + return true; + } + } + } + else + { + wait(); + } + } + current._leader = true; // The current thread has become the leader. + _promote = false; + return false; +} +#endif + +IceInternal::ThreadPool::EventHandlerThread::EventHandlerThread(const ThreadPoolPtr& pool) : + IceUtil::Thread(pool->_prefix + " thread"), + _pool(pool) +{ +} + +void +IceInternal::ThreadPool::EventHandlerThread::run() +{ + if(_pool->_instance->initializationData().threadHook) + { + try + { + _pool->_instance->initializationData().threadHook->start(); + } + catch(const exception& ex) + { + Error out(_pool->_instance->initializationData().logger); + out << "thread hook start() method raised an unexpected exception in `" << _pool->_prefix << "':\n" << ex; + } + catch(...) + { + Error out(_pool->_instance->initializationData().logger); + out << "thread hook start() method raised an unexpected exception in `" << _pool->_prefix << "'"; + } + } + + try + { + _pool->run(this); + } + catch(const exception& ex) + { + Error out(_pool->_instance->initializationData().logger); + out << "exception in `" << _pool->_prefix << "':\n" << ex; + } + catch(...) + { + Error out(_pool->_instance->initializationData().logger); + out << "unknown exception in `" << _pool->_prefix << "'"; + } + + if(_pool->_instance->initializationData().threadHook) + { + try + { + _pool->_instance->initializationData().threadHook->stop(); + } + catch(const exception& ex) + { + Error out(_pool->_instance->initializationData().logger); + out << "thread hook stop() method raised an unexpected exception in `" << _pool->_prefix << "':\n" << ex; + } + catch(...) + { + Error out(_pool->_instance->initializationData().logger); + out << "thread hook stop() method raised an unexpected exception in `" << _pool->_prefix << "'"; + } + } + + _pool = 0; // Break cyclic dependency. +} + +ThreadPoolCurrent::ThreadPoolCurrent(const InstancePtr& instance, const ThreadPoolPtr& threadPool) : + operation(SocketOperationNone), + stream(instance.get()), + _threadPool(threadPool.get()), + _ioCompleted(false) +#ifndef ICE_USE_IOCP + , _leader(false) +#endif +{ +} |