summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/ThreadPool.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/Ice/ThreadPool.cpp')
-rw-r--r--cpp/src/Ice/ThreadPool.cpp115
1 files changed, 82 insertions, 33 deletions
diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp
index 592386142a6..be1ffb7372d 100644
--- a/cpp/src/Ice/ThreadPool.cpp
+++ b/cpp/src/Ice/ThreadPool.cpp
@@ -39,6 +39,7 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& p
_size(0),
_sizeMax(0),
_sizeWarn(0),
+ _running(0),
_inUse(0),
_load(0)
{
@@ -77,6 +78,7 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& p
{
IceUtil::ThreadPtr thread = new EventHandlerThread(this);
_threads.push_back(thread->start());
+ ++_running;
}
}
catch(const IceUtil::Exception& ex)
@@ -102,7 +104,6 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& p
IceInternal::ThreadPool::~ThreadPool()
{
assert(_destroyed);
- assert(_inUse == 0);
closeSocket(_fdIntrWrite);
closeSocket(_fdIntrRead);
@@ -147,30 +148,32 @@ IceInternal::ThreadPool::promoteFollower()
{
IceUtil::Mutex::Lock sync(*this);
- assert(_inUse >= 0);
- ++_inUse;
-// _load = std::max(_load * 0.95 + _inUse * 0.05, static_cast<double>(_inUse)); // TODO: Configurable?
-// cout << "_load = " << _load << endl;
-
- if(_inUse == _sizeWarn)
+ if(!_destroyed)
{
- Warning out(_instance->logger());
- out << "thread pool `" << _prefix << "' is running low on threads\n"
- << "Size=" << _size << ", " << "SizeMax=" << _sizeMax << ", " << "SizeWarn=" << _sizeWarn;
- }
-
- assert(_inUse <= static_cast<int>(_threads.size()));
- if(!_destroyed && _inUse < _sizeMax && _inUse == static_cast<int>(_threads.size()))
- {
- try
+ assert(_inUse >= 0);
+ ++_inUse;
+
+ if(_inUse == _sizeWarn)
{
- IceUtil::ThreadPtr thread = new EventHandlerThread(this);
- _threads.push_back(thread->start());
+ Warning out(_instance->logger());
+ out << "thread pool `" << _prefix << "' is running low on threads\n"
+ << "Size=" << _size << ", " << "SizeMax=" << _sizeMax << ", " << "SizeWarn=" << _sizeWarn;
}
- catch(const IceUtil::Exception& ex)
+
+ assert(_inUse <= _running);
+ if(_inUse < _sizeMax && _inUse == _running)
{
- Error out(_instance->logger());
- out << "cannot create thread for `" << _prefix << "':\n" << ex;
+ try
+ {
+ IceUtil::ThreadPtr thread = new EventHandlerThread(this);
+ _threads.push_back(thread->start());
+ ++_running;
+ }
+ catch(const IceUtil::Exception& ex)
+ {
+ Error out(_instance->logger());
+ out << "cannot create thread for `" << _prefix << "':\n" << ex;
+ }
}
}
}
@@ -197,10 +200,13 @@ IceInternal::ThreadPool::joinWithAllThreads()
// threads would never terminate.)
//
assert(_destroyed);
+ for_each(_threads.begin(), _threads.end(), mem_fun_ref(&IceUtil::ThreadControl::join));
+/*
for(vector<IceUtil::ThreadControl>::iterator p = _threads.begin(); p != _threads.end(); ++p)
{
p->join();
}
+*/
}
bool
@@ -271,7 +277,7 @@ repeat:
#endif
}
-void
+bool
IceInternal::ThreadPool::run()
{
ThreadPoolPtr self = this;
@@ -346,7 +352,7 @@ IceInternal::ThreadPool::run()
// Don't clear the interrupt if destroyed, so that
// the other threads exit as well.
//
- return;
+ return true;
}
shutdown = clearInterrupt();
@@ -550,9 +556,48 @@ IceInternal::ThreadPool::run()
{
{
IceUtil::Mutex::Lock sync(*this);
+
+ if(!_destroyed)
+ {
+ //
+ // First we reap threads that have been destroyed before.
+ //
+ int sz = static_cast<int>(_threads.size());
+ assert(_running <= sz);
+ if(_running < sz)
+ {
+ vector<IceUtil::ThreadControl>::iterator p =
+ partition(_threads.begin(), _threads.end(), mem_fun_ref(&IceUtil::ThreadControl::isAlive));
+ for_each(p, _threads.end(), mem_fun_ref(&IceUtil::ThreadControl::join));
+ _threads.erase(p, _threads.end());
+ }
+
+ //
+ // Now we check if this thread can be destroyed, based
+ // on a load factor.
+ //
+ const double loadFactor = 0.05; // TODO: Configurable?
+ const double oneMinusLoadFactor = 1 - loadFactor;
+ _load = _load * oneMinusLoadFactor + _inUse * loadFactor;
+
+ if(_running > _size)
+ {
+ int load = static_cast<int>(_load + 1);
+ if(load < _running)
+ {
+ assert(_inUse > 0);
+ --_inUse;
- assert(_inUse > 0);
- --_inUse;
+ assert(_running > 0);
+ --_running;
+
+ return false;
+ }
+ }
+
+ assert(_inUse > 0);
+ --_inUse;
+ }
}
_promoteMutex.lock();
@@ -651,27 +696,31 @@ IceInternal::ThreadPool::EventHandlerThread::EventHandlerThread(const ThreadPool
void
IceInternal::ThreadPool::EventHandlerThread::run()
{
+ bool promote;
+
try
{
- _pool->run();
+ promote = _pool->run();
}
catch(const Exception& ex)
{
Error out(_pool->_instance->logger());
out << "exception in `" << _pool->_prefix << "':\n" << ex;
+ promote = true;
}
catch(...)
{
Error out(_pool->_instance->logger());
- out << "unknown exception in `" << _pool->_prefix << "'";
- }
+ out << "unknown exception in `" << _pool->_prefix << "'";
+ promote = true;
+ }
- //
- // Promote a follower, but w/o modifying _inUse or creating new
- // threads.
- //
- if(_pool->_sizeMax > 1)
+ if(promote && _pool->_sizeMax > 1)
{
+ //
+ // Promote a follower, but w/o modifying _inUse or creating
+ // new threads.
+ //
_pool->_promoteMutex.unlock();
}