diff options
author | Marc Laukien <marc@zeroc.com> | 2003-03-07 19:41:12 +0000 |
---|---|---|
committer | Marc Laukien <marc@zeroc.com> | 2003-03-07 19:41:12 +0000 |
commit | 909902165af20518131b43d2fb05e5c7234f016f (patch) | |
tree | efad7b12a59e08d0c10ac9d1d3900c641e6c5056 /cpp/src/Ice/ThreadPool.cpp | |
parent | internal thread pool changes (diff) | |
download | ice-909902165af20518131b43d2fb05e5c7234f016f.tar.bz2 ice-909902165af20518131b43d2fb05e5c7234f016f.tar.xz ice-909902165af20518131b43d2fb05e5c7234f016f.zip |
dyn thread pool
Diffstat (limited to 'cpp/src/Ice/ThreadPool.cpp')
-rw-r--r-- | cpp/src/Ice/ThreadPool.cpp | 119 |
1 files changed, 88 insertions, 31 deletions
diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp index 78739923f8b..deeae08c287 100644 --- a/cpp/src/Ice/ThreadPool.cpp +++ b/cpp/src/Ice/ThreadPool.cpp @@ -33,6 +33,8 @@ void IceInternal::decRef(ThreadPool* p) { p->__decRef(); } IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& prefix, int timeout) : _instance(instance), _destroyed(false), + _prefix(prefix), + _inUse(0), _lastFd(INVALID_SOCKET), _timeout(timeout) { @@ -47,24 +49,29 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& p _maxFd = _fdIntrRead; _minFd = _fdIntrRead; - int size = _instance->properties()->getPropertyAsInt(prefix + ".Size"); + int size = _instance->properties()->getPropertyAsIntWithDefault(_prefix + ".Size", 5); if(size < 1) { size = 1; - _instance->properties()->setProperty(prefix + ".Size", "1"); + ostringstream str; + str << size; + _instance->properties()->setProperty(_prefix + ".Size", str.str()); } const_cast<int&>(_size) = size; - - int sizeMax = _instance->properties()->getPropertyAsIntWithDefault(prefix + ".SizeMax", _size * 5); + + int sizeMax = _instance->properties()->getPropertyAsIntWithDefault(_prefix + ".SizeMax", _size * 10); if(sizeMax < _size) { sizeMax = _size; ostringstream str; str << sizeMax; - _instance->properties()->setProperty(prefix + ".SizeMax", str.str()); + _instance->properties()->setProperty(_prefix + ".SizeMax", str.str()); } const_cast<int&>(_sizeMax) = sizeMax; + int sizeWarn = _instance->properties()->getPropertyAsIntWithDefault(_prefix + ".SizeWarn", _sizeMax * 80 / 100); + const_cast<int&>(_sizeWarn) = sizeWarn; + __setNoDelete(true); try { @@ -78,7 +85,7 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& p { { Error out(_instance->logger()); - out << "cannot create threads for thread pool:\n" << ex; + out << "cannot create thread for `" << _prefix << "':\n" << ex; } destroy(); @@ -97,6 +104,7 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& p IceInternal::ThreadPool::~ThreadPool() { assert(_destroyed); + assert(_inUse == 0); closeSocket(_fdIntrWrite); closeSocket(_fdIntrRead); } @@ -133,9 +141,40 @@ IceInternal::ThreadPool::unregister(SOCKET fd) void IceInternal::ThreadPool::promoteFollower() { - if(_size > 1) + if(_sizeMax > 1) { _threadMutex.unlock(); + + { + IceUtil::Mutex::Lock sync(_inUseMutex); + assert(_inUse >= 0); + ++_inUse; + + if(_inUse == _sizeWarn) + { + Warning out(_instance->logger()); + out << "thread pool `" << _prefix << "' is running low on threads\n" + << "Size=" << _size << ", " << "SizeMax=" << _sizeMax << ", " << "SizeWarn=" << _sizeWarn; + } + + assert(_inUse <= static_cast<int>(_threads.size())); + if(!_destroyed && _inUse < _sizeMax && _inUse == static_cast<int>(_threads.size())) + { + try + { + cout << __FILE__ << ": " << __LINE__ << "\n" + << "Size=" << _size << ", " << "SizeMax=" << _sizeMax << ", " << "SizeWarn=" << _sizeWarn + << " _threads.size()=" << _threads.size() << endl; + IceUtil::ThreadPtr thread = new EventHandlerThread(this); + _threads.push_back(thread->start()); + } + catch(const IceUtil::Exception& ex) + { + Error out(_instance->logger()); + out << "cannot create thread for `" << _prefix << "':\n" << ex; + } + } + } } } @@ -238,15 +277,13 @@ IceInternal::ThreadPool::run() { ThreadPoolPtr self = this; - while(true) + if(_sizeMax > 1) { - if(_size > 1) - { - _threadMutex.lock(); - } + _threadMutex.lock(); + } - repeatSelect: - + while(true) + { fd_set fdSet; memcpy(&fdSet, &_fdSet, sizeof(fd_set)); int ret; @@ -267,14 +304,14 @@ IceInternal::ThreadPool::run() assert(_timeout > 0); _timeout = 0; initiateShutdown(); - goto repeatSelect; + continue; } if(ret == SOCKET_ERROR) { if(interrupted()) { - goto repeatSelect; + continue; } SocketException ex(__FILE__, __LINE__); @@ -331,7 +368,7 @@ IceInternal::ThreadPool::run() FD_SET(change.first, &_fdSet); _maxFd = max(_maxFd, change.first); _minFd = min(_minFd, change.first); - goto repeatSelect; + continue; } else // Removal if handler is not set. { @@ -348,7 +385,7 @@ IceInternal::ThreadPool::run() _maxFd = max(_maxFd, (--_handlerMap.end())->first); _minFd = min(_minFd, _handlerMap.begin()->first); } - // Don't goto repeatSelect; we have to call + // Don't continue; we have to call // finished() on the event handler below, outside // the thread synchronization. } @@ -367,8 +404,8 @@ IceInternal::ThreadPool::run() if(fdSet.fd_count == 0) { Error out(_instance->logger()); - out << "select() in thread pool returned " << ret << " but no filedescriptor is readable"; - goto repeatSelect; + out << "select() in `" << _prefix << "' returned " << ret << " but no filedescriptor is readable"; + continue; } SOCKET largerFd = _maxFd + 1; @@ -419,8 +456,8 @@ IceInternal::ThreadPool::run() if(loops > 1) { Error out(_instance->logger()); - out << "select() in thread pool returned " << ret << " but no filedescriptor is readable"; - goto repeatSelect; + out << "select() in `" << _prefix << "' returned " << ret << " but no filedescriptor is readable"; + continue; } #endif @@ -430,8 +467,8 @@ IceInternal::ThreadPool::run() if(p == _handlerMap.end()) { Error out(_instance->logger()); - out << "filedescriptor " << _lastFd << " not registered with the thread pool"; - goto repeatSelect; + out << "filedescriptor " << _lastFd << " not registered with `" << _prefix << "'"; + continue; } handler = p->second; @@ -448,7 +485,7 @@ IceInternal::ThreadPool::run() ObjectAdapterFactoryPtr factory = _instance->objectAdapterFactory(); if(!factory) { - goto repeatSelect; + continue; } promoteFollower(); @@ -471,7 +508,8 @@ IceInternal::ThreadPool::run() catch(const LocalException& ex) { Error out(_instance->logger()); - out << "exception while calling finished():\n" << ex << '\n' << handler->toString(); + out << "exception in `" << _prefix << "' while calling finished():\n" + << ex << '\n' << handler->toString(); } } else @@ -489,12 +527,12 @@ IceInternal::ThreadPool::run() } catch(const TimeoutException&) // Expected. { - goto repeatSelect; + continue; } catch(const LocalException& ex) { handler->exception(ex); - goto repeatSelect; + continue; } stream.swap(handler->_stream); @@ -508,6 +546,17 @@ IceInternal::ThreadPool::run() handler->message(stream, self); } } + + if(_sizeMax > 1) + { + { + IceUtil::Mutex::Lock sync(_inUseMutex); + assert(_inUse > 0); + --_inUse; + } + + _threadMutex.lock(); + } } } @@ -607,14 +656,22 @@ IceInternal::ThreadPool::EventHandlerThread::run() catch(const Exception& ex) { Error out(_pool->_instance->logger()); - out << "exception in thread pool:\n" << ex; + out << "exception in `" << _pool->_prefix << "':\n" << ex; } catch(...) { Error out(_pool->_instance->logger()); - out << "unknown exception in thread pool"; + out << "unknown exception in `" << _pool->_prefix << "'"; + } + + // + // Promote a follower, but w/o modifying _inUse or creating new + // threads. + // + if(_pool->_sizeMax > 1) + { + _pool->_threadMutex.unlock(); } - _pool->promoteFollower(); _pool = 0; // Break cyclic dependency. } |