diff options
Diffstat (limited to 'cpp/src/Ice/ThreadPool.cpp')
-rw-r--r-- | cpp/src/Ice/ThreadPool.cpp | 132 |
1 files changed, 29 insertions, 103 deletions
diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp index fcee3afb792..07dedbead26 100644 --- a/cpp/src/Ice/ThreadPool.cpp +++ b/cpp/src/Ice/ThreadPool.cpp @@ -30,14 +30,7 @@ void IceInternal::ThreadPool::_register(SOCKET fd, const EventHandlerPtr& handler) { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - if (handler->server()) - { - ++_servers; - } - else - { - ++_clients; - } + ++_handlers; _changes.push_back(make_pair(fd, handler)); setInterrupt(0); } @@ -51,70 +44,30 @@ IceInternal::ThreadPool::unregister(SOCKET fd) } void -IceInternal::ThreadPool::serverIsNowClient() -{ - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - ++_clients; - assert(_servers > 0); - --_servers; - if (_servers == 0) - { - notifyAll(); // For waitUntil...Finished() methods. - } -} - -void -IceInternal::ThreadPool::clientIsNowServer() -{ - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - ++_servers; - assert(_clients > 0); - --_clients; -} - -void IceInternal::ThreadPool::promoteFollower() { _threadMutex.unlock(); } void -IceInternal::ThreadPool::initiateServerShutdown() +IceInternal::ThreadPool::initiateShutdown() { setInterrupt(1); } void -IceInternal::ThreadPool::waitUntilServerFinished() -{ - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - - while (_servers != 0 && _threadNum != 0) - { - wait(); - } - - if (_servers != 0) - { - Error out(_logger); - out << "can't wait for graceful server termination in thread pool\n" - << "since all threads have vanished"; - } -} - -void IceInternal::ThreadPool::waitUntilFinished() { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - while (_clients + _servers != 0 && _threadNum != 0) + while (_handlers != 0 && _threadNum != 0) { wait(); } - if (_clients + _servers != 0) + if (_handlers != 0) { - Error out(_logger); + Error out(_instance->logger()); out << "can't wait for graceful application termination in thread pool\n" << "since all threads have vanished"; } @@ -139,35 +92,11 @@ IceInternal::ThreadPool::joinWithAllThreads() } } -void -IceInternal::ThreadPool::setMaxConnections(int maxConnections) -{ - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - if (maxConnections < _threadNum + 1 && maxConnections != 0) - { - _maxConnections = _threadNum + 1; - } - else - { - _maxConnections = maxConnections; - } -} - -int -IceInternal::ThreadPool::getMaxConnections() -{ - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - return _maxConnections; -} - -IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance) : +IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, bool server) : _instance(instance), - _logger(_instance->logger()), - _properties(_instance->properties()), _destroyed(false), _lastFd(INVALID_SOCKET), - _clients(0), - _servers(0), + _handlers(0), _timeout(0) { SOCKET fds[2]; @@ -181,8 +110,16 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance) : _maxFd = _fdIntrRead; _minFd = _fdIntrRead; - _timeout = _properties->getPropertyAsInt("Ice.ServerIdleTime"); - _threadNum = _properties->getPropertyAsIntWithDefault("Ice.ThreadPool.Size", 10); + if (server) + { + _timeout = _instance->properties()->getPropertyAsInt("Ice.ServerIdleTime"); + _threadNum = _instance->properties()->getPropertyAsIntWithDefault("Ice.ServerThreadPool.Size", 10); + } + else + { + _threadNum = _instance->properties()->getPropertyAsIntWithDefault("Ice.ClientThreadPool.Size", 1); + } + if (_threadNum < 1) { _threadNum = 1; @@ -205,9 +142,6 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance) : destroy(); throw; } - - // Must be called after _threadNum is set. - setMaxConnections(_properties->getPropertyAsInt("Ice.ThreadPool.MaxConnections")); } IceInternal::ThreadPool::~ThreadPool() @@ -297,12 +231,13 @@ repeat: void IceInternal::ThreadPool::run() { + ThreadPoolPtr self = this; bool shutdown = false; while (true) { _threadMutex.lock(); - + repeatSelect: if (shutdown) // Shutdown has been initiated. @@ -438,7 +373,7 @@ IceInternal::ThreadPool::run() // if (fdSet.fd_count == 0) { - Error out(_logger); + Error out(_instance->logger()); out << "select() in thread pool returned " << ret << " but no filedescriptor is readable"; goto repeatSelect; } @@ -490,7 +425,7 @@ IceInternal::ThreadPool::run() if (loops > 1) { - Error out(_logger); + Error out(_instance->logger()); out << "select() in thread pool returned " << ret << " but no filedescriptor is readable"; goto repeatSelect; } @@ -501,7 +436,7 @@ IceInternal::ThreadPool::run() map<SOCKET, EventHandlerPtr>::iterator p = _handlerMap.find(_lastFd); if(p == _handlerMap.end()) { - Error out(_logger); + Error out(_instance->logger()); out << "filedescriptor " << _lastFd << " not registered with the thread pool"; goto repeatSelect; } @@ -518,23 +453,14 @@ IceInternal::ThreadPool::run() // Notify a handler about it's removal from the thread // pool. // - handler->finished(); + handler->finished(self); // "self" is faster than "this", as the reference count is not modified. { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - if (handler->server()) - { - assert(_servers > 0); - --_servers; - } - else - { - assert(_clients > 0); - --_clients; - } - if (_clients == 0 || _servers == 0) + assert(_handlers > 0); + if (--_handlers == 0) { - notifyAll(); // For waitUntil...Finished() methods. + notifyAll(); // For waitUntilFinished(). } } } @@ -564,7 +490,7 @@ IceInternal::ThreadPool::run() assert(stream.i == stream.b.end()); } - handler->message(stream); + handler->message(stream, self); // "self" is faster than "this", as the reference count is not modified. } } } @@ -640,12 +566,12 @@ IceInternal::ThreadPool::EventHandlerThread::run() } catch (const Exception& ex) { - Error out(_pool->_logger); + Error out(_pool->_instance->logger()); out << "exception in thread pool:\n" << ex; } catch (...) { - Error out(_pool->_logger); + Error out(_pool->_instance->logger()); out << "unknown exception in thread pool"; } |