diff options
Diffstat (limited to 'cpp/src/Ice/ThreadPool.cpp')
-rw-r--r-- | cpp/src/Ice/ThreadPool.cpp | 252 |
1 files changed, 133 insertions, 119 deletions
diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp index 5c8caa70197..82ceade9ff7 100644 --- a/cpp/src/Ice/ThreadPool.cpp +++ b/cpp/src/Ice/ThreadPool.cpp @@ -288,7 +288,8 @@ IceInternal::ThreadPool::run() } EventHandlerPtr handler; - + std::pair<SOCKET, bool> remove(INVALID_SOCKET, false); + { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); @@ -321,152 +322,165 @@ IceInternal::ThreadPool::run() // // Handlers are permanently removed. // - for (vector<pair<SOCKET, bool> >::iterator p = _removes.begin(); p != _removes.end(); ++p) + remove = _removes.front(); + _removes.pop_front(); + map<SOCKET, EventHandlerPtr>::iterator p = _handlerMap.find(remove.first); + assert(p != _handlerMap.end()); + FD_CLR(p->first, &_fdSet); + handler = p->second; + } + + if (!handler) + { +// +// Optimization for WIN32 specific version of fd_set. Looping with a +// FD_ISSET test like for Unix is very unefficient for WIN32. +// +#ifdef WIN32 + // + // Round robin for the filedescriptors. + // + if (fdSet.fd_count == 0) { - map<SOCKET, EventHandlerPtr>::iterator q = _handlerMap.find(p->first); - assert(q != _handlerMap.end()); - FD_CLR(p->first, &_fdSet); - if (p->second) // Call finished() on the handler? - { - q->second->finished(); - } - if (q->second->server()) + Error out(_logger); + out << "select() in thread pool returned " << ret << " but no filedescriptor is readable"; + goto repeatSelect; + } + + SOCKET largerFd = _maxFd + 1; + SOCKET smallestFd = _maxFd + 1; + for (u_short i = 0; i < fdSet.fd_count; ++i) + { + SOCKET fd = fdSet.fd_array[i]; + assert(fd != INVALID_SOCKET); + + if (fd > _lastFd || _lastFd == INVALID_SOCKET) { - --_servers; + largerFd = min(largerFd, fd); } - _handlerMap.erase(q); + + smallestFd = min(smallestFd, fd); } - _removes.clear(); - _maxFd = _fdIntrRead; - _minFd = _fdIntrRead; - if (!_handlerMap.empty()) + + if (largerFd <= _maxFd) { - _maxFd = max(_maxFd, (--_handlerMap.end())->first); - _minFd = min(_minFd, _handlerMap.begin()->first); + assert(largerFd >= _minFd); + _lastFd = largerFd; } - if (_handlerMap.empty() || _servers == 0) + else { - notifyAll(); // For waitUntil...Finished() methods. + assert(smallestFd >= _minFd && smallestFd <= _maxFd); + _lastFd = smallestFd; } - +#else // - // Selected filedescriptors may have changed, I - // therefore need to repeat the select(). + // Round robin for the filedescriptors. // - shutdown = clearInterrupt(); - goto repeatSelect; - } - -// -// Optimization for WIN32 specific version of fd_set. Looping with a -// FD_ISSET test like for Unix is very unefficient for WIN32. -// -#ifdef WIN32 - // - // Round robin for the filedescriptors. - // - if (fdSet.fd_count == 0) - { - Error out(_logger); - out << "select() in thread pool returned " << ret << " but no filedescriptor is readable"; - goto repeatSelect; - } - - SOCKET largerFd = _maxFd + 1; - SOCKET smallestFd = _maxFd + 1; - for (u_short i = 0; i < fdSet.fd_count; ++i) - { - SOCKET fd = fdSet.fd_array[i]; - assert(fd != INVALID_SOCKET); - - if (fd > _lastFd || _lastFd == INVALID_SOCKET) + if (_lastFd < _minFd - 1 || _lastFd == INVALID_SOCKET) { - largerFd = min(largerFd, fd); + _lastFd = _minFd - 1; } - - smallestFd = min(smallestFd, fd); + + int loops = 0; + do + { + if (++_lastFd > _maxFd) + { + ++loops; + _lastFd = _minFd; + } + } + while (!FD_ISSET(_lastFd, &fdSet) && loops <= 1); + + if (loops > 1) + { + Error out(_logger); + out << "select() in thread pool returned " << ret << " but no filedescriptor is readable"; + goto repeatSelect; + } +#endif + + if (_lastFd == _fdIntrRead) + { + shutdown = clearInterrupt(); + goto repeatSelect; + } + + map<SOCKET, EventHandlerPtr>::iterator p = _handlerMap.find(_lastFd); + if(p == _handlerMap.end()) + { + Error out(_logger); + out << "filedescriptor " << _lastFd << " not registered with the thread pool"; + goto repeatSelect; + } + + handler = p->second; } + } + + assert(handler); - if (largerFd <= _maxFd) - { - assert(largerFd >= _minFd); - _lastFd = largerFd; - } - else - { - assert(smallestFd >= _minFd && smallestFd <= _maxFd); - _lastFd = smallestFd; - } -#else + if (remove.first != INVALID_SOCKET) + { // - // Round robin for the filedescriptors. + // Call finished() on a handler if necessary. // - if (_lastFd < _minFd - 1 || _lastFd == INVALID_SOCKET) + if (remove.second) { - _lastFd = _minFd - 1; + handler->finished(); } - - int loops = 0; - do + { - if (++_lastFd > _maxFd) + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + + map<SOCKET, EventHandlerPtr>::iterator p = _handlerMap.find(remove.first); + assert(p != _handlerMap.end()); + _handlerMap.erase(p); + _maxFd = _fdIntrRead; + _minFd = _fdIntrRead; + if (!_handlerMap.empty()) { - ++loops; - _lastFd = _minFd; + _maxFd = max(_maxFd, (--_handlerMap.end())->first); + _minFd = min(_minFd, _handlerMap.begin()->first); + } + if (handler->server()) + { + --_servers; + } + if (_handlerMap.empty() || _servers == 0) + { + notifyAll(); // For waitUntil...Finished() methods. } } - while (!FD_ISSET(_lastFd, &fdSet) && loops <= 1); - - if (loops > 1) - { - Error out(_logger); - out << "select() in thread pool returned " << ret << " but no filedescriptor is readable"; - goto repeatSelect; - } -#endif - - if (_lastFd == _fdIntrRead) - { - shutdown = clearInterrupt(); - goto repeatSelect; - } - - map<SOCKET, EventHandlerPtr>::iterator p = _handlerMap.find(_lastFd); - if(p == _handlerMap.end()) - { - Error out(_logger); - out << "filedescriptor " << _lastFd << " not registered with the thread pool"; - goto repeatSelect; - } - - handler = p->second; } - - // - // If the handler is "readable", try to read a message. - // - BasicStream stream(_instance); - if (handler->readable()) + else { - try - { - read(handler); - } - catch (const TimeoutException&) // Expected. - { - goto repeatSelect; - } - catch (const LocalException& ex) + // + // If the handler is "readable", try to read a message. + // + BasicStream stream(_instance); + if (handler->readable()) { - handler->exception(ex); - goto repeatSelect; + try + { + read(handler); + } + catch (const TimeoutException&) // Expected. + { + goto repeatSelect; + } + catch (const LocalException& ex) + { + handler->exception(ex); + goto repeatSelect; + } + + stream.swap(handler->_stream); + assert(stream.i == stream.b.end()); } - stream.swap(handler->_stream); - assert(stream.i == stream.b.end()); + handler->message(stream); } - - handler->message(stream); } } |