diff options
author | Marc Laukien <marc@zeroc.com> | 2004-01-13 16:07:56 +0000 |
---|---|---|
committer | Marc Laukien <marc@zeroc.com> | 2004-01-13 16:07:56 +0000 |
commit | 7db65aa3d43b934793aa7cc03c5adbe1f1876a20 (patch) | |
tree | fac7ee47f9b4d85a70b99df9c5f00d879a366206 /cpp/src/Ice/ThreadPool.cpp | |
parent | minor cleanup (diff) | |
download | ice-7db65aa3d43b934793aa7cc03c5adbe1f1876a20.tar.bz2 ice-7db65aa3d43b934793aa7cc03c5adbe1f1876a20.tar.xz ice-7db65aa3d43b934793aa7cc03c5adbe1f1876a20.zip |
promotion fix
Diffstat (limited to 'cpp/src/Ice/ThreadPool.cpp')
-rw-r--r-- | cpp/src/Ice/ThreadPool.cpp | 211 |
1 files changed, 132 insertions, 79 deletions
diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp index 98812026aa7..3bce62e5bc7 100644 --- a/cpp/src/Ice/ThreadPool.cpp +++ b/cpp/src/Ice/ThreadPool.cpp @@ -43,6 +43,7 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& p _running(0), _inUse(0), _load(0), + _promote(true), _warnUdp(_instance->properties()->getPropertyAsInt("Ice.Warn.Datagrams") > 0) { SOCKET fds[2]; @@ -116,7 +117,7 @@ IceInternal::ThreadPool::~ThreadPool() void IceInternal::ThreadPool::destroy() { - IceUtil::Mutex::Lock sync(*this); + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); assert(!_destroyed); assert(_handlerMap.empty()); assert(_changes.empty()); @@ -127,7 +128,7 @@ IceInternal::ThreadPool::destroy() void IceInternal::ThreadPool::_register(SOCKET fd, const EventHandlerPtr& handler) { - IceUtil::Mutex::Lock sync(*this); + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); assert(!_destroyed); _changes.push_back(make_pair(fd, handler)); setInterrupt(0); @@ -136,7 +137,7 @@ IceInternal::ThreadPool::_register(SOCKET fd, const EventHandlerPtr& handler) void IceInternal::ThreadPool::unregister(SOCKET fd) { - IceUtil::Mutex::Lock sync(*this); + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); assert(!_destroyed); _changes.push_back(make_pair(fd, EventHandlerPtr(0))); setInterrupt(0); @@ -147,37 +148,37 @@ IceInternal::ThreadPool::promoteFollower() { if(_sizeMax > 1) { - _promoteMutex.unlock(); + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - { - IceUtil::Mutex::Lock sync(*this); + assert(!_promote); + _promote = true; + notify(); - if(!_destroyed) + if(!_destroyed) + { + assert(_inUse >= 0); + ++_inUse; + + if(_inUse == _sizeWarn) { - assert(_inUse >= 0); - ++_inUse; - - if(_inUse == _sizeWarn) + Warning out(_instance->logger()); + out << "thread pool `" << _prefix << "' is running low on threads\n" + << "Size=" << _size << ", " << "SizeMax=" << _sizeMax << ", " << "SizeWarn=" << _sizeWarn; + } + + assert(_inUse <= _running); + if(_inUse < _sizeMax && _inUse == _running) + { + try { - Warning out(_instance->logger()); - out << "thread pool `" << _prefix << "' is running low on threads\n" - << "Size=" << _size << ", " << "SizeMax=" << _sizeMax << ", " << "SizeWarn=" << _sizeWarn; + IceUtil::ThreadPtr thread = new EventHandlerThread(this); + _threads.push_back(thread->start()); + ++_running; } - - assert(_inUse <= _running); - if(_inUse < _sizeMax && _inUse == _running) + catch(const IceUtil::Exception& ex) { - try - { - IceUtil::ThreadPtr thread = new EventHandlerThread(this); - _threads.push_back(thread->start()); - ++_running; - } - catch(const IceUtil::Exception& ex) - { - Error out(_instance->logger()); - out << "cannot create thread for `" << _prefix << "':\n" << ex; - } + Error out(_instance->logger()); + out << "cannot create thread for `" << _prefix << "':\n" << ex; } } } @@ -297,7 +298,14 @@ IceInternal::ThreadPool::run() if(_sizeMax > 1) { - _promoteMutex.lock(); + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + + while(!_promote) + { + wait(); + } + + _promote = false; } while(true) @@ -344,7 +352,7 @@ IceInternal::ThreadPool::run() bool shutdown = false; { - IceUtil::Mutex::Lock sync(*this); + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); if(FD_ISSET(_fdIntrRead, &fdSet)) { @@ -514,6 +522,12 @@ IceInternal::ThreadPool::run() promoteFollower(); factory->shutdown(); + + // + // No "continue", because we want shutdown to be done in + // its own thread from this pool. Therefore we called + // promoteFollower(). + // } else { @@ -527,7 +541,11 @@ IceInternal::ThreadPool::run() // try { - handler->finished(self); // "self" is faster than "this", as the reference count is not modified. + // + // "self" is faster than "this", as the reference + // count is not modified. + // + handler->finished(self); } catch(const LocalException& ex) { @@ -535,6 +553,13 @@ IceInternal::ThreadPool::run() out << "exception in `" << _prefix << "' while calling finished():\n" << ex << '\n' << handler->toString(); } + + // + // No "continue", because we want finished() to be + // called in its own thread from this pool. Note that + // this means that finished() must call + // promoteFollower(). + // } else { @@ -568,69 +593,92 @@ IceInternal::ThreadPool::run() } // - // "self" is faster than "this", as the reference - // count is not modified. + // Provide a new mesage to the handler. + // + try + { + // + // "self" is faster than "this", as the reference + // count is not modified. + // + handler->message(stream, self); + } + catch(const LocalException& ex) + { + Error out(_instance->logger()); + out << "exception in `" << _prefix << "' while calling finished():\n" + << ex << '\n' << handler->toString(); + } + + // + // No "continue", because we want message() to be + // called in its own thread from this pool. Note that + // this means that message() must call + // promoteFollower(). // - handler->message(stream, self); } } if(_sizeMax > 1) { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + + if(!_destroyed) { - IceUtil::Mutex::Lock sync(*this); - - if(!_destroyed) + // + // First we reap threads that have been destroyed before. + // + int sz = static_cast<int>(_threads.size()); + assert(_running <= sz); + if(_running < sz) { - // - // First we reap threads that have been destroyed before. - // - int sz = static_cast<int>(_threads.size()); - assert(_running <= sz); - if(_running < sz) - { - vector<IceUtil::ThreadControl>::iterator start = - partition(_threads.begin(), _threads.end(), mem_fun_ref(&IceUtil::ThreadControl::isAlive)); + vector<IceUtil::ThreadControl>::iterator start = + partition(_threads.begin(), _threads.end(), mem_fun_ref(&IceUtil::ThreadControl::isAlive)); #if defined(_MSC_VER) && _MSC_VER <= 1200 // The mem_fun_ref below does not work with VC++ 6.0 - for(vector<IceUtil::ThreadControl>::iterator p = start; p != _threads.end(); ++p) - { - p->join(); - } + for(vector<IceUtil::ThreadControl>::iterator p = start; p != _threads.end(); ++p) + { + p->join(); + } #else - for_each(start, _threads.end(), mem_fun_ref(&IceUtil::ThreadControl::join)); + for_each(start, _threads.end(), mem_fun_ref(&IceUtil::ThreadControl::join)); #endif - _threads.erase(start, _threads.end()); - } - - // - // Now we check if this thread can be destroyed, based - // on a load factor. - // - const double loadFactor = 0.05; // TODO: Configurable? - const double oneMinusLoadFactor = 1 - loadFactor; - _load = _load * oneMinusLoadFactor + _inUse * loadFactor; - - if(_running > _size) + _threads.erase(start, _threads.end()); + } + + // + // Now we check if this thread can be destroyed, based + // on a load factor. + // + const double loadFactor = 0.05; // TODO: Configurable? + const double oneMinusLoadFactor = 1 - loadFactor; + _load = _load * oneMinusLoadFactor + _inUse * loadFactor; + + if(_running > _size) + { + int load = static_cast<int>(_load + 1); + if(load < _running) { - int load = static_cast<int>(_load + 1); - if(load < _running) - { - assert(_inUse > 0); - --_inUse; - - assert(_running > 0); - --_running; - - return false; - } - } + assert(_inUse > 0); + --_inUse; + + assert(_running > 0); + --_running; - assert(_inUse > 0); - --_inUse; + return false; + } } + + assert(_inUse > 0); + --_inUse; } - _promoteMutex.lock(); + + while(!_promote) + { + wait(); + } + + _promote = false; } } } @@ -771,7 +819,12 @@ IceInternal::ThreadPool::EventHandlerThread::run() // Promote a follower, but w/o modifying _inUse or creating // new threads. // - _pool->_promoteMutex.unlock(); + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*_pool.get()); + assert(!_pool->_promote); + _pool->_promote = true; + _pool->notify(); + } } _pool = 0; // Break cyclic dependency. |