diff options
Diffstat (limited to 'cpp/src/Ice/ThreadPool.cpp')
-rw-r--r-- | cpp/src/Ice/ThreadPool.cpp | 262 |
1 files changed, 91 insertions, 171 deletions
diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp index c27f3f9864a..a5cd8a6de34 100644 --- a/cpp/src/Ice/ThreadPool.cpp +++ b/cpp/src/Ice/ThreadPool.cpp @@ -252,7 +252,6 @@ IceInternal::ThreadPool::run() _threadMutex.lock(); EventHandlerPtr handler; - bool reap; repeatSelect: @@ -308,62 +307,7 @@ IceInternal::ThreadPool::run() // return; } - - bool interrupt = false; - -// -// Optimization for WIN32 specific version of fd_set. Looping with a -// FD_ISSET test like for Unix is very unefficient for WIN32. -// -#ifdef WIN32 - // - // Round robin for the filedescriptors. The interrupt - // filedescriptor has priority. - // - assert(fdSet.fd_count > 0); - SOCKET largerFd = _maxFd + 1; - SOCKET smallestFd = _maxFd + 1; - for (u_short i = 0; i < fdSet.fd_count; ++i) - { - SOCKET fd = fdSet.fd_array[i]; - assert(fd != INVALID_SOCKET); - if (fd == _fdIntrRead) - { - shutdown = clearInterrupt(); - interrupt = true; - break; - } - - if (_lastFd == INVALID_SOCKET || fd > _lastFd) - { - largerFd = min(largerFd, fd); - } - - smallestFd = min(smallestFd, fd); - } - - if (!interrupt) - { - if (largerFd <= _maxFd) - { - assert(largerFd >= _minFd); - _lastFd = largerFd; - } - else - { - assert(smallestFd >= _minFd && smallestFd <= _maxFd); - _lastFd = smallestFd; - } - } -#else - if (FD_ISSET(_fdIntrRead, &fdSet)) - { - shutdown = clearInterrupt(); - interrupt = true; - } -#endif - if (!_adds.empty()) { // @@ -371,8 +315,7 @@ IceInternal::ThreadPool::run() // for (vector<pair<SOCKET, EventHandlerPtr> >::iterator p = _adds.begin(); p != _adds.end(); ++p) { - _reapList.push_front(p->first); - _handlerMap[p->first] = make_pair(p->second, _reapList.begin()); + _handlerMap.insert(*p); FD_SET(p->first, &_fdSet); _maxFd = max(_maxFd, p->first); _minFd = min(_minFd, p->first); @@ -387,16 +330,14 @@ IceInternal::ThreadPool::run() // for (vector<SOCKET>::iterator p = _removes.begin(); p != _removes.end(); ++p) { - map<SOCKET, pair<EventHandlerPtr, list<SOCKET>::iterator> >::iterator q = _handlerMap.find(*p); + map<SOCKET, EventHandlerPtr>::iterator q = _handlerMap.find(*p); assert(q != _handlerMap.end()); FD_CLR(*p, &_fdSet); - q->second.first->finished(); - if (q->second.first->server()) + q->second->finished(); + if (q->second->server()) { --_servers; } - - _reapList.erase(q->second.second); _handlerMap.erase(q); } _removes.clear(); @@ -404,7 +345,7 @@ IceInternal::ThreadPool::run() _minFd = _fdIntrRead; if (!_handlerMap.empty()) { - _maxFd = max(_maxFd, (--_handlerMap.end())->first); + _maxFd = max(_maxFd, _handlerMap.rbegin()->first); _minFd = min(_minFd, _handlerMap.begin()->first); } if (_handlerMap.empty() || _servers == 0) @@ -412,136 +353,120 @@ IceInternal::ThreadPool::run() notifyAll(); // For waitUntil...Finished() methods. } } - - if (interrupt) - { - goto repeatSelect; - } +// +// Optimization for WIN32 specific version of fd_set. Looping with a +// FD_ISSET test like for Unix is very unefficient for WIN32. +// +#ifdef WIN32 // - // Check if there are connections to reap. + // Round robin for the filedescriptors. // - reap = false; - // _handlerMap.size() is faster than _reapList() with most STLs. - if (_maxConnections > 0 && _handlerMap.size() > static_cast<list<SOCKET>::size_type>(_maxConnections)) + if (fdSet.fd_count == 0) { - for (list<SOCKET>::reverse_iterator p = _reapList.rbegin(); p != _reapList.rend(); ++p) - { - SOCKET fd = *p; - if (fd != -1) - { - _reapList.pop_back(); - _reapList.push_front(-1); - map<SOCKET, pair<EventHandlerPtr, list<SOCKET>::iterator> >::iterator q = _handlerMap.find(fd); - q->second.second = _reapList.begin(); - handler = q->second.first; - reap = true; - break; - } - } + ostringstream s; + s << "select() in thread pool returned " << ret << " but no filedescriptor is readable"; + _instance->logger()->error(s.str()); + goto repeatSelect; } - if (!reap) + SOCKET largerFd = _maxFd + 1; + SOCKET smallestFd = _maxFd + 1; + for (u_short i = 0; i < fdSet.fd_count; ++i) { -#ifndef WIN32 - // - // Round robin for the filedescriptors. - // - int loops = 0; + SOCKET fd = fdSet.fd_array[i]; + assert(fd != INVALID_SOCKET); - if (_lastFd < _minFd - 1) + if (fd > _lastFd || _lastFd == INVALID_SOCKET) { - _lastFd = _minFd - 1; + largerFd = min(largerFd, fd); } - do - { - if (++_lastFd > _maxFd) - { - ++loops; - _lastFd = _minFd; - } - } - while (!FD_ISSET(_lastFd, &fdSet) && loops <= 1); + smallestFd = min(smallestFd, fd); + } + + if (largerFd <= _maxFd) + { + assert(largerFd >= _minFd); + _lastFd = largerFd; + } + else + { + assert(smallestFd >= _minFd && smallestFd <= _maxFd); + _lastFd = smallestFd; + } +#else + // + // Round robin for the filedescriptors. + // + if (_lastFd < _minFd - 1 || _lastFd == INVALID_SOCKET) + { + _lastFd = _minFd - 1; + } - if (loops > 1) + int loops = 0; + do + { + if (++_lastFd > _maxFd) { - ostringstream s; - s << "select() in thread pool returned " << ret << " but no filedescriptor is readable"; - _instance->logger()->error(s.str()); - goto repeatSelect; + ++loops; + _lastFd = _minFd; } + } + while (!FD_ISSET(_lastFd, &fdSet) && loops <= 1); + + if (loops > 1) + { + ostringstream s; + s << "select() in thread pool returned " << ret << " but no filedescriptor is readable"; + _instance->logger()->error(s.str()); + goto repeatSelect; + } #endif - - map<SOCKET, pair<EventHandlerPtr, list<SOCKET>::iterator> >::iterator p = _handlerMap.find(_lastFd); - if(p == _handlerMap.end()) - { - ostringstream s; - s << "filedescriptor " << _lastFd << " not registered with the thread pool"; - _instance->logger()->error(s.str()); - goto repeatSelect; - } - - // - // Make the fd for the handler the most recently used one - // by moving it to the beginning of the the reap list. - // - if (p->second.second != _reapList.begin()) - { - _reapList.erase(p->second.second); - _reapList.push_front(p->first); - p->second.second = _reapList.begin(); - } - - handler = p->second.first; + + if (_lastFd == _fdIntrRead) + { + shutdown = clearInterrupt(); + goto repeatSelect; + } + + map<SOCKET, EventHandlerPtr>::iterator p = _handlerMap.find(_lastFd); + if(p == _handlerMap.end()) + { + ostringstream s; + s << "filedescriptor " << _lastFd << " not registered with the thread pool"; + _instance->logger()->error(s.str()); + goto repeatSelect; } + + handler = p->second; } - if (reap) + // + // If the handler is "readable", try to read a message. + // + BasicStream stream(_instance); + if (handler->readable()) { - // - // Reap the handler. - // try { - if (!handler->tryDestroy()) - { - goto repeatSelect; - } + read(handler); } - catch (const LocalException&) + catch (const TimeoutException&) // Expected. { - // Ignore exeptions. + goto repeatSelect; } - } - else - { - // - // If the handler is "readable", try to read a message. - // - BasicStream stream(_instance); - if (handler->readable()) + catch (const LocalException& ex) { - try - { - read(handler); - } - catch (const TimeoutException&) // Expected. - { - goto repeatSelect; - } - catch (const LocalException& ex) - { - handler->exception(ex); - goto repeatSelect; - } - - stream.swap(handler->_stream); - assert(stream.i == stream.b.end()); + handler->exception(ex); + goto repeatSelect; } - handler->message(stream); + stream.swap(handler->_stream); + assert(stream.i == stream.b.end()); } + + handler->message(stream); } } @@ -600,11 +525,6 @@ IceInternal::ThreadPool::read(const EventHandlerPtr& handler) } void -IceInternal::ThreadPool::reapConnections() -{ -} - -void IceInternal::ThreadPool::EventHandlerThread::run() { try |