diff options
Diffstat (limited to 'cpp/src/Ice/ThreadPool.cpp')
-rw-r--r-- | cpp/src/Ice/ThreadPool.cpp | 246 |
1 files changed, 98 insertions, 148 deletions
diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp index f5d947d9117..d33e51fea03 100644 --- a/cpp/src/Ice/ThreadPool.cpp +++ b/cpp/src/Ice/ThreadPool.cpp @@ -33,8 +33,7 @@ void IceInternal::decRef(ThreadPool* p) { p->__decRef(); } void IceInternal::ThreadPool::_register(SOCKET fd, const EventHandlerPtr& handler) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - ++_handlers; + IceUtil::Mutex::Lock sync(*this); _changes.push_back(make_pair(fd, handler)); setInterrupt(0); } @@ -42,7 +41,7 @@ IceInternal::ThreadPool::_register(SOCKET fd, const EventHandlerPtr& handler) void IceInternal::ThreadPool::unregister(SOCKET fd) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + IceUtil::Mutex::Lock sync(*this); _changes.push_back(make_pair(fd, EventHandlerPtr(0))); setInterrupt(0); } @@ -67,28 +66,6 @@ IceInternal::ThreadPool::initiateShutdown() } void -IceInternal::ThreadPool::waitUntilFinished() -{ - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - - while(_handlers != 0 && _threadNum != 0) - { - wait(); - } - - if(_handlers != 0) - { - Error out(_instance->logger()); - out << "can't wait for graceful application termination in thread pool\n" - << "since all threads have vanished"; - } - else - { - assert(_handlerMap.empty()); - } -} - -void IceInternal::ThreadPool::joinWithAllThreads() { // @@ -107,7 +84,6 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, bool server) : _instance(instance), _destroyed(false), _lastFd(INVALID_SOCKET), - _handlers(0), _timeout(0), _multipleThreads(false) { @@ -122,22 +98,23 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, bool server) : _maxFd = _fdIntrRead; _minFd = _fdIntrRead; + int threadNum; if(server) { _timeout = _instance->properties()->getPropertyAsInt("Ice.ServerIdleTime"); - _threadNum = _instance->properties()->getPropertyAsIntWithDefault("Ice.ThreadPool.Server.Size", 10); + threadNum = _instance->properties()->getPropertyAsIntWithDefault("Ice.ThreadPool.Server.Size", 10); } else { - _threadNum = _instance->properties()->getPropertyAsIntWithDefault("Ice.ThreadPool.Client.Size", 1); + threadNum = _instance->properties()->getPropertyAsIntWithDefault("Ice.ThreadPool.Client.Size", 1); } - if(_threadNum < 1) + if(threadNum < 1) { - _threadNum = 1; + threadNum = 1; } - if(_threadNum > 1) + if(threadNum > 1) { _multipleThreads = true; } @@ -145,7 +122,7 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, bool server) : __setNoDelete(true); try { - for(int i = 0 ; i < _threadNum ; ++i) + for(int i = 0 ; i < threadNum ; ++i) { IceUtil::ThreadPtr thread = new EventHandlerThread(this); _threads.push_back(thread->start()); @@ -181,7 +158,7 @@ IceInternal::ThreadPool::~ThreadPool() void IceInternal::ThreadPool::destroy() { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + IceUtil::Mutex::Lock sync(*this); assert(!_destroyed); _destroyed = true; setInterrupt(0); @@ -259,33 +236,16 @@ void IceInternal::ThreadPool::run() { ThreadPoolPtr self = this; - bool shutdown = false; while(true) { if(_multipleThreads) { - _threadMutex.lock(); + _threadMutex.lock(); } repeatSelect: - // - // We must shut down the object adapter factory. We cannot do - // this in initiateShutdown(), because this method must be - // signal safe. We also cannot do this within the - // synchronization of this object, so we do it here. - // - if(shutdown) - { - shutdown = false; - ObjectAdapterFactoryPtr factory = _instance->objectAdapterFactory(); - if(factory) - { - factory->shutdown(); - } - } - fd_set fdSet; memcpy(&fdSet, &_fdSet, sizeof(fd_set)); int ret; @@ -301,11 +261,11 @@ IceInternal::ThreadPool::run() ret = ::select(_maxFd + 1, &fdSet, 0, 0, 0); } - if(ret == 0) // Timeout. + if(ret == 0) // We initiate a shutdown if there is a thread pool timeout. { assert(_timeout); _timeout = 0; - shutdown = true; + initiateShutdown(); goto repeatSelect; } @@ -323,9 +283,10 @@ IceInternal::ThreadPool::run() EventHandlerPtr handler; bool finished = false; + bool shutdown = false; { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + IceUtil::Mutex::Lock sync(*this); if(FD_ISSET(_fdIntrRead, &fdSet)) { @@ -353,48 +314,43 @@ IceInternal::ThreadPool::run() shutdown = clearInterrupt(); - // - // Server shutdown? - // - if(shutdown) - { - goto repeatSelect; - } - - // - // An event handler must have been registered or - // unregistered. - // - assert(!_changes.empty()); - pair<SOCKET, EventHandlerPtr> change = _changes.front(); - _changes.pop_front(); - - if(change.second) // Addition if handler is set. + if(!shutdown) { - _handlerMap.insert(change); - FD_SET(change.first, &_fdSet); - _maxFd = max(_maxFd, change.first); - _minFd = min(_minFd, change.first); - goto repeatSelect; - } - else // Removal if handler is not set. - { - map<SOCKET, EventHandlerPtr>::iterator p = _handlerMap.find(change.first); - assert(p != _handlerMap.end()); - handler = p->second; - finished = true; - _handlerMap.erase(p); - FD_CLR(change.first, &_fdSet); - _maxFd = _fdIntrRead; - _minFd = _fdIntrRead; - if(!_handlerMap.empty()) + // + // An event handler must have been registered or + // unregistered. + // + assert(!_changes.empty()); + pair<SOCKET, EventHandlerPtr> change = _changes.front(); + _changes.pop_front(); + + if(change.second) // Addition if handler is set. + { + _handlerMap.insert(change); + FD_SET(change.first, &_fdSet); + _maxFd = max(_maxFd, change.first); + _minFd = min(_minFd, change.first); + goto repeatSelect; + } + else // Removal if handler is not set. { - _maxFd = max(_maxFd, (--_handlerMap.end())->first); - _minFd = min(_minFd, _handlerMap.begin()->first); + map<SOCKET, EventHandlerPtr>::iterator p = _handlerMap.find(change.first); + assert(p != _handlerMap.end()); + handler = p->second; + finished = true; + _handlerMap.erase(p); + FD_CLR(change.first, &_fdSet); + _maxFd = _fdIntrRead; + _minFd = _fdIntrRead; + if(!_handlerMap.empty()) + { + _maxFd = max(_maxFd, (--_handlerMap.end())->first); + _minFd = min(_minFd, _handlerMap.begin()->first); + } + // Don't goto repeatSelect; we have to call + // finished() on the event handler below, outside + // the thread synchronization. } - // Don't goto repeatSelect; we have to call - // finished() on the event handler below, outside - // the thread synchronization. } } else @@ -481,60 +437,73 @@ IceInternal::ThreadPool::run() } } - assert(handler); + assert(handler || shutdown); - if(finished) + if(shutdown) { // - // Notify a handler about it's removal from the thread - // pool. + // Initiate server shutdown. // - try - { - handler->finished(self); // "self" is faster than "this", as the reference count is not modified. - } - catch(const LocalException& ex) - { - Error out(_instance->logger()); - out << "exception while calling finished():\n" << ex << '\n' << handler->toString(); - } - + ObjectAdapterFactoryPtr factory = _instance->objectAdapterFactory(); + if(factory) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - assert(_handlers > 0); - if(--_handlers == 0) - { - notifyAll(); // For waitUntilFinished(). - } + promoteFollower(); + factory->shutdown(); } } else { - // - // If the handler is "readable", try to read a message. - // - BasicStream stream(_instance); - if(handler->readable()) + assert(handler); + + if(finished) { + // + // Notify a handler about it's removal from the thread + // pool. + // try { - read(handler); - } - catch(const TimeoutException&) // Expected. - { - goto repeatSelect; + handler->finished(self); // "self" is faster than "this", as the reference count is not modified. } catch(const LocalException& ex) { - handler->exception(ex); - goto repeatSelect; + Error out(_instance->logger()); + out << "exception while calling finished():\n" << ex << '\n' << handler->toString(); } - - stream.swap(handler->_stream); - assert(stream.i == stream.b.end()); } + else + { + // + // If the handler is "readable", try to read a + // message. + // + BasicStream stream(_instance); + if(handler->readable()) + { + try + { + read(handler); + } + catch(const TimeoutException&) // Expected. + { + goto repeatSelect; + } + catch(const LocalException& ex) + { + handler->exception(ex); + goto repeatSelect; + } + + stream.swap(handler->_stream); + assert(stream.i == stream.b.end()); + } - handler->message(stream, self); // "self" is faster than "this", as the reference count is not modified. + // + // "self" is faster than "this", as the reference + // count is not modified. + // + handler->message(stream, self); + } } } } @@ -619,25 +588,6 @@ IceInternal::ThreadPool::EventHandlerThread::run() out << "unknown exception in thread pool"; } - { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*_pool.get()); - --_pool->_threadNum; - assert(_pool->_threadNum >= 0); - - // - // The notifyAll() shouldn't be needed, *except* if one of the - // threads exits because of an exception. (Which is an error - // condition in Ice and if it happens needs to be debugged.) - // However, I call notifyAll() anyway, in all cases, using a - // "defensive" programming approach when it comes to - // multithreading. - // - if(_pool->_threadNum == 0) - { - _pool->notifyAll(); // For waitUntil...Finished() methods. - } - } - _pool->promoteFollower(); _pool = 0; // Break cyclic dependency. } |