diff options
author | Marc Laukien <marc@zeroc.com> | 2003-03-24 16:27:02 +0000 |
---|---|---|
committer | Marc Laukien <marc@zeroc.com> | 2003-03-24 16:27:02 +0000 |
commit | 8effd206a20226446062a0ce3f5bf1c67350ba13 (patch) | |
tree | 796d5569f19c29186445d8edbb58c905237bf72b /cpp/src/Ice/ThreadPool.cpp | |
parent | make depend (diff) | |
download | ice-8effd206a20226446062a0ce3f5bf1c67350ba13.tar.bz2 ice-8effd206a20226446062a0ce3f5bf1c67350ba13.tar.xz ice-8effd206a20226446062a0ce3f5bf1c67350ba13.zip |
dyn thread pool
Diffstat (limited to 'cpp/src/Ice/ThreadPool.cpp')
-rw-r--r-- | cpp/src/Ice/ThreadPool.cpp | 115 |
1 files changed, 82 insertions, 33 deletions
diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp index 592386142a6..be1ffb7372d 100644 --- a/cpp/src/Ice/ThreadPool.cpp +++ b/cpp/src/Ice/ThreadPool.cpp @@ -39,6 +39,7 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& p _size(0), _sizeMax(0), _sizeWarn(0), + _running(0), _inUse(0), _load(0) { @@ -77,6 +78,7 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& p { IceUtil::ThreadPtr thread = new EventHandlerThread(this); _threads.push_back(thread->start()); + ++_running; } } catch(const IceUtil::Exception& ex) @@ -102,7 +104,6 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& p IceInternal::ThreadPool::~ThreadPool() { assert(_destroyed); - assert(_inUse == 0); closeSocket(_fdIntrWrite); closeSocket(_fdIntrRead); @@ -147,30 +148,32 @@ IceInternal::ThreadPool::promoteFollower() { IceUtil::Mutex::Lock sync(*this); - assert(_inUse >= 0); - ++_inUse; -// _load = std::max(_load * 0.95 + _inUse * 0.05, static_cast<double>(_inUse)); // TODO: Configurable? -// cout << "_load = " << _load << endl; - - if(_inUse == _sizeWarn) + if(!_destroyed) { - Warning out(_instance->logger()); - out << "thread pool `" << _prefix << "' is running low on threads\n" - << "Size=" << _size << ", " << "SizeMax=" << _sizeMax << ", " << "SizeWarn=" << _sizeWarn; - } - - assert(_inUse <= static_cast<int>(_threads.size())); - if(!_destroyed && _inUse < _sizeMax && _inUse == static_cast<int>(_threads.size())) - { - try + assert(_inUse >= 0); + ++_inUse; + + if(_inUse == _sizeWarn) { - IceUtil::ThreadPtr thread = new EventHandlerThread(this); - _threads.push_back(thread->start()); + Warning out(_instance->logger()); + out << "thread pool `" << _prefix << "' is running low on threads\n" + << "Size=" << _size << ", " << "SizeMax=" << _sizeMax << ", " << "SizeWarn=" << _sizeWarn; } - catch(const IceUtil::Exception& ex) + + assert(_inUse <= _running); + if(_inUse < _sizeMax && _inUse == _running) { - Error out(_instance->logger()); - out << "cannot create thread for `" << _prefix << "':\n" << ex; + try + { + IceUtil::ThreadPtr thread = new EventHandlerThread(this); + _threads.push_back(thread->start()); + ++_running; + } + catch(const IceUtil::Exception& ex) + { + Error out(_instance->logger()); + out << "cannot create thread for `" << _prefix << "':\n" << ex; + } } } } @@ -197,10 +200,13 @@ IceInternal::ThreadPool::joinWithAllThreads() // threads would never terminate.) // assert(_destroyed); + for_each(_threads.begin(), _threads.end(), mem_fun_ref(&IceUtil::ThreadControl::join)); +/* for(vector<IceUtil::ThreadControl>::iterator p = _threads.begin(); p != _threads.end(); ++p) { p->join(); } +*/ } bool @@ -271,7 +277,7 @@ repeat: #endif } -void +bool IceInternal::ThreadPool::run() { ThreadPoolPtr self = this; @@ -346,7 +352,7 @@ IceInternal::ThreadPool::run() // Don't clear the interrupt if destroyed, so that // the other threads exit as well. // - return; + return true; } shutdown = clearInterrupt(); @@ -550,9 +556,48 @@ IceInternal::ThreadPool::run() { { IceUtil::Mutex::Lock sync(*this); + + if(!_destroyed) + { + // + // First we reap threads that have been destroyed before. + // + int sz = static_cast<int>(_threads.size()); + assert(_running <= sz); + if(_running < sz) + { + vector<IceUtil::ThreadControl>::iterator p = + partition(_threads.begin(), _threads.end(), mem_fun_ref(&IceUtil::ThreadControl::isAlive)); + for_each(p, _threads.end(), mem_fun_ref(&IceUtil::ThreadControl::join)); + _threads.erase(p, _threads.end()); + } + + // + // Now we check if this thread can be destroyed, based + // on a load factor. + // + const double loadFactor = 0.05; // TODO: Configurable? + const double oneMinusLoadFactor = 1 - loadFactor; + _load = _load * oneMinusLoadFactor + _inUse * loadFactor; + + if(_running > _size) + { + int load = static_cast<int>(_load + 1); + if(load < _running) + { + assert(_inUse > 0); + --_inUse; - assert(_inUse > 0); - --_inUse; + assert(_running > 0); + --_running; + + return false; + } + } + + assert(_inUse > 0); + --_inUse; + } } _promoteMutex.lock(); @@ -651,27 +696,31 @@ IceInternal::ThreadPool::EventHandlerThread::EventHandlerThread(const ThreadPool void IceInternal::ThreadPool::EventHandlerThread::run() { + bool promote; + try { - _pool->run(); + promote = _pool->run(); } catch(const Exception& ex) { Error out(_pool->_instance->logger()); out << "exception in `" << _pool->_prefix << "':\n" << ex; + promote = true; } catch(...) { Error out(_pool->_instance->logger()); - out << "unknown exception in `" << _pool->_prefix << "'"; - } + out << "unknown exception in `" << _pool->_prefix << "'"; + promote = true; + } - // - // Promote a follower, but w/o modifying _inUse or creating new - // threads. - // - if(_pool->_sizeMax > 1) + if(promote && _pool->_sizeMax > 1) { + // + // Promote a follower, but w/o modifying _inUse or creating + // new threads. + // _pool->_promoteMutex.unlock(); } |