diff options
author | Benoit Foucher <benoit@zeroc.com> | 2009-08-21 15:55:01 +0200 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2009-08-21 15:55:01 +0200 |
commit | b9f2fa14fb3f222a6ec5e0a93bf25fe5ad12b56a (patch) | |
tree | 183215e2dbeadfbc871b800ce09726e58af38b91 /cpp/src/Ice/ThreadPool.cpp | |
parent | adding compression cookbook demo (diff) | |
download | ice-b9f2fa14fb3f222a6ec5e0a93bf25fe5ad12b56a.tar.bz2 ice-b9f2fa14fb3f222a6ec5e0a93bf25fe5ad12b56a.tar.xz ice-b9f2fa14fb3f222a6ec5e0a93bf25fe5ad12b56a.zip |
IOCP changes, bug 3501, 4200, 4156, 3101
Diffstat (limited to 'cpp/src/Ice/ThreadPool.cpp')
-rw-r--r-- | cpp/src/Ice/ThreadPool.cpp | 1364 |
1 files changed, 785 insertions, 579 deletions
diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp index 835b28b3c08..d74b0222ddc 100644 --- a/cpp/src/Ice/ThreadPool.cpp +++ b/cpp/src/Ice/ThreadPool.cpp @@ -7,6 +7,13 @@ // // ********************************************************************** +// +// The following is required for GetThreadIOPendingFlag +// +#if defined(_WIN32) && !defined(_WIN32_WINNT) +# define _WIN32_WINNT 0x0501 +#endif + #include <IceUtil/DisableWarnings.h> #include <Ice/ThreadPool.h> #include <Ice/EventHandler.h> @@ -14,7 +21,6 @@ #include <Ice/LocalException.h> #include <Ice/Instance.h> #include <Ice/LoggerUtil.h> -#include <Ice/Functional.h> #include <Ice/Protocol.h> #include <Ice/ObjectAdapterFactory.h> #include <Ice/Properties.h> @@ -25,47 +31,358 @@ using namespace Ice; using namespace IceInternal; ICE_DECLSPEC_EXPORT IceUtil::Shared* IceInternal::upCast(ThreadPool* p) { return p; } +ICE_DECLSPEC_EXPORT IceUtil::Shared* IceInternal::upCast(ThreadPoolWorkItem* p) { return p; } + +namespace +{ + +class ShutdownWorkItem : public ThreadPoolWorkItem +{ +public: + + ShutdownWorkItem(const InstancePtr& instance) : _instance(instance) + { + } + + virtual void + execute(ThreadPoolCurrent& current) + { + current.ioCompleted(); + try + { + _instance->objectAdapterFactory()->shutdown(); + } + catch(const CommunicatorDestroyedException&) + { + } + } + +private: + + const InstancePtr _instance; +}; + +class FinishedWorkItem : public ThreadPoolWorkItem +{ +public: + + FinishedWorkItem(const EventHandlerPtr& handler) : _handler(handler) + { + } + + virtual void + execute(ThreadPoolCurrent& current) + { + _handler->finished(current); + } + +private: + + const EventHandlerPtr _handler; +}; + +class JoinThreadWorkItem : public ThreadPoolWorkItem +{ +public: + + JoinThreadWorkItem(const IceUtil::ThreadPtr& thread) : _thread(thread) + { + } + + virtual void + execute(ThreadPoolCurrent&) + { + // No call to ioCompleted, this shouldn't block (and we don't want to cause + // a new thread to be started). + _thread->getThreadControl().join(); + } + +private: + + IceUtil::ThreadPtr _thread; +}; + +// +// Exception raised by the thread pool work queue when the thread pool +// is destroyed. +// +class ThreadPoolDestroyedException +{ +}; + +} + +namespace IceInternal +{ + +class ThreadPoolWorkQueue : public EventHandler, public IceUtil::Mutex +{ +public: + + ThreadPoolWorkQueue(ThreadPool* threadPool, const InstancePtr& instance, Selector& selector) : + _threadPool(threadPool), + _instance(instance), + _selector(selector), + _destroyed(false) +#ifdef ICE_USE_IOCP + , _info(SocketOperationRead) +#endif + { +#ifndef ICE_USE_IOCP + SOCKET fds[2]; + createPipe(fds); + _fdIntrRead = fds[0]; + _fdIntrWrite = fds[1]; + + _selector.initialize(this); + _selector.update(this, SocketOperationNone, SocketOperationRead); +#endif + } + + ~ThreadPoolWorkQueue() + { + assert(_destroyed); + +#ifndef ICE_USE_IOCP + try + { + closeSocket(_fdIntrRead); + } + catch(const LocalException& ex) + { + Error out(_instance->initializationData().logger); + out << "exception in selector while calling closeSocket():\n" << ex; + } + + try + { + closeSocket(_fdIntrWrite); + } + catch(const LocalException& ex) + { + Error out(_instance->initializationData().logger); + out << "exception in selector while calling closeSocket():\n" << ex; + } +#endif + } + + void destroy() + { + Lock sync(*this); + assert(!_destroyed); + _destroyed = true; + postMessage(); + } + + void queue(const ThreadPoolWorkItemPtr& item) + { + Lock sync(*this); + if(_destroyed) + { + throw CommunicatorDestroyedException(__FILE__, __LINE__); + } + _workItems.push_back(item); + if(_workItems.size() == 1) + { + postMessage(); + } + } + +#ifdef ICE_USE_IOCP + bool startAsync(SocketOperation) + { + assert(false); + return false; + } + + bool finishAsync(SocketOperation) + { + assert(false); + return false; + } +#endif + + virtual void message(ThreadPoolCurrent& current) + { + ThreadPoolWorkItemPtr workItem; + { + Lock sync(*this); + if(!_workItems.empty()) + { + workItem = _workItems.front(); + _workItems.pop_front(); + + if(_workItems.empty()) + { +#ifndef ICE_USE_IOCP + char c; + while(true) + { + ssize_t ret; +#ifdef _WIN32 + ret = ::recv(_fdIntrRead, &c, 1, 0); +#else + ret = ::read(_fdIntrRead, &c, 1); +#endif + if(ret == SOCKET_ERROR) + { + if(interrupted()) + { + continue; + } + + SocketException ex(__FILE__, __LINE__); + ex.error = getSocketErrno(); + throw ex; + } + break; + } +#endif + } + } + else + { + assert(_destroyed); + } + } + + if(workItem) + { + workItem->execute(current); + } + else + { + current.ioCompleted(); + throw ThreadPoolDestroyedException(); + } + } + + virtual void finished(ThreadPoolCurrent&) + { + assert(false); + } + + virtual string toString() const + { + return "work queue"; + } + + virtual NativeInfoPtr getNativeInfo() + { +#ifndef ICE_USE_IOCP + return new NativeInfo(_fdIntrRead); +#endif + return 0; + } + + virtual void postMessage() + { +#ifndef ICE_USE_IOCP + char c = 0; + while(true) + { +#ifdef _WIN32 + if(::send(_fdIntrWrite, &c, 1, 0) == SOCKET_ERROR) +#else + if(::write(_fdIntrWrite, &c, 1) == SOCKET_ERROR) +#endif + { + if(interrupted()) + { + continue; + } + + SocketException ex(__FILE__, __LINE__); + ex.error = getSocketErrno(); + throw ex; + } + break; + } +#else + if(!PostQueuedCompletionStatus(_selector.getIOCPHandle(), 0, reinterpret_cast<ULONG_PTR>(this), &_info)) + { + SocketException ex(__FILE__, __LINE__); + ex.error = GetLastError(); + throw ex; + } +#endif + } + +private: + + const ThreadPool* _threadPool; + const InstancePtr _instance; + Selector& _selector; + bool _destroyed; +#ifdef ICE_USE_IOCP + AsyncInfo _info; +#else + SOCKET _fdIntrRead; + SOCKET _fdIntrWrite; +#endif + list<ThreadPoolWorkItemPtr> _workItems; +}; + +} IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& prefix, int timeout) : _instance(instance), _destroyed(false), _prefix(prefix), - _selector(instance, timeout), + _selector(instance), _size(0), + _sizeIO(0), _sizeMax(0), _sizeWarn(0), _serialize(_instance->initializationData().properties->getPropertyAsInt(_prefix + ".Serialize") > 0), + _hasPriority(false), + _priority(0), + _serverIdleTime(timeout), + _threadIdleTime(0), _stackSize(0), - _running(0), _inUse(0), - _load(1.0), - _promote(true), - _warnUdp(_instance->initializationData().properties->getPropertyAsInt("Ice.Warn.Datagrams") > 0), - _hasPriority(false), - _priority(0) +#ifndef ICE_USE_IOCP + _inUseIO(0), + _nextHandler(_handlers.end()), +#endif + _promote(true) { + PropertiesPtr properties = _instance->initializationData().properties; + +#ifdef _WIN32 + SYSTEM_INFO sysInfo; + GetSystemInfo(&sysInfo); + int nProcessors = sysInfo.dwNumberOfProcessors; +#else + int nProcessors = static_cast<int>(sysconf(_SC_NPROCESSORS_ONLN)); +#endif + // // We use just one thread as the default. This is the fastest // possible setting, still allows one level of nesting, and // doesn't require to make the servants thread safe. // - int size = _instance->initializationData().properties->getPropertyAsIntWithDefault(_prefix + ".Size", 1); + int size = properties->getPropertyAsIntWithDefault(_prefix + ".Size", 1); if(size < 1) { Warning out(_instance->initializationData().logger); out << _prefix << ".Size < 1; Size adjusted to 1"; size = 1; } - - int sizeMax = _instance->initializationData().properties->getPropertyAsIntWithDefault(_prefix + ".SizeMax", size); + + int sizeMax = properties->getPropertyAsIntWithDefault(_prefix + ".SizeMax", size); + if(sizeMax == -1) + { + sizeMax = nProcessors; + } if(sizeMax < size) { Warning out(_instance->initializationData().logger); out << _prefix << ".SizeMax < " << _prefix << ".Size; SizeMax adjusted to Size (" << size << ")"; sizeMax = size; - } + } - int sizeWarn = _instance->initializationData().properties->getPropertyAsInt(_prefix + ".SizeWarn"); + int sizeWarn = properties->getPropertyAsIntWithDefault(_prefix + ".SizeWarn", sizeMax * 80 / 100); if(sizeWarn != 0 && sizeWarn < size) { Warning out(_instance->initializationData().logger); @@ -79,11 +396,25 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& p sizeWarn = sizeMax; } + int threadIdleTime = properties->getPropertyAsIntWithDefault(_prefix + ".ThreadIdleTime", 60); + if(threadIdleTime < 0) + { + Warning out(_instance->initializationData().logger); + out << _prefix << ".ThreadIdleTime < 0; ThreadIdleTime adjusted to 0"; + threadIdleTime = 0; + } + const_cast<int&>(_size) = size; const_cast<int&>(_sizeMax) = sizeMax; const_cast<int&>(_sizeWarn) = sizeWarn; + const_cast<int&>(_sizeIO) = min(sizeMax, nProcessors); + const_cast<int&>(_threadIdleTime) = threadIdleTime; - int stackSize = _instance->initializationData().properties->getPropertyAsInt(_prefix + ".StackSize"); +#ifdef ICE_USE_IOCP + _selector.setup(_sizeIO); +#endif + + int stackSize = properties->getPropertyAsInt(_prefix + ".StackSize"); if(stackSize < 0) { Warning out(_instance->initializationData().logger); @@ -92,15 +423,16 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& p } const_cast<size_t&>(_stackSize) = static_cast<size_t>(stackSize); - - const_cast<bool&>(_hasPriority) = _instance->initializationData().properties->getProperty(_prefix + ".ThreadPriority") != ""; - const_cast<int&>(_priority) = _instance->initializationData().properties->getPropertyAsInt(_prefix + ".ThreadPriority"); + const_cast<bool&>(_hasPriority) = properties->getProperty(_prefix + ".ThreadPriority") != ""; + const_cast<int&>(_priority) = properties->getPropertyAsInt(_prefix + ".ThreadPriority"); if(!_hasPriority) { - const_cast<bool&>(_hasPriority) = _instance->initializationData().properties->getProperty("Ice.ThreadPriority") != ""; - const_cast<int&>(_priority) = _instance->initializationData().properties->getPropertyAsInt("Ice.ThreadPriority"); + const_cast<bool&>(_hasPriority) = properties->getProperty("Ice.ThreadPriority") != ""; + const_cast<int&>(_priority) = properties->getPropertyAsInt("Ice.ThreadPriority"); } + _workQueue = new ThreadPoolWorkQueue(this, _instance, _selector); + if(_instance->traceLevels()->threadPool >= 1) { Trace out(_instance->initializationData().logger, _instance->traceLevels()->threadPoolCat); @@ -122,8 +454,7 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& p { thread->start(_stackSize); } - _threads.push_back(thread); - ++_running; + _threads.insert(thread); } } catch(const IceUtil::Exception& ex) @@ -131,9 +462,6 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& p { Error out(_instance->initializationData().logger); out << "cannot create thread for `" << _prefix << "':\n" << ex; -#ifdef __GNUC__ - out << "\n" << ex.ice_stackTrace(); -#endif } destroy(); @@ -157,171 +485,83 @@ IceInternal::ThreadPool::~ThreadPool() void IceInternal::ThreadPool::destroy() { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + Lock sync(*this); assert(!_destroyed); _destroyed = true; - _selector.setInterrupt(); + _workQueue->destroy(); } void -IceInternal::ThreadPool::incFdsInUse() +IceInternal::ThreadPool::initialize(const EventHandlerPtr& handler) { -#ifdef ICE_USE_SELECT - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - _selector.incFdsInUse(); -#endif + Lock sync(*this); + assert(!_destroyed); + _selector.initialize(handler.get()); } void -IceInternal::ThreadPool::decFdsInUse() +IceInternal::ThreadPool::update(const EventHandlerPtr& handler, SocketOperation remove, SocketOperation add) { -#ifdef ICE_USE_SELECT - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - _selector.decFdsInUse(); -#endif + Lock sync(*this); + assert(!_destroyed); + _selector.update(handler.get(), remove, add); } void -IceInternal::ThreadPool::_register(const EventHandlerPtr& handler) +IceInternal::ThreadPool::finish(const EventHandlerPtr& handler) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - assert(!_destroyed && handler->_fd != INVALID_SOCKET); - if(!handler->_registered) - { - if(!handler->_serializing) - { - _selector.add(handler.get(), NeedRead); - } - handler->_registered = true; - } -} + Lock sync(*this); + assert(!_destroyed); +#ifndef ICE_USE_IOCP + _selector.finish(handler.get()); // This must be called before! + _workQueue->queue(new FinishedWorkItem(handler)); -void -IceInternal::ThreadPool::unregister(const EventHandlerPtr& handler) -{ - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - assert(!_destroyed && handler->_fd != INVALID_SOCKET); - if(handler->_registered) + // + // Clear the current ready handlers. The handlers from this vector can't be + // reference counted and a handler might get destroyed once it's finished. + // + _handlers.clear(); + _nextHandler = _handlers.end(); +#else + // If there are no pending asynchronous operations, we can call finish on the handler now. + if(!handler->_pending) { - if(!handler->_serializing) - { - _selector.remove(handler.get(), NeedRead); - } - handler->_registered = false; + _workQueue->queue(new FinishedWorkItem(handler)); + _selector.finish(handler.get()); } -} - -void -IceInternal::ThreadPool::finish(const EventHandlerPtr& handler) -{ - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - assert(!_destroyed && handler->_fd != INVALID_SOCKET); - if(handler->_registered) + else { - if(!handler->_serializing) - { - _selector.remove(handler.get(), NeedRead); - } - handler->_registered = false; + handler->_finish = true; } - _finished.push_back(handler); - _selector.setInterrupt(); +#endif } void IceInternal::ThreadPool::execute(const ThreadPoolWorkItemPtr& workItem) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - if(_destroyed) - { - throw Ice::CommunicatorDestroyedException(__FILE__, __LINE__); - } - _workItems.push_back(workItem); - _selector.setInterrupt(); -} - -void -IceInternal::ThreadPool::promoteFollower(EventHandler* handler) -{ - if(_sizeMax > 1) - { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - - if(_serialize && handler) - { - handler->_serializing = true; - if(handler->_registered) - { - _selector.remove(handler, NeedRead, true); // No interrupt, no thread is blocked on select(). - } - } - - assert(!_promote); - _promote = true; - notify(); - - if(!_destroyed) - { - assert(_inUse >= 0); - ++_inUse; - - if(_inUse == _sizeWarn) - { - Warning out(_instance->initializationData().logger); - out << "thread pool `" << _prefix << "' is running low on threads\n" - << "Size=" << _size << ", " << "SizeMax=" << _sizeMax << ", " << "SizeWarn=" << _sizeWarn; - } - - assert(_inUse <= _running); - if(_inUse < _sizeMax && _inUse == _running) - { - if(_instance->traceLevels()->threadPool >= 1) - { - Trace out(_instance->initializationData().logger, _instance->traceLevels()->threadPoolCat); - out << "growing " << _prefix << ": Size = " << (_running + 1); - } - - try - { - IceUtil::ThreadPtr thread = new EventHandlerThread(this); - if(_hasPriority) - { - thread->start(_stackSize, _priority); - } - else - { - thread->start(_stackSize); - } - _threads.push_back(thread); - ++_running; - } - catch(const IceUtil::Exception& ex) - { - Error out(_instance->initializationData().logger); - out << "cannot create thread for `" << _prefix << "':\n" << ex; -#ifdef __GNUC__ - out << "\n" << ex.ice_stackTrace(); -#endif - } - } - } - } + _workQueue->queue(workItem); } void IceInternal::ThreadPool::joinWithAllThreads() { + assert(_destroyed); + // // _threads is immutable after destroy() has been called, // therefore no synchronization is needed. (Synchronization // wouldn't be possible here anyway, because otherwise the other // threads would never terminate.) // - assert(_destroyed); - for(vector<IceUtil::ThreadPtr>::iterator p = _threads.begin(); p != _threads.end(); ++p) + for(set<IceUtil::ThreadPtr>::iterator p = _threads.begin(); p != _threads.end(); ++p) { (*p)->getThreadControl().join(); } + +#ifndef ICE_USE_IOCP + _selector.finish(_workQueue.get()); +#endif + _selector.destroy(); } string @@ -330,505 +570,461 @@ IceInternal::ThreadPool::prefix() const return _prefix; } -bool -IceInternal::ThreadPool::run() +void +IceInternal::ThreadPool::run(const IceUtil::ThreadPtr& thread) { - ThreadPoolPtr self = this; - - if(_sizeMax > 1) - { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - - while(!_promote) - { - wait(); - } - - _promote = false; - } - +#ifndef ICE_USE_IOCP + ThreadPoolCurrent current(_instance, this); + bool select = false; + vector<pair<EventHandler*, SocketOperation> > handlers; while(true) { - int ret; - try + if(current._handler) { - ret = _selector.select(); + try + { + current._handler->message(current); + } + catch(ThreadPoolDestroyedException&) + { + return; + } + catch(const exception& ex) + { + Error out(_instance->initializationData().logger); + out << "exception in `" << _prefix << "':\n" << ex << "\nevent handler: " + << current._handler->toString(); + } + catch(...) + { + Error out(_instance->initializationData().logger); + out << "exception in `" << _prefix << "':\nevent handler: " << current._handler->toString(); + } } - catch(const Ice::LocalException& ex) + else if(select) { - Error out(_instance->initializationData().logger); - out << "exception in `" << _prefix << "':\n" << ex; -#ifdef __GNUC__ - out << "\n" << ex.ice_stackTrace(); -#endif - continue; + try + { + _selector.select(handlers, _serverIdleTime); + } + catch(SelectorTimeoutException&) + { + Lock sync(*this); + if(!_destroyed && _inUse == 0) + { + _workQueue->queue(new ShutdownWorkItem(_instance)); // Select timed-out. + } + continue; + } } - EventHandlerPtr handler; - ThreadPoolWorkItemPtr workItem; - bool finished = false; - bool shutdown = false; - - if(ret == 0) // We initiate a shutdown if there is a thread pool timeout. { - shutdown = true; - } - else - { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - if(_selector.isInterrupted()) + Lock sync(*this); + if(!current._handler) { - if(_selector.processInterrupt()) + if(select) { - continue; + _handlers.swap(handlers); + _nextHandler = _handlers.begin(); + _selector.finishSelect(); + select = false; } - - // - // There are three possiblities for an interrupt: - // - // 1. An event handler is being finished (closed). - // - // 2. The thread pool has been destroyed. - // - // 3. A work item has been schedulded. - // - if(!_finished.empty()) + else if(!current._leader && followerWait(thread, current)) { - _selector.clearInterrupt(); - handler = _finished.front(); - _finished.pop_front(); - finished = true; + return; // Wait timed-out. } - else if(!_workItems.empty()) + } + else if(_sizeMax > 1) + { + if(!current._ioCompleted) { // - // Work items must be executed first even if the thread pool is destroyed. + // The handler didn't call ioCompleted() so we take care of decreasing + // the IO thread count now. // - _selector.clearInterrupt(); - workItem = _workItems.front(); - _workItems.pop_front(); + --_inUseIO; } - else if(_destroyed) + else { // - // Don't clear the interrupt if destroyed, so that the other threads exit as well. + // If the handler called ioCompleted(), we re-enable the handler in + // case it was disabled and we decrease the number of thread in use. // - return true; + _selector.enable(current._handler.get(), current.operation); + assert(_inUse > 0); + --_inUse; } - else - { - assert(false); - } - } - else - { - handler = _selector.getNextSelected(); - if(!handler) + + if(!current._leader && followerWait(thread, current)) { - continue; + return; // Wait timed-out. } } - } - - // - // Now we are outside the thread synchronization. - // - if(shutdown) - { - // - // Initiate server shutdown. - // - ObjectAdapterFactoryPtr factory; - try - { - factory = _instance->objectAdapterFactory(); - } - catch(const Ice::CommunicatorDestroyedException&) - { - continue; - } - - promoteFollower(); - factory->shutdown(); // - // No "continue", because we want shutdown to be done in - // its own thread from this pool. Therefore we called - // promoteFollower(). + // Get the next ready handler. // - } - else if(workItem) - { - try + if(_nextHandler != _handlers.end()) { - // - // "self" is faster than "this", as the reference - // count is not modified. - // - workItem->execute(self); + current._ioCompleted = false; + current._handler = _nextHandler->first; + current.operation = _nextHandler->second; + ++_nextHandler; } - catch(const LocalException& ex) + else { - Error out(_instance->initializationData().logger); - out << "exception in `" << _prefix << "' while calling execute():\n" << ex; -#ifdef __GNUC__ - out << "\n" << ex.ice_stackTrace(); -#endif + current._handler = 0; } - - // - // No "continue", because we want execute() to be - // called in its own thread from this pool. Note that - // this means that execute() must call - // promoteFollower(). - // - } - else - { - assert(handler); - - if(finished) + + if(!current._handler) { // - // Notify a handler about it's removal from the thread pool. + // If there are no more ready handlers and there are still threads busy performing + // IO, we give up leadership and promote another follower (which will perform the + // select() only once all the IOs are completed). Otherwise, if there are no more + // threads peforming IOs, it's time to do another select(). // - try + if(_inUseIO > 0) { - // - // "self" is faster than "this", as the reference count is not modified. - // - handler->finished(self); + promoteFollower(current); } - catch(const LocalException& ex) + else { - Error out(_instance->initializationData().logger); - out << "exception in `" << _prefix << "' while calling finished():\n" - << ex << '\n' << handler->toString(); -#ifdef __GNUC__ - out << "\n" << ex.ice_stackTrace(); -#endif + _selector.startSelect(); + select = true; } - - // - // No "continue", because we want finished() to be - // called in its own thread from this pool. Note that - // this means that finished() must call - // promoteFollower(). - // } - else + else if(_sizeMax > 1) { // - // If the handler is "readable", try to read a message. + // Increment the IO thread count and if there are still threads available + // to perform IO and more handlers ready, we promote a follower. // - BasicStream stream(_instance.get()); - if(handler->readable()) + ++_inUseIO; + if(_nextHandler != _handlers.end() && _inUseIO < _sizeIO) { - try - { - if(!read(handler)) - { - continue; // Can't read without blocking. - } - } - catch(const TimeoutException&) - { - assert(false); // This shouldn't occur as we only perform non-blocking reads. - continue; - } - catch(const DatagramLimitException&) // Expected. - { - handler->_stream.resize(0); - handler->_stream.i = stream.b.begin(); - continue; - } - catch(const SocketException& ex) + promoteFollower(current); + } + } + } + } +#else + ThreadPoolCurrent current(_instance, this); + while(true) + { + try + { + current._ioCompleted = false; + current._handler = _selector.getNextHandler(current.operation, _threadIdleTime); + } + catch(const SelectorTimeoutException&) + { + if(_sizeMax > 1) + { + Lock sync(*this); + + if(_destroyed) + { + continue; + } + else if(_inUse < static_cast<int>(_threads.size() - 1)) // If not the last idle thread, we can exit. + { + BOOL hasIO = false; + GetThreadIOPendingFlag(GetCurrentThread(), &hasIO); + if(hasIO) { - handler->exception(ex); continue; } - catch(const LocalException& ex) + + if(_instance->traceLevels()->threadPool >= 1) { - if(handler->datagram()) - { - if(_instance->initializationData().properties->getPropertyAsInt("Ice.Warn.Connections") > 0) - { - Warning out(_instance->initializationData().logger); - out << "datagram connection exception:\n" << ex << '\n' << handler->toString(); - } - handler->_stream.resize(0); - handler->_stream.i = stream.b.begin(); - } - else - { - handler->exception(ex); - } - continue; + Trace out(_instance->initializationData().logger, _instance->traceLevels()->threadPoolCat); + out << "shrinking " << _prefix << ": Size = " << (_threads.size() - 1); } - - stream.swap(handler->_stream); - assert(stream.i == stream.b.end()); + _threads.erase(thread); + _workQueue->queue(new JoinThreadWorkItem(thread)); + return; } - - // - // Provide a new mesage to the handler. - // - try + else if(_inUse > 0) { // - // "self" is faster than "this", as the reference count is not modified. - // - handler->message(stream, self); + // If this is the last idle thread but there are still other threads + // busy dispatching, we go back waiting with _threadIdleTime. We only + // wait with _serverIdleTime when there's only one thread left. + // + continue; } - catch(const LocalException& ex) + assert(_threads.size() == 1); + } + + try + { + current._handler = _selector.getNextHandler(current.operation, _serverIdleTime); + } + catch(const SelectorTimeoutException&) + { + Lock sync(*this); + if(!_destroyed) { - Error out(_instance->initializationData().logger); - out << "exception in `" << _prefix << "' while calling message():\n" - << ex << '\n' << handler->toString(); -#ifdef __GNUC__ - out << "\n" << ex.ice_stackTrace(); -#endif + _workQueue->queue(new ShutdownWorkItem(_instance)); } - - // - // No "continue", because we want message() to be - // called in its own thread from this pool. Note that - // this means that message() must call - // promoteFollower(). - // + continue; } } - if(_sizeMax > 1) + assert(current._handler); + try { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + current._handler->message(current); - if(!_destroyed) + if(_sizeMax > 1 && current._ioCompleted) { - if(_serialize && handler && handler->_serializing) - { - if(handler->_registered) - { - // No interrupt if no thread is blocked on select (_promote == true) - _selector.add(handler.get(), NeedRead, _promote); - } - handler->_serializing = false; - } + Lock sync(*this); + assert(_inUse > 0); + --_inUse; + } + } + catch(ThreadPoolDestroyedException&) + { + return; + } + catch(const exception& ex) + { + Error out(_instance->initializationData().logger); + out << "exception in `" << _prefix << "':\n" << ex << "\nevent handler: " << current._handler->toString(); + } + catch(...) + { + Error out(_instance->initializationData().logger); + out << "exception in `" << _prefix << "':\nevent handler: " << current._handler->toString(); + } + } +#endif +} - if(_size < _sizeMax) // Dynamic thread pool - { - // - // First we reap threads that have been destroyed before. - // - int sz = static_cast<int>(_threads.size()); - assert(_running <= sz); - if(_running < sz) - { - vector<IceUtil::ThreadPtr>::iterator start = - partition(_threads.begin(), _threads.end(), - IceUtil::constMemFun(&IceUtil::Thread::isAlive)); +bool +IceInternal::ThreadPool::ioCompleted(ThreadPoolCurrent& current) +{ + current._ioCompleted = true; // Set the IO completed flag to specifiy that ioCompleted() has been called. - for(vector<IceUtil::ThreadPtr>::iterator p = start; p != _threads.end(); ++p) - { - (*p)->getThreadControl().join(); - } + if(_sizeMax > 1) + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); +#ifndef ICE_USE_IOCP + --_inUseIO; + + if(_serialize && !_destroyed) + { + _selector.disable(current._handler.get(), current.operation); + } - _threads.erase(start, _threads.end()); - } - - // - // Now we check if this thread can be destroyed, based - // on a load factor. - // + if(current._leader) + { + // + // If this thread is still the leader, it's time to promote a new leader. + // + promoteFollower(current); + } + else if(_promote && (_nextHandler != _handlers.end() || _inUseIO == 0)) + { + notify(); + } +#endif - // - // The load factor jumps immediately to the number of - // threads that are currently in use, but decays - // exponentially if the number of threads in use is - // smaller than the load factor. This reflects that we - // create threads immediately when they are needed, - // but want the number of threads to slowly decline to - // the configured minimum. - // - double inUse = static_cast<double>(_inUse); - if(_load < inUse) + assert(_inUse >= 0); + ++_inUse; + + if(_inUse == _sizeWarn) + { + Warning out(_instance->initializationData().logger); + out << "thread pool `" << _prefix << "' is running low on threads\n" + << "Size=" << _size << ", " << "SizeMax=" << _sizeMax << ", " << "SizeWarn=" << _sizeWarn; + } + + if(!_destroyed) + { + assert(_inUse <= static_cast<int>(_threads.size())); + if(_inUse < _sizeMax && _inUse == static_cast<int>(_threads.size())) + { + if(_instance->traceLevels()->threadPool >= 1) + { + Trace out(_instance->initializationData().logger, _instance->traceLevels()->threadPoolCat); + out << "growing " << _prefix << ": Size=" << _threads.size() + 1; + } + + try + { + IceUtil::ThreadPtr thread = new EventHandlerThread(this); + if(_hasPriority) { - _load = inUse; + thread->start(_stackSize, _priority); } else { - const double loadFactor = 0.05; // TODO: Configurable? - const double oneMinusLoadFactor = 1 - loadFactor; - _load = _load * oneMinusLoadFactor + inUse * loadFactor; - } - - if(_running > _size) - { - int load = static_cast<int>(_load + 0.5); - - // - // We add one to the load factor because on - // additional thread is needed for select(). - // - if(load + 1 < _running) - { - if(_instance->traceLevels()->threadPool >= 1) - { - Trace out(_instance->initializationData().logger, - _instance->traceLevels()->threadPoolCat); - out << "shrinking " << _prefix << ": Size = " << (_running - 1); - } - - assert(_inUse > 0); - --_inUse; - - assert(_running > 0); - --_running; - - return false; - } + thread->start(_stackSize); } + _threads.insert(thread); + } + catch(const IceUtil::Exception& ex) + { + Error out(_instance->initializationData().logger); + out << "cannot create thread for `" << _prefix << "':\n" << ex; } - - assert(_inUse > 0); - --_inUse; - } - - // - // Do not wait to be promoted again to release these objects. - // - handler = 0; - workItem = 0; - - while(!_promote) - { - wait(); } - - _promote = false; } } + + return _serialize; } +#ifdef ICE_USE_IOCP bool -IceInternal::ThreadPool::read(const EventHandlerPtr& handler) +IceInternal::ThreadPool::startMessage(ThreadPoolCurrent& current) { - BasicStream& stream = handler->_stream; + assert(current._handler->_pending & current.operation); - if(stream.i - stream.b.begin() >= headerSize) + if(current._handler->_started & current.operation) { - if(!handler->read(stream)) + assert(!(current._handler->_ready & current.operation)); + current._handler->_ready = static_cast<SocketOperation>(current._handler->_ready | current.operation); + current._handler->_started = static_cast<SocketOperation>(current._handler->_started & ~current.operation); + if(!current._handler->finishAsync(current.operation)) // Returns false if the handler is finished. { + current._handler->_pending = static_cast<SocketOperation>(current._handler->_pending & ~current.operation); + if(!current._handler->_pending && current._handler->_finish) + { + _workQueue->queue(new FinishedWorkItem(current._handler)); + _selector.finish(current._handler.get()); + } + return false; + } + } + else if(!(current._handler->_ready & current.operation) && (current._handler->_registered & current.operation)) + { + assert(!(current._handler->_started & current.operation)); + if(!current._handler->startAsync(current.operation)) + { + current._handler->_pending = static_cast<SocketOperation>(current._handler->_pending & ~current.operation); + if(!current._handler->_pending && current._handler->_finish) + { + _workQueue->queue(new FinishedWorkItem(current._handler)); + _selector.finish(current._handler.get()); + } + return false; + } + else + { + current._handler->_started = static_cast<SocketOperation>(current._handler->_started | current.operation); return false; } - assert(stream.i == stream.b.end()); - return true; } - if(stream.b.size() == 0) + if(current._handler->_registered & current.operation) { - stream.b.resize(headerSize); - stream.i = stream.b.begin(); + assert(current._handler->_ready & current.operation); + current._handler->_ready = static_cast<SocketOperation>(current._handler->_ready & ~current.operation); + return true; + } + else + { + current._handler->_pending = static_cast<SocketOperation>(current._handler->_pending & ~current.operation); + if(!current._handler->_pending && current._handler->_finish) + { + _workQueue->queue(new FinishedWorkItem(current._handler)); + _selector.finish(current._handler.get()); + } + return false; } +} - if(stream.i != stream.b.end()) +void +IceInternal::ThreadPool::finishMessage(ThreadPoolCurrent& current) +{ + if(current._handler->_registered & current.operation) { - if(!handler->read(stream)) + assert(!(current._handler->_ready & current.operation)); + if(!current._handler->startAsync(current.operation)) { - return false; + current._handler->_pending = static_cast<SocketOperation>(current._handler->_pending & ~current.operation); + } + else + { + assert(current._handler->_pending & current.operation); + current._handler->_started = static_cast<SocketOperation>(current._handler->_started | current.operation); } - assert(stream.i == stream.b.end()); } - - ptrdiff_t pos = stream.i - stream.b.begin(); - if(pos < headerSize) - { - // - // This situation is possible for small UDP packets. - // - throw IllegalMessageSizeException(__FILE__, __LINE__); - } - - stream.i = stream.b.begin(); - const Byte* m; - stream.readBlob(m, static_cast<Int>(sizeof(magic))); - if(m[0] != magic[0] || m[1] != magic[1] || m[2] != magic[2] || m[3] != magic[3]) - { - BadMagicException ex(__FILE__, __LINE__); - ex.badMagic = Ice::ByteSeq(&m[0], &m[0] + sizeof(magic)); - throw ex; - } - Byte pMajor; - Byte pMinor; - stream.read(pMajor); - stream.read(pMinor); - if(pMajor != protocolMajor - || static_cast<unsigned char>(pMinor) > static_cast<unsigned char>(protocolMinor)) - { - UnsupportedProtocolException ex(__FILE__, __LINE__); - ex.badMajor = static_cast<unsigned char>(pMajor); - ex.badMinor = static_cast<unsigned char>(pMinor); - ex.major = static_cast<unsigned char>(protocolMajor); - ex.minor = static_cast<unsigned char>(protocolMinor); - throw ex; - } - Byte eMajor; - Byte eMinor; - stream.read(eMajor); - stream.read(eMinor); - if(eMajor != encodingMajor - || static_cast<unsigned char>(eMinor) > static_cast<unsigned char>(encodingMinor)) - { - UnsupportedEncodingException ex(__FILE__, __LINE__); - ex.badMajor = static_cast<unsigned char>(eMajor); - ex.badMinor = static_cast<unsigned char>(eMinor); - ex.major = static_cast<unsigned char>(encodingMajor); - ex.minor = static_cast<unsigned char>(encodingMinor); - throw ex; - } - Byte messageType; - stream.read(messageType); - Byte compress; - stream.read(compress); - Int size; - stream.read(size); - if(size < headerSize) - { - throw IllegalMessageSizeException(__FILE__, __LINE__); - } - if(size > static_cast<Int>(_instance->messageSizeMax())) - { - Ex::throwMemoryLimitException(__FILE__, __LINE__, size, _instance->messageSizeMax()); - } - if(size > static_cast<Int>(stream.b.size())) - { - stream.b.resize(size); - } - stream.i = stream.b.begin() + pos; - - if(stream.i != stream.b.end()) + else + { + current._handler->_pending = static_cast<SocketOperation>(current._handler->_pending & ~current.operation); + } + + if(!current._handler->_pending && current._handler->_finish) + { + // There are no more pending async operations, it's time to call finish. + _workQueue->queue(new FinishedWorkItem(current._handler)); + _selector.finish(current._handler.get()); + } +} +#else +void +IceInternal::ThreadPool::promoteFollower(ThreadPoolCurrent& current) +{ + assert(!_promote && current._leader); + _promote = true; + if(_inUseIO < _sizeIO && (_nextHandler != _handlers.end() || _inUseIO == 0)) + { + notify(); + } + current._leader = false; +} + +bool +IceInternal::ThreadPool::followerWait(const IceUtil::ThreadPtr& thread, ThreadPoolCurrent& current) +{ + assert(!current._leader); + + // + // It's important to clear the handler before waiting to make sure that + // resources for the handler are released now if it's finished. We also + // clear the per-thread stream. + // + current._handler = 0; + current.stream.clear(); + current.stream.b.clear(); + + // + // Wait to be promoted and for all the IO threads to be done. + // + while(!_promote || _inUseIO == _sizeIO || _nextHandler == _handlers.end() && _inUseIO > 0) { - if(handler->datagram()) + if(_threadIdleTime) { - if(_warnUdp) + if(!timedWait(IceUtil::Time::seconds(_threadIdleTime))) { - Warning out(_instance->initializationData().logger); - out << "DatagramLimitException: maximum size of " << pos << " exceeded"; + if(!_destroyed && (!_promote || _inUseIO == _sizeIO || _nextHandler == _handlers.end() && _inUseIO > 0)) + { + if(_instance->traceLevels()->threadPool >= 1) + { + Trace out(_instance->initializationData().logger, _instance->traceLevels()->threadPoolCat); + out << "shrinking " << _prefix << ": Size=" << (_threads.size() - 1); + } + assert(_threads.size() > 1); // Can only be called by a waiting follower thread. + _threads.erase(thread); + _workQueue->queue(new JoinThreadWorkItem(thread)); + return true; + } } - throw DatagramLimitException(__FILE__, __LINE__); } else { - if(!handler->read(stream)) - { - return false; - } - assert(stream.i == stream.b.end()); + wait(); } } - - return true; + current._leader = true; // The current thread has become the leader. + _promote = false; + return false; } +#endif IceInternal::ThreadPool::EventHandlerThread::EventHandlerThread(const ThreadPoolPtr& pool) : IceUtil::Thread(pool->_prefix + " thread"), @@ -841,55 +1037,65 @@ IceInternal::ThreadPool::EventHandlerThread::run() { if(_pool->_instance->initializationData().threadHook) { - _pool->_instance->initializationData().threadHook->start(); + try + { + _pool->_instance->initializationData().threadHook->start(); + } + catch(const exception& ex) + { + Error out(_pool->_instance->initializationData().logger); + out << "thread hook start() method raised an unexpected exception in `" << _pool->_prefix << "':\n" << ex; + } + catch(...) + { + Error out(_pool->_instance->initializationData().logger); + out << "thread hook start() method raised an unexpected exception in `" << _pool->_prefix << "'"; + } } - bool promote; - try { - promote = _pool->run(); + _pool->run(this); } - catch(const Ice::Exception& ex) + catch(const exception& ex) { Error out(_pool->_instance->initializationData().logger); - out << "exception in `" << _pool->_prefix << "':\n" << ex.what(); -#ifdef __GNUC__ - out << "\n" << ex.ice_stackTrace(); -#endif - promote = true; - } - catch(const std::exception& ex) - { - Error out(_pool->_instance->initializationData().logger); - out << "exception in `" << _pool->_prefix << "':\n" << ex.what(); - promote = true; + out << "exception in `" << _pool->_prefix << "':\n" << ex; } catch(...) { Error out(_pool->_instance->initializationData().logger); out << "unknown exception in `" << _pool->_prefix << "'"; - promote = true; } - if(promote && _pool->_sizeMax > 1) + if(_pool->_instance->initializationData().threadHook) { - // - // Promote a follower, but w/o modifying _inUse or creating - // new threads. - // + try { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*_pool.get()); - assert(!_pool->_promote); - _pool->_promote = true; - _pool->notify(); + _pool->_instance->initializationData().threadHook->stop(); + } + catch(const exception& ex) + { + Error out(_pool->_instance->initializationData().logger); + out << "thread hook stop() method raised an unexpected exception in `" << _pool->_prefix << "':\n" << ex; + } + catch(...) + { + Error out(_pool->_instance->initializationData().logger); + out << "thread hook stop() method raised an unexpected exception in `" << _pool->_prefix << "'"; } - } - - if(_pool->_instance->initializationData().threadHook) - { - _pool->_instance->initializationData().threadHook->stop(); } _pool = 0; // Break cyclic dependency. } + +ThreadPoolCurrent::ThreadPoolCurrent(const InstancePtr& instance, const ThreadPoolPtr& threadPool) : + operation(SocketOperationNone), + stream(instance.get()), + _threadPool(threadPool.get()), + _ioCompleted(false) +#ifndef ICE_USE_IOCP + , _leader(false) +#endif +{ +} |