diff options
author | Marc Laukien <marc@zeroc.com> | 2001-12-11 15:48:11 +0000 |
---|---|---|
committer | Marc Laukien <marc@zeroc.com> | 2001-12-11 15:48:11 +0000 |
commit | c3253a00787b7c2dc84c81b269bc368ec2a2043f (patch) | |
tree | 747703a29c3cb487f556add7e00dd6b30866c3da /cpp/src/Ice/ThreadPool.cpp | |
parent | fixes (diff) | |
download | ice-c3253a00787b7c2dc84c81b269bc368ec2a2043f.tar.bz2 ice-c3253a00787b7c2dc84c81b269bc368ec2a2043f.tar.xz ice-c3253a00787b7c2dc84c81b269bc368ec2a2043f.zip |
fixes
Diffstat (limited to 'cpp/src/Ice/ThreadPool.cpp')
-rw-r--r-- | cpp/src/Ice/ThreadPool.cpp | 384 |
1 files changed, 190 insertions, 194 deletions
diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp index 5ed4d3eaecc..a2bd4415d25 100644 --- a/cpp/src/Ice/ThreadPool.cpp +++ b/cpp/src/Ice/ThreadPool.cpp @@ -251,236 +251,232 @@ IceInternal::ThreadPool::run() { _threadMutex.lock(); - while (true) + repeatSelect: + + if (shutdown) // Shutdown has been initiated. + { + shutdown = false; + _instance->objectAdapterFactory()->shutdown(); + } + + fd_set fdSet; + memcpy(&fdSet, &_fdSet, sizeof(fd_set)); + int ret; + if (_timeout) { - if (shutdown) // Shutdown has been initiated. + struct timeval tv; + tv.tv_sec = _timeout; + tv.tv_usec = 0; + ret = ::select(_maxFd + 1, &fdSet, 0, 0, &tv); + } + else + { + ret = ::select(_maxFd + 1, &fdSet, 0, 0, 0); + } + + if (ret == 0) // Timeout. + { + assert(_timeout); + _timeout = 0; + shutdown = true; + goto repeatSelect; + } + + if (ret == SOCKET_ERROR) + { + if (interrupted()) { - shutdown = false; - _instance->objectAdapterFactory()->shutdown(); + goto repeatSelect; } - fd_set fdSet; - memcpy(&fdSet, &_fdSet, sizeof(fd_set)); - int ret; - if (_timeout) - { - struct timeval tv; - tv.tv_sec = _timeout; - tv.tv_usec = 0; - ret = ::select(_maxFd + 1, &fdSet, 0, 0, &tv); - } - else - { - ret = ::select(_maxFd + 1, &fdSet, 0, 0, 0); - } + SocketException ex(__FILE__, __LINE__); + ex.error = getSocketErrno(); + throw ex; + } + + EventHandlerPtr handler; + + { + JTCSyncT<JTCMonitorT<JTCMutex> > sync(*this); - if (ret == 0) // Timeout. + if (_destroyed) { - assert(_timeout); - _timeout = 0; - shutdown = true; - continue; + // + // Don't clear the interrupt fd if destroyed, so that + // the other threads exit as well. + // + return; } - - if (ret == SOCKET_ERROR) + + if (!_adds.empty()) { - if (interrupted()) + // + // New handlers have been added. + // + for (vector<pair<SOCKET, EventHandlerPtr> >::iterator p = _adds.begin(); p != _adds.end(); ++p) { - continue; + _handlerMap.insert(*p); + FD_SET(p->first, &_fdSet); + _maxFd = max(_maxFd, p->first); + _minFd = min(_minFd, p->first); } - - SocketException ex(__FILE__, __LINE__); - ex.error = getSocketErrno(); - throw ex; + _adds.clear(); } - EventHandlerPtr handler; - + if (!_removes.empty()) { - JTCSyncT<JTCMonitorT<JTCMutex> > sync(*this); - - if (_destroyed) - { - // - // Don't clear the interrupt fd if destroyed, so that - // the other threads exit as well. - // - return; - } - - if (!_adds.empty()) - { - // - // New handlers have been added. - // - for (vector<pair<SOCKET, EventHandlerPtr> >::iterator p = _adds.begin(); p != _adds.end(); ++p) - { - _handlerMap.insert(*p); - FD_SET(p->first, &_fdSet); - _maxFd = max(_maxFd, p->first); - _minFd = min(_minFd, p->first); - } - _adds.clear(); - } - - if (!_removes.empty()) - { - // - // Handlers are permanently removed. - // - for (vector<pair<SOCKET, bool> >::iterator p = _removes.begin(); p != _removes.end(); ++p) - { - map<SOCKET, EventHandlerPtr>::iterator q = _handlerMap.find(p->first); - assert(q != _handlerMap.end()); - FD_CLR(p->first, &_fdSet); - if (p->second) // Call finished() on the handler? - { - q->second->finished(); - } - if (q->second->server()) - { - --_servers; - } - _handlerMap.erase(q); - } - _removes.clear(); - _maxFd = _fdIntrRead; - _minFd = _fdIntrRead; - if (!_handlerMap.empty()) - { - _maxFd = max(_maxFd, (--_handlerMap.end())->first); - _minFd = min(_minFd, _handlerMap.begin()->first); - } - if (_handlerMap.empty() || _servers == 0) - { - notifyAll(); // For waitUntil...Finished() methods. - } - - // - // Selected filedescriptors may have changed, I - // therefore need to repeat the select(). - // - shutdown = clearInterrupt(); - continue; - } - -// -// Optimization for WIN32 specific version of fd_set. Looping with a -// FD_ISSET test like for Unix is very unefficient for WIN32. -// -#ifdef WIN32 // - // Round robin for the filedescriptors. + // Handlers are permanently removed. // - if (fdSet.fd_count == 0) - { - ostringstream s; - s << "select() in thread pool returned " << ret << " but no filedescriptor is readable"; - _instance->logger()->error(s.str()); - continue; - } - - SOCKET largerFd = _maxFd + 1; - SOCKET smallestFd = _maxFd + 1; - for (u_short i = 0; i < fdSet.fd_count; ++i) + for (vector<pair<SOCKET, bool> >::iterator p = _removes.begin(); p != _removes.end(); ++p) { - SOCKET fd = fdSet.fd_array[i]; - assert(fd != INVALID_SOCKET); - - if (fd > _lastFd || _lastFd == INVALID_SOCKET) + map<SOCKET, EventHandlerPtr>::iterator q = _handlerMap.find(p->first); + assert(q != _handlerMap.end()); + FD_CLR(p->first, &_fdSet); + if (p->second) // Call finished() on the handler? { - largerFd = min(largerFd, fd); + q->second->finished(); } - - smallestFd = min(smallestFd, fd); - } - - if (largerFd <= _maxFd) - { - assert(largerFd >= _minFd); - _lastFd = largerFd; - } - else - { - assert(smallestFd >= _minFd && smallestFd <= _maxFd); - _lastFd = smallestFd; - } -#else - // - // Round robin for the filedescriptors. - // - if (_lastFd < _minFd - 1 || _lastFd == INVALID_SOCKET) - { - _lastFd = _minFd - 1; - } - - int loops = 0; - do - { - if (++_lastFd > _maxFd) + if (q->second->server()) { - ++loops; - _lastFd = _minFd; + --_servers; } + _handlerMap.erase(q); } - while (!FD_ISSET(_lastFd, &fdSet) && loops <= 1); - - if (loops > 1) + _removes.clear(); + _maxFd = _fdIntrRead; + _minFd = _fdIntrRead; + if (!_handlerMap.empty()) { - ostringstream s; - s << "select() in thread pool returned " << ret << " but no filedescriptor is readable"; - _instance->logger()->error(s.str()); - continue; + _maxFd = max(_maxFd, (--_handlerMap.end())->first); + _minFd = min(_minFd, _handlerMap.begin()->first); } -#endif - - if (_lastFd == _fdIntrRead) + if (_handlerMap.empty() || _servers == 0) { - shutdown = clearInterrupt(); - continue; + notifyAll(); // For waitUntil...Finished() methods. } - - map<SOCKET, EventHandlerPtr>::iterator p = _handlerMap.find(_lastFd); - if(p == _handlerMap.end()) + + // + // Selected filedescriptors may have changed, I + // therefore need to repeat the select(). + // + shutdown = clearInterrupt(); + goto repeatSelect; + } + +// +// Optimization for WIN32 specific version of fd_set. Looping with a +// FD_ISSET test like for Unix is very unefficient for WIN32. +// +#ifdef WIN32 + // + // Round robin for the filedescriptors. + // + if (fdSet.fd_count == 0) + { + ostringstream s; + s << "select() in thread pool returned " << ret << " but no filedescriptor is readable"; + _instance->logger()->error(s.str()); + goto repeatSelect; + } + + SOCKET largerFd = _maxFd + 1; + SOCKET smallestFd = _maxFd + 1; + for (u_short i = 0; i < fdSet.fd_count; ++i) + { + SOCKET fd = fdSet.fd_array[i]; + assert(fd != INVALID_SOCKET); + + if (fd > _lastFd || _lastFd == INVALID_SOCKET) { - ostringstream s; - s << "filedescriptor " << _lastFd << " not registered with the thread pool"; - _instance->logger()->error(s.str()); - continue; + largerFd = min(largerFd, fd); } - - handler = p->second; + + smallestFd = min(smallestFd, fd); } - - assert(handler); + if (largerFd <= _maxFd) + { + assert(largerFd >= _minFd); + _lastFd = largerFd; + } + else + { + assert(smallestFd >= _minFd && smallestFd <= _maxFd); + _lastFd = smallestFd; + } +#else // - // If the handler is "readable", try to read a message. + // Round robin for the filedescriptors. // - BasicStream stream(_instance); - if (handler->readable()) + if (_lastFd < _minFd - 1 || _lastFd == INVALID_SOCKET) { - try - { - read(handler); - } - catch (const TimeoutException&) // Expected. - { - continue; - } - catch (const LocalException& ex) + _lastFd = _minFd - 1; + } + + int loops = 0; + do + { + if (++_lastFd > _maxFd) { - handler->exception(ex); - continue; + ++loops; + _lastFd = _minFd; } - - stream.swap(handler->_stream); - assert(stream.i == stream.b.end()); } - - handler->message(stream); - break; + while (!FD_ISSET(_lastFd, &fdSet) && loops <= 1); + + if (loops > 1) + { + ostringstream s; + s << "select() in thread pool returned " << ret << " but no filedescriptor is readable"; + _instance->logger()->error(s.str()); + goto repeatSelect; + } +#endif + + if (_lastFd == _fdIntrRead) + { + shutdown = clearInterrupt(); + goto repeatSelect; + } + + map<SOCKET, EventHandlerPtr>::iterator p = _handlerMap.find(_lastFd); + if(p == _handlerMap.end()) + { + ostringstream s; + s << "filedescriptor " << _lastFd << " not registered with the thread pool"; + _instance->logger()->error(s.str()); + goto repeatSelect; + } + + handler = p->second; + } + + // + // If the handler is "readable", try to read a message. + // + BasicStream stream(_instance); + if (handler->readable()) + { + try + { + read(handler); + } + catch (const TimeoutException&) // Expected. + { + goto repeatSelect; + } + catch (const LocalException& ex) + { + handler->exception(ex); + goto repeatSelect; + } + + stream.swap(handler->_stream); + assert(stream.i == stream.b.end()); } + + handler->message(stream); } } |