summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/ThreadPool.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/Ice/ThreadPool.cpp')
-rw-r--r--cpp/src/Ice/ThreadPool.cpp211
1 files changed, 132 insertions, 79 deletions
diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp
index 98812026aa7..3bce62e5bc7 100644
--- a/cpp/src/Ice/ThreadPool.cpp
+++ b/cpp/src/Ice/ThreadPool.cpp
@@ -43,6 +43,7 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& p
_running(0),
_inUse(0),
_load(0),
+ _promote(true),
_warnUdp(_instance->properties()->getPropertyAsInt("Ice.Warn.Datagrams") > 0)
{
SOCKET fds[2];
@@ -116,7 +117,7 @@ IceInternal::ThreadPool::~ThreadPool()
void
IceInternal::ThreadPool::destroy()
{
- IceUtil::Mutex::Lock sync(*this);
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
assert(!_destroyed);
assert(_handlerMap.empty());
assert(_changes.empty());
@@ -127,7 +128,7 @@ IceInternal::ThreadPool::destroy()
void
IceInternal::ThreadPool::_register(SOCKET fd, const EventHandlerPtr& handler)
{
- IceUtil::Mutex::Lock sync(*this);
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
assert(!_destroyed);
_changes.push_back(make_pair(fd, handler));
setInterrupt(0);
@@ -136,7 +137,7 @@ IceInternal::ThreadPool::_register(SOCKET fd, const EventHandlerPtr& handler)
void
IceInternal::ThreadPool::unregister(SOCKET fd)
{
- IceUtil::Mutex::Lock sync(*this);
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
assert(!_destroyed);
_changes.push_back(make_pair(fd, EventHandlerPtr(0)));
setInterrupt(0);
@@ -147,37 +148,37 @@ IceInternal::ThreadPool::promoteFollower()
{
if(_sizeMax > 1)
{
- _promoteMutex.unlock();
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- {
- IceUtil::Mutex::Lock sync(*this);
+ assert(!_promote);
+ _promote = true;
+ notify();
- if(!_destroyed)
+ if(!_destroyed)
+ {
+ assert(_inUse >= 0);
+ ++_inUse;
+
+ if(_inUse == _sizeWarn)
{
- assert(_inUse >= 0);
- ++_inUse;
-
- if(_inUse == _sizeWarn)
+ Warning out(_instance->logger());
+ out << "thread pool `" << _prefix << "' is running low on threads\n"
+ << "Size=" << _size << ", " << "SizeMax=" << _sizeMax << ", " << "SizeWarn=" << _sizeWarn;
+ }
+
+ assert(_inUse <= _running);
+ if(_inUse < _sizeMax && _inUse == _running)
+ {
+ try
{
- Warning out(_instance->logger());
- out << "thread pool `" << _prefix << "' is running low on threads\n"
- << "Size=" << _size << ", " << "SizeMax=" << _sizeMax << ", " << "SizeWarn=" << _sizeWarn;
+ IceUtil::ThreadPtr thread = new EventHandlerThread(this);
+ _threads.push_back(thread->start());
+ ++_running;
}
-
- assert(_inUse <= _running);
- if(_inUse < _sizeMax && _inUse == _running)
+ catch(const IceUtil::Exception& ex)
{
- try
- {
- IceUtil::ThreadPtr thread = new EventHandlerThread(this);
- _threads.push_back(thread->start());
- ++_running;
- }
- catch(const IceUtil::Exception& ex)
- {
- Error out(_instance->logger());
- out << "cannot create thread for `" << _prefix << "':\n" << ex;
- }
+ Error out(_instance->logger());
+ out << "cannot create thread for `" << _prefix << "':\n" << ex;
}
}
}
@@ -297,7 +298,14 @@ IceInternal::ThreadPool::run()
if(_sizeMax > 1)
{
- _promoteMutex.lock();
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+
+ while(!_promote)
+ {
+ wait();
+ }
+
+ _promote = false;
}
while(true)
@@ -344,7 +352,7 @@ IceInternal::ThreadPool::run()
bool shutdown = false;
{
- IceUtil::Mutex::Lock sync(*this);
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
if(FD_ISSET(_fdIntrRead, &fdSet))
{
@@ -514,6 +522,12 @@ IceInternal::ThreadPool::run()
promoteFollower();
factory->shutdown();
+
+ //
+ // No "continue", because we want shutdown to be done in
+ // its own thread from this pool. Therefore we called
+ // promoteFollower().
+ //
}
else
{
@@ -527,7 +541,11 @@ IceInternal::ThreadPool::run()
//
try
{
- handler->finished(self); // "self" is faster than "this", as the reference count is not modified.
+ //
+ // "self" is faster than "this", as the reference
+ // count is not modified.
+ //
+ handler->finished(self);
}
catch(const LocalException& ex)
{
@@ -535,6 +553,13 @@ IceInternal::ThreadPool::run()
out << "exception in `" << _prefix << "' while calling finished():\n"
<< ex << '\n' << handler->toString();
}
+
+ //
+ // No "continue", because we want finished() to be
+ // called in its own thread from this pool. Note that
+ // this means that finished() must call
+ // promoteFollower().
+ //
}
else
{
@@ -568,69 +593,92 @@ IceInternal::ThreadPool::run()
}
//
- // "self" is faster than "this", as the reference
- // count is not modified.
+ // Provide a new mesage to the handler.
+ //
+ try
+ {
+ //
+ // "self" is faster than "this", as the reference
+ // count is not modified.
+ //
+ handler->message(stream, self);
+ }
+ catch(const LocalException& ex)
+ {
+ Error out(_instance->logger());
+ out << "exception in `" << _prefix << "' while calling finished():\n"
+ << ex << '\n' << handler->toString();
+ }
+
+ //
+ // No "continue", because we want message() to be
+ // called in its own thread from this pool. Note that
+ // this means that message() must call
+ // promoteFollower().
//
- handler->message(stream, self);
}
}
if(_sizeMax > 1)
{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+
+ if(!_destroyed)
{
- IceUtil::Mutex::Lock sync(*this);
-
- if(!_destroyed)
+ //
+ // First we reap threads that have been destroyed before.
+ //
+ int sz = static_cast<int>(_threads.size());
+ assert(_running <= sz);
+ if(_running < sz)
{
- //
- // First we reap threads that have been destroyed before.
- //
- int sz = static_cast<int>(_threads.size());
- assert(_running <= sz);
- if(_running < sz)
- {
- vector<IceUtil::ThreadControl>::iterator start =
- partition(_threads.begin(), _threads.end(), mem_fun_ref(&IceUtil::ThreadControl::isAlive));
+ vector<IceUtil::ThreadControl>::iterator start =
+ partition(_threads.begin(), _threads.end(), mem_fun_ref(&IceUtil::ThreadControl::isAlive));
#if defined(_MSC_VER) && _MSC_VER <= 1200 // The mem_fun_ref below does not work with VC++ 6.0
- for(vector<IceUtil::ThreadControl>::iterator p = start; p != _threads.end(); ++p)
- {
- p->join();
- }
+ for(vector<IceUtil::ThreadControl>::iterator p = start; p != _threads.end(); ++p)
+ {
+ p->join();
+ }
#else
- for_each(start, _threads.end(), mem_fun_ref(&IceUtil::ThreadControl::join));
+ for_each(start, _threads.end(), mem_fun_ref(&IceUtil::ThreadControl::join));
#endif
- _threads.erase(start, _threads.end());
- }
-
- //
- // Now we check if this thread can be destroyed, based
- // on a load factor.
- //
- const double loadFactor = 0.05; // TODO: Configurable?
- const double oneMinusLoadFactor = 1 - loadFactor;
- _load = _load * oneMinusLoadFactor + _inUse * loadFactor;
-
- if(_running > _size)
+ _threads.erase(start, _threads.end());
+ }
+
+ //
+ // Now we check if this thread can be destroyed, based
+ // on a load factor.
+ //
+ const double loadFactor = 0.05; // TODO: Configurable?
+ const double oneMinusLoadFactor = 1 - loadFactor;
+ _load = _load * oneMinusLoadFactor + _inUse * loadFactor;
+
+ if(_running > _size)
+ {
+ int load = static_cast<int>(_load + 1);
+ if(load < _running)
{
- int load = static_cast<int>(_load + 1);
- if(load < _running)
- {
- assert(_inUse > 0);
- --_inUse;
-
- assert(_running > 0);
- --_running;
-
- return false;
- }
- }
+ assert(_inUse > 0);
+ --_inUse;
+
+ assert(_running > 0);
+ --_running;
- assert(_inUse > 0);
- --_inUse;
+ return false;
+ }
}
+
+ assert(_inUse > 0);
+ --_inUse;
}
- _promoteMutex.lock();
+
+ while(!_promote)
+ {
+ wait();
+ }
+
+ _promote = false;
}
}
}
@@ -771,7 +819,12 @@ IceInternal::ThreadPool::EventHandlerThread::run()
// Promote a follower, but w/o modifying _inUse or creating
// new threads.
//
- _pool->_promoteMutex.unlock();
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*_pool.get());
+ assert(!_pool->_promote);
+ _pool->_promote = true;
+ _pool->notify();
+ }
}
_pool = 0; // Break cyclic dependency.