summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/ThreadPool.cpp
diff options
context:
space:
mode:
authorMarc Laukien <marc@zeroc.com>2003-03-07 19:41:12 +0000
committerMarc Laukien <marc@zeroc.com>2003-03-07 19:41:12 +0000
commit909902165af20518131b43d2fb05e5c7234f016f (patch)
treeefad7b12a59e08d0c10ac9d1d3900c641e6c5056 /cpp/src/Ice/ThreadPool.cpp
parentinternal thread pool changes (diff)
downloadice-909902165af20518131b43d2fb05e5c7234f016f.tar.bz2
ice-909902165af20518131b43d2fb05e5c7234f016f.tar.xz
ice-909902165af20518131b43d2fb05e5c7234f016f.zip
dyn thread pool
Diffstat (limited to 'cpp/src/Ice/ThreadPool.cpp')
-rw-r--r--cpp/src/Ice/ThreadPool.cpp119
1 files changed, 88 insertions, 31 deletions
diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp
index 78739923f8b..deeae08c287 100644
--- a/cpp/src/Ice/ThreadPool.cpp
+++ b/cpp/src/Ice/ThreadPool.cpp
@@ -33,6 +33,8 @@ void IceInternal::decRef(ThreadPool* p) { p->__decRef(); }
IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& prefix, int timeout) :
_instance(instance),
_destroyed(false),
+ _prefix(prefix),
+ _inUse(0),
_lastFd(INVALID_SOCKET),
_timeout(timeout)
{
@@ -47,24 +49,29 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& p
_maxFd = _fdIntrRead;
_minFd = _fdIntrRead;
- int size = _instance->properties()->getPropertyAsInt(prefix + ".Size");
+ int size = _instance->properties()->getPropertyAsIntWithDefault(_prefix + ".Size", 5);
if(size < 1)
{
size = 1;
- _instance->properties()->setProperty(prefix + ".Size", "1");
+ ostringstream str;
+ str << size;
+ _instance->properties()->setProperty(_prefix + ".Size", str.str());
}
const_cast<int&>(_size) = size;
-
- int sizeMax = _instance->properties()->getPropertyAsIntWithDefault(prefix + ".SizeMax", _size * 5);
+
+ int sizeMax = _instance->properties()->getPropertyAsIntWithDefault(_prefix + ".SizeMax", _size * 10);
if(sizeMax < _size)
{
sizeMax = _size;
ostringstream str;
str << sizeMax;
- _instance->properties()->setProperty(prefix + ".SizeMax", str.str());
+ _instance->properties()->setProperty(_prefix + ".SizeMax", str.str());
}
const_cast<int&>(_sizeMax) = sizeMax;
+ int sizeWarn = _instance->properties()->getPropertyAsIntWithDefault(_prefix + ".SizeWarn", _sizeMax * 80 / 100);
+ const_cast<int&>(_sizeWarn) = sizeWarn;
+
__setNoDelete(true);
try
{
@@ -78,7 +85,7 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& p
{
{
Error out(_instance->logger());
- out << "cannot create threads for thread pool:\n" << ex;
+ out << "cannot create thread for `" << _prefix << "':\n" << ex;
}
destroy();
@@ -97,6 +104,7 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& p
IceInternal::ThreadPool::~ThreadPool()
{
assert(_destroyed);
+ assert(_inUse == 0);
closeSocket(_fdIntrWrite);
closeSocket(_fdIntrRead);
}
@@ -133,9 +141,40 @@ IceInternal::ThreadPool::unregister(SOCKET fd)
void
IceInternal::ThreadPool::promoteFollower()
{
- if(_size > 1)
+ if(_sizeMax > 1)
{
_threadMutex.unlock();
+
+ {
+ IceUtil::Mutex::Lock sync(_inUseMutex);
+ assert(_inUse >= 0);
+ ++_inUse;
+
+ if(_inUse == _sizeWarn)
+ {
+ Warning out(_instance->logger());
+ out << "thread pool `" << _prefix << "' is running low on threads\n"
+ << "Size=" << _size << ", " << "SizeMax=" << _sizeMax << ", " << "SizeWarn=" << _sizeWarn;
+ }
+
+ assert(_inUse <= static_cast<int>(_threads.size()));
+ if(!_destroyed && _inUse < _sizeMax && _inUse == static_cast<int>(_threads.size()))
+ {
+ try
+ {
+ cout << __FILE__ << ": " << __LINE__ << "\n"
+ << "Size=" << _size << ", " << "SizeMax=" << _sizeMax << ", " << "SizeWarn=" << _sizeWarn
+ << " _threads.size()=" << _threads.size() << endl;
+ IceUtil::ThreadPtr thread = new EventHandlerThread(this);
+ _threads.push_back(thread->start());
+ }
+ catch(const IceUtil::Exception& ex)
+ {
+ Error out(_instance->logger());
+ out << "cannot create thread for `" << _prefix << "':\n" << ex;
+ }
+ }
+ }
}
}
@@ -238,15 +277,13 @@ IceInternal::ThreadPool::run()
{
ThreadPoolPtr self = this;
- while(true)
+ if(_sizeMax > 1)
{
- if(_size > 1)
- {
- _threadMutex.lock();
- }
+ _threadMutex.lock();
+ }
- repeatSelect:
-
+ while(true)
+ {
fd_set fdSet;
memcpy(&fdSet, &_fdSet, sizeof(fd_set));
int ret;
@@ -267,14 +304,14 @@ IceInternal::ThreadPool::run()
assert(_timeout > 0);
_timeout = 0;
initiateShutdown();
- goto repeatSelect;
+ continue;
}
if(ret == SOCKET_ERROR)
{
if(interrupted())
{
- goto repeatSelect;
+ continue;
}
SocketException ex(__FILE__, __LINE__);
@@ -331,7 +368,7 @@ IceInternal::ThreadPool::run()
FD_SET(change.first, &_fdSet);
_maxFd = max(_maxFd, change.first);
_minFd = min(_minFd, change.first);
- goto repeatSelect;
+ continue;
}
else // Removal if handler is not set.
{
@@ -348,7 +385,7 @@ IceInternal::ThreadPool::run()
_maxFd = max(_maxFd, (--_handlerMap.end())->first);
_minFd = min(_minFd, _handlerMap.begin()->first);
}
- // Don't goto repeatSelect; we have to call
+ // Don't continue; we have to call
// finished() on the event handler below, outside
// the thread synchronization.
}
@@ -367,8 +404,8 @@ IceInternal::ThreadPool::run()
if(fdSet.fd_count == 0)
{
Error out(_instance->logger());
- out << "select() in thread pool returned " << ret << " but no filedescriptor is readable";
- goto repeatSelect;
+ out << "select() in `" << _prefix << "' returned " << ret << " but no filedescriptor is readable";
+ continue;
}
SOCKET largerFd = _maxFd + 1;
@@ -419,8 +456,8 @@ IceInternal::ThreadPool::run()
if(loops > 1)
{
Error out(_instance->logger());
- out << "select() in thread pool returned " << ret << " but no filedescriptor is readable";
- goto repeatSelect;
+ out << "select() in `" << _prefix << "' returned " << ret << " but no filedescriptor is readable";
+ continue;
}
#endif
@@ -430,8 +467,8 @@ IceInternal::ThreadPool::run()
if(p == _handlerMap.end())
{
Error out(_instance->logger());
- out << "filedescriptor " << _lastFd << " not registered with the thread pool";
- goto repeatSelect;
+ out << "filedescriptor " << _lastFd << " not registered with `" << _prefix << "'";
+ continue;
}
handler = p->second;
@@ -448,7 +485,7 @@ IceInternal::ThreadPool::run()
ObjectAdapterFactoryPtr factory = _instance->objectAdapterFactory();
if(!factory)
{
- goto repeatSelect;
+ continue;
}
promoteFollower();
@@ -471,7 +508,8 @@ IceInternal::ThreadPool::run()
catch(const LocalException& ex)
{
Error out(_instance->logger());
- out << "exception while calling finished():\n" << ex << '\n' << handler->toString();
+ out << "exception in `" << _prefix << "' while calling finished():\n"
+ << ex << '\n' << handler->toString();
}
}
else
@@ -489,12 +527,12 @@ IceInternal::ThreadPool::run()
}
catch(const TimeoutException&) // Expected.
{
- goto repeatSelect;
+ continue;
}
catch(const LocalException& ex)
{
handler->exception(ex);
- goto repeatSelect;
+ continue;
}
stream.swap(handler->_stream);
@@ -508,6 +546,17 @@ IceInternal::ThreadPool::run()
handler->message(stream, self);
}
}
+
+ if(_sizeMax > 1)
+ {
+ {
+ IceUtil::Mutex::Lock sync(_inUseMutex);
+ assert(_inUse > 0);
+ --_inUse;
+ }
+
+ _threadMutex.lock();
+ }
}
}
@@ -607,14 +656,22 @@ IceInternal::ThreadPool::EventHandlerThread::run()
catch(const Exception& ex)
{
Error out(_pool->_instance->logger());
- out << "exception in thread pool:\n" << ex;
+ out << "exception in `" << _pool->_prefix << "':\n" << ex;
}
catch(...)
{
Error out(_pool->_instance->logger());
- out << "unknown exception in thread pool";
+ out << "unknown exception in `" << _pool->_prefix << "'";
+ }
+
+ //
+ // Promote a follower, but w/o modifying _inUse or creating new
+ // threads.
+ //
+ if(_pool->_sizeMax > 1)
+ {
+ _pool->_threadMutex.unlock();
}
- _pool->promoteFollower();
_pool = 0; // Break cyclic dependency.
}