summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/ThreadPool.cpp
diff options
context:
space:
mode:
authorMarc Laukien <marc@zeroc.com>2002-12-14 22:19:03 +0000
committerMarc Laukien <marc@zeroc.com>2002-12-14 22:19:03 +0000
commita41bb120db0ccdea5eea58e26bd30eddaff40877 (patch)
tree955f720fb895739a61764d56234ac4a9595a1441 /cpp/src/Ice/ThreadPool.cpp
parentsome fixes (diff)
downloadice-a41bb120db0ccdea5eea58e26bd30eddaff40877.tar.bz2
ice-a41bb120db0ccdea5eea58e26bd30eddaff40877.tar.xz
ice-a41bb120db0ccdea5eea58e26bd30eddaff40877.zip
complete shutdown/deactivate re-implementation
Diffstat (limited to 'cpp/src/Ice/ThreadPool.cpp')
-rw-r--r--cpp/src/Ice/ThreadPool.cpp246
1 files changed, 98 insertions, 148 deletions
diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp
index f5d947d9117..d33e51fea03 100644
--- a/cpp/src/Ice/ThreadPool.cpp
+++ b/cpp/src/Ice/ThreadPool.cpp
@@ -33,8 +33,7 @@ void IceInternal::decRef(ThreadPool* p) { p->__decRef(); }
void
IceInternal::ThreadPool::_register(SOCKET fd, const EventHandlerPtr& handler)
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- ++_handlers;
+ IceUtil::Mutex::Lock sync(*this);
_changes.push_back(make_pair(fd, handler));
setInterrupt(0);
}
@@ -42,7 +41,7 @@ IceInternal::ThreadPool::_register(SOCKET fd, const EventHandlerPtr& handler)
void
IceInternal::ThreadPool::unregister(SOCKET fd)
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ IceUtil::Mutex::Lock sync(*this);
_changes.push_back(make_pair(fd, EventHandlerPtr(0)));
setInterrupt(0);
}
@@ -67,28 +66,6 @@ IceInternal::ThreadPool::initiateShutdown()
}
void
-IceInternal::ThreadPool::waitUntilFinished()
-{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
-
- while(_handlers != 0 && _threadNum != 0)
- {
- wait();
- }
-
- if(_handlers != 0)
- {
- Error out(_instance->logger());
- out << "can't wait for graceful application termination in thread pool\n"
- << "since all threads have vanished";
- }
- else
- {
- assert(_handlerMap.empty());
- }
-}
-
-void
IceInternal::ThreadPool::joinWithAllThreads()
{
//
@@ -107,7 +84,6 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, bool server) :
_instance(instance),
_destroyed(false),
_lastFd(INVALID_SOCKET),
- _handlers(0),
_timeout(0),
_multipleThreads(false)
{
@@ -122,22 +98,23 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, bool server) :
_maxFd = _fdIntrRead;
_minFd = _fdIntrRead;
+ int threadNum;
if(server)
{
_timeout = _instance->properties()->getPropertyAsInt("Ice.ServerIdleTime");
- _threadNum = _instance->properties()->getPropertyAsIntWithDefault("Ice.ThreadPool.Server.Size", 10);
+ threadNum = _instance->properties()->getPropertyAsIntWithDefault("Ice.ThreadPool.Server.Size", 10);
}
else
{
- _threadNum = _instance->properties()->getPropertyAsIntWithDefault("Ice.ThreadPool.Client.Size", 1);
+ threadNum = _instance->properties()->getPropertyAsIntWithDefault("Ice.ThreadPool.Client.Size", 1);
}
- if(_threadNum < 1)
+ if(threadNum < 1)
{
- _threadNum = 1;
+ threadNum = 1;
}
- if(_threadNum > 1)
+ if(threadNum > 1)
{
_multipleThreads = true;
}
@@ -145,7 +122,7 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, bool server) :
__setNoDelete(true);
try
{
- for(int i = 0 ; i < _threadNum ; ++i)
+ for(int i = 0 ; i < threadNum ; ++i)
{
IceUtil::ThreadPtr thread = new EventHandlerThread(this);
_threads.push_back(thread->start());
@@ -181,7 +158,7 @@ IceInternal::ThreadPool::~ThreadPool()
void
IceInternal::ThreadPool::destroy()
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ IceUtil::Mutex::Lock sync(*this);
assert(!_destroyed);
_destroyed = true;
setInterrupt(0);
@@ -259,33 +236,16 @@ void
IceInternal::ThreadPool::run()
{
ThreadPoolPtr self = this;
- bool shutdown = false;
while(true)
{
if(_multipleThreads)
{
- _threadMutex.lock();
+ _threadMutex.lock();
}
repeatSelect:
- //
- // We must shut down the object adapter factory. We cannot do
- // this in initiateShutdown(), because this method must be
- // signal safe. We also cannot do this within the
- // synchronization of this object, so we do it here.
- //
- if(shutdown)
- {
- shutdown = false;
- ObjectAdapterFactoryPtr factory = _instance->objectAdapterFactory();
- if(factory)
- {
- factory->shutdown();
- }
- }
-
fd_set fdSet;
memcpy(&fdSet, &_fdSet, sizeof(fd_set));
int ret;
@@ -301,11 +261,11 @@ IceInternal::ThreadPool::run()
ret = ::select(_maxFd + 1, &fdSet, 0, 0, 0);
}
- if(ret == 0) // Timeout.
+ if(ret == 0) // We initiate a shutdown if there is a thread pool timeout.
{
assert(_timeout);
_timeout = 0;
- shutdown = true;
+ initiateShutdown();
goto repeatSelect;
}
@@ -323,9 +283,10 @@ IceInternal::ThreadPool::run()
EventHandlerPtr handler;
bool finished = false;
+ bool shutdown = false;
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ IceUtil::Mutex::Lock sync(*this);
if(FD_ISSET(_fdIntrRead, &fdSet))
{
@@ -353,48 +314,43 @@ IceInternal::ThreadPool::run()
shutdown = clearInterrupt();
- //
- // Server shutdown?
- //
- if(shutdown)
- {
- goto repeatSelect;
- }
-
- //
- // An event handler must have been registered or
- // unregistered.
- //
- assert(!_changes.empty());
- pair<SOCKET, EventHandlerPtr> change = _changes.front();
- _changes.pop_front();
-
- if(change.second) // Addition if handler is set.
+ if(!shutdown)
{
- _handlerMap.insert(change);
- FD_SET(change.first, &_fdSet);
- _maxFd = max(_maxFd, change.first);
- _minFd = min(_minFd, change.first);
- goto repeatSelect;
- }
- else // Removal if handler is not set.
- {
- map<SOCKET, EventHandlerPtr>::iterator p = _handlerMap.find(change.first);
- assert(p != _handlerMap.end());
- handler = p->second;
- finished = true;
- _handlerMap.erase(p);
- FD_CLR(change.first, &_fdSet);
- _maxFd = _fdIntrRead;
- _minFd = _fdIntrRead;
- if(!_handlerMap.empty())
+ //
+ // An event handler must have been registered or
+ // unregistered.
+ //
+ assert(!_changes.empty());
+ pair<SOCKET, EventHandlerPtr> change = _changes.front();
+ _changes.pop_front();
+
+ if(change.second) // Addition if handler is set.
+ {
+ _handlerMap.insert(change);
+ FD_SET(change.first, &_fdSet);
+ _maxFd = max(_maxFd, change.first);
+ _minFd = min(_minFd, change.first);
+ goto repeatSelect;
+ }
+ else // Removal if handler is not set.
{
- _maxFd = max(_maxFd, (--_handlerMap.end())->first);
- _minFd = min(_minFd, _handlerMap.begin()->first);
+ map<SOCKET, EventHandlerPtr>::iterator p = _handlerMap.find(change.first);
+ assert(p != _handlerMap.end());
+ handler = p->second;
+ finished = true;
+ _handlerMap.erase(p);
+ FD_CLR(change.first, &_fdSet);
+ _maxFd = _fdIntrRead;
+ _minFd = _fdIntrRead;
+ if(!_handlerMap.empty())
+ {
+ _maxFd = max(_maxFd, (--_handlerMap.end())->first);
+ _minFd = min(_minFd, _handlerMap.begin()->first);
+ }
+ // Don't goto repeatSelect; we have to call
+ // finished() on the event handler below, outside
+ // the thread synchronization.
}
- // Don't goto repeatSelect; we have to call
- // finished() on the event handler below, outside
- // the thread synchronization.
}
}
else
@@ -481,60 +437,73 @@ IceInternal::ThreadPool::run()
}
}
- assert(handler);
+ assert(handler || shutdown);
- if(finished)
+ if(shutdown)
{
//
- // Notify a handler about it's removal from the thread
- // pool.
+ // Initiate server shutdown.
//
- try
- {
- handler->finished(self); // "self" is faster than "this", as the reference count is not modified.
- }
- catch(const LocalException& ex)
- {
- Error out(_instance->logger());
- out << "exception while calling finished():\n" << ex << '\n' << handler->toString();
- }
-
+ ObjectAdapterFactoryPtr factory = _instance->objectAdapterFactory();
+ if(factory)
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- assert(_handlers > 0);
- if(--_handlers == 0)
- {
- notifyAll(); // For waitUntilFinished().
- }
+ promoteFollower();
+ factory->shutdown();
}
}
else
{
- //
- // If the handler is "readable", try to read a message.
- //
- BasicStream stream(_instance);
- if(handler->readable())
+ assert(handler);
+
+ if(finished)
{
+ //
+ // Notify a handler about it's removal from the thread
+ // pool.
+ //
try
{
- read(handler);
- }
- catch(const TimeoutException&) // Expected.
- {
- goto repeatSelect;
+ handler->finished(self); // "self" is faster than "this", as the reference count is not modified.
}
catch(const LocalException& ex)
{
- handler->exception(ex);
- goto repeatSelect;
+ Error out(_instance->logger());
+ out << "exception while calling finished():\n" << ex << '\n' << handler->toString();
}
-
- stream.swap(handler->_stream);
- assert(stream.i == stream.b.end());
}
+ else
+ {
+ //
+ // If the handler is "readable", try to read a
+ // message.
+ //
+ BasicStream stream(_instance);
+ if(handler->readable())
+ {
+ try
+ {
+ read(handler);
+ }
+ catch(const TimeoutException&) // Expected.
+ {
+ goto repeatSelect;
+ }
+ catch(const LocalException& ex)
+ {
+ handler->exception(ex);
+ goto repeatSelect;
+ }
+
+ stream.swap(handler->_stream);
+ assert(stream.i == stream.b.end());
+ }
- handler->message(stream, self); // "self" is faster than "this", as the reference count is not modified.
+ //
+ // "self" is faster than "this", as the reference
+ // count is not modified.
+ //
+ handler->message(stream, self);
+ }
}
}
}
@@ -619,25 +588,6 @@ IceInternal::ThreadPool::EventHandlerThread::run()
out << "unknown exception in thread pool";
}
- {
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*_pool.get());
- --_pool->_threadNum;
- assert(_pool->_threadNum >= 0);
-
- //
- // The notifyAll() shouldn't be needed, *except* if one of the
- // threads exits because of an exception. (Which is an error
- // condition in Ice and if it happens needs to be debugged.)
- // However, I call notifyAll() anyway, in all cases, using a
- // "defensive" programming approach when it comes to
- // multithreading.
- //
- if(_pool->_threadNum == 0)
- {
- _pool->notifyAll(); // For waitUntil...Finished() methods.
- }
- }
-
_pool->promoteFollower();
_pool = 0; // Break cyclic dependency.
}