diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/Ice/Instance.cpp | 23 | ||||
-rw-r--r-- | cpp/src/Ice/ThreadPool.cpp | 119 | ||||
-rw-r--r-- | cpp/src/Ice/ThreadPool.h | 20 |
3 files changed, 119 insertions, 43 deletions
diff --git a/cpp/src/Ice/Instance.cpp b/cpp/src/Ice/Instance.cpp index 3b32c71d510..06c1148d161 100644 --- a/cpp/src/Ice/Instance.cpp +++ b/cpp/src/Ice/Instance.cpp @@ -207,6 +207,22 @@ IceInternal::Instance::clientThreadPool() if(!_clientThreadPool) // Lazy initialization. { + // + // Make sure that the client thread pool defaults are correctly + // + if(_properties->getProperty("Ice.ThreadPool.Client.Size").empty()) + { + _properties->setProperty("Ice.ThreadPool.Client.Size", "1"); + } + if(_properties->getProperty("Ice.ThreadPool.Client.SizeMax").empty()) + { + _properties->setProperty("Ice.ThreadPool.Client.SizeMax", "1"); + } + if(_properties->getProperty("Ice.ThreadPool.Client.SizeWarn").empty()) + { + _properties->setProperty("Ice.ThreadPool.Client.SizeWarn", "0"); + } + _clientThreadPool = new ThreadPool(this, "Ice.ThreadPool.Client", 0); } @@ -222,13 +238,6 @@ IceInternal::Instance::serverThreadPool() if(!_serverThreadPool) // Lazy initialization. { - // - // Make sure that the server thread pool default is set - // correctly. - // - _properties->setProperty("Ice.ThreadPool.Server.Size", - _properties->getPropertyWithDefault("Ice.ThreadPool.Server.Size", "10")); - int timeout = _properties->getPropertyAsInt("Ice.ServerIdleTime"); _serverThreadPool = new ThreadPool(this, "Ice.ThreadPool.Server", timeout); } diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp index 78739923f8b..deeae08c287 100644 --- a/cpp/src/Ice/ThreadPool.cpp +++ b/cpp/src/Ice/ThreadPool.cpp @@ -33,6 +33,8 @@ void IceInternal::decRef(ThreadPool* p) { p->__decRef(); } IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& prefix, int timeout) : _instance(instance), _destroyed(false), + _prefix(prefix), + _inUse(0), _lastFd(INVALID_SOCKET), _timeout(timeout) { @@ -47,24 +49,29 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& p _maxFd = _fdIntrRead; _minFd = _fdIntrRead; - int size = _instance->properties()->getPropertyAsInt(prefix + ".Size"); + int size = _instance->properties()->getPropertyAsIntWithDefault(_prefix + ".Size", 5); if(size < 1) { size = 1; - _instance->properties()->setProperty(prefix + ".Size", "1"); + ostringstream str; + str << size; + _instance->properties()->setProperty(_prefix + ".Size", str.str()); } const_cast<int&>(_size) = size; - - int sizeMax = _instance->properties()->getPropertyAsIntWithDefault(prefix + ".SizeMax", _size * 5); + + int sizeMax = _instance->properties()->getPropertyAsIntWithDefault(_prefix + ".SizeMax", _size * 10); if(sizeMax < _size) { sizeMax = _size; ostringstream str; str << sizeMax; - _instance->properties()->setProperty(prefix + ".SizeMax", str.str()); + _instance->properties()->setProperty(_prefix + ".SizeMax", str.str()); } const_cast<int&>(_sizeMax) = sizeMax; + int sizeWarn = _instance->properties()->getPropertyAsIntWithDefault(_prefix + ".SizeWarn", _sizeMax * 80 / 100); + const_cast<int&>(_sizeWarn) = sizeWarn; + __setNoDelete(true); try { @@ -78,7 +85,7 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& p { { Error out(_instance->logger()); - out << "cannot create threads for thread pool:\n" << ex; + out << "cannot create thread for `" << _prefix << "':\n" << ex; } destroy(); @@ -97,6 +104,7 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& p IceInternal::ThreadPool::~ThreadPool() { assert(_destroyed); + assert(_inUse == 0); closeSocket(_fdIntrWrite); closeSocket(_fdIntrRead); } @@ -133,9 +141,40 @@ IceInternal::ThreadPool::unregister(SOCKET fd) void IceInternal::ThreadPool::promoteFollower() { - if(_size > 1) + if(_sizeMax > 1) { _threadMutex.unlock(); + + { + IceUtil::Mutex::Lock sync(_inUseMutex); + assert(_inUse >= 0); + ++_inUse; + + if(_inUse == _sizeWarn) + { + Warning out(_instance->logger()); + out << "thread pool `" << _prefix << "' is running low on threads\n" + << "Size=" << _size << ", " << "SizeMax=" << _sizeMax << ", " << "SizeWarn=" << _sizeWarn; + } + + assert(_inUse <= static_cast<int>(_threads.size())); + if(!_destroyed && _inUse < _sizeMax && _inUse == static_cast<int>(_threads.size())) + { + try + { + cout << __FILE__ << ": " << __LINE__ << "\n" + << "Size=" << _size << ", " << "SizeMax=" << _sizeMax << ", " << "SizeWarn=" << _sizeWarn + << " _threads.size()=" << _threads.size() << endl; + IceUtil::ThreadPtr thread = new EventHandlerThread(this); + _threads.push_back(thread->start()); + } + catch(const IceUtil::Exception& ex) + { + Error out(_instance->logger()); + out << "cannot create thread for `" << _prefix << "':\n" << ex; + } + } + } } } @@ -238,15 +277,13 @@ IceInternal::ThreadPool::run() { ThreadPoolPtr self = this; - while(true) + if(_sizeMax > 1) { - if(_size > 1) - { - _threadMutex.lock(); - } + _threadMutex.lock(); + } - repeatSelect: - + while(true) + { fd_set fdSet; memcpy(&fdSet, &_fdSet, sizeof(fd_set)); int ret; @@ -267,14 +304,14 @@ IceInternal::ThreadPool::run() assert(_timeout > 0); _timeout = 0; initiateShutdown(); - goto repeatSelect; + continue; } if(ret == SOCKET_ERROR) { if(interrupted()) { - goto repeatSelect; + continue; } SocketException ex(__FILE__, __LINE__); @@ -331,7 +368,7 @@ IceInternal::ThreadPool::run() FD_SET(change.first, &_fdSet); _maxFd = max(_maxFd, change.first); _minFd = min(_minFd, change.first); - goto repeatSelect; + continue; } else // Removal if handler is not set. { @@ -348,7 +385,7 @@ IceInternal::ThreadPool::run() _maxFd = max(_maxFd, (--_handlerMap.end())->first); _minFd = min(_minFd, _handlerMap.begin()->first); } - // Don't goto repeatSelect; we have to call + // Don't continue; we have to call // finished() on the event handler below, outside // the thread synchronization. } @@ -367,8 +404,8 @@ IceInternal::ThreadPool::run() if(fdSet.fd_count == 0) { Error out(_instance->logger()); - out << "select() in thread pool returned " << ret << " but no filedescriptor is readable"; - goto repeatSelect; + out << "select() in `" << _prefix << "' returned " << ret << " but no filedescriptor is readable"; + continue; } SOCKET largerFd = _maxFd + 1; @@ -419,8 +456,8 @@ IceInternal::ThreadPool::run() if(loops > 1) { Error out(_instance->logger()); - out << "select() in thread pool returned " << ret << " but no filedescriptor is readable"; - goto repeatSelect; + out << "select() in `" << _prefix << "' returned " << ret << " but no filedescriptor is readable"; + continue; } #endif @@ -430,8 +467,8 @@ IceInternal::ThreadPool::run() if(p == _handlerMap.end()) { Error out(_instance->logger()); - out << "filedescriptor " << _lastFd << " not registered with the thread pool"; - goto repeatSelect; + out << "filedescriptor " << _lastFd << " not registered with `" << _prefix << "'"; + continue; } handler = p->second; @@ -448,7 +485,7 @@ IceInternal::ThreadPool::run() ObjectAdapterFactoryPtr factory = _instance->objectAdapterFactory(); if(!factory) { - goto repeatSelect; + continue; } promoteFollower(); @@ -471,7 +508,8 @@ IceInternal::ThreadPool::run() catch(const LocalException& ex) { Error out(_instance->logger()); - out << "exception while calling finished():\n" << ex << '\n' << handler->toString(); + out << "exception in `" << _prefix << "' while calling finished():\n" + << ex << '\n' << handler->toString(); } } else @@ -489,12 +527,12 @@ IceInternal::ThreadPool::run() } catch(const TimeoutException&) // Expected. { - goto repeatSelect; + continue; } catch(const LocalException& ex) { handler->exception(ex); - goto repeatSelect; + continue; } stream.swap(handler->_stream); @@ -508,6 +546,17 @@ IceInternal::ThreadPool::run() handler->message(stream, self); } } + + if(_sizeMax > 1) + { + { + IceUtil::Mutex::Lock sync(_inUseMutex); + assert(_inUse > 0); + --_inUse; + } + + _threadMutex.lock(); + } } } @@ -607,14 +656,22 @@ IceInternal::ThreadPool::EventHandlerThread::run() catch(const Exception& ex) { Error out(_pool->_instance->logger()); - out << "exception in thread pool:\n" << ex; + out << "exception in `" << _pool->_prefix << "':\n" << ex; } catch(...) { Error out(_pool->_instance->logger()); - out << "unknown exception in thread pool"; + out << "unknown exception in `" << _pool->_prefix << "'"; + } + + // + // Promote a follower, but w/o modifying _inUse or creating new + // threads. + // + if(_pool->_sizeMax > 1) + { + _pool->_threadMutex.unlock(); } - _pool->promoteFollower(); _pool = 0; // Break cyclic dependency. } diff --git a/cpp/src/Ice/ThreadPool.h b/cpp/src/Ice/ThreadPool.h index 626c56cb1e1..26df7f14ee5 100644 --- a/cpp/src/Ice/ThreadPool.h +++ b/cpp/src/Ice/ThreadPool.h @@ -35,7 +35,7 @@ namespace IceInternal class BasicStream; -class ThreadPool : public ::IceUtil::Shared, public IceUtil::Mutex +class ThreadPool : public IceUtil::Shared, public IceUtil::Mutex { public: @@ -60,20 +60,30 @@ private: InstancePtr _instance; bool _destroyed; - const int _size; - const int _sizeMax; + const std::string _prefix; + + const int _size; // Number of threads that are pre-created. + const int _sizeMax; // Maximum number of threads. + const int _sizeWarn; // If _inUse reaches _sizeWarn, a "low on threads" warning will be printed. + int _inUse; // Number of threads that are currently in use. + IceUtil::Mutex _inUseMutex; + SOCKET _maxFd; SOCKET _minFd; SOCKET _lastFd; SOCKET _fdIntrRead; SOCKET _fdIntrWrite; fd_set _fdSet; + std::list<std::pair<SOCKET, EventHandlerPtr> > _changes; // Event handler set for addition; null for removal. + std::map<SOCKET, EventHandlerPtr> _handlerMap; + int _timeout; - ::IceUtil::Mutex _threadMutex; - class EventHandlerThread : public ::IceUtil::Thread + IceUtil::Mutex _threadMutex; + + class EventHandlerThread : public IceUtil::Thread { public: |