diff options
author | Marc Laukien <marc@zeroc.com> | 2001-12-11 04:17:39 +0000 |
---|---|---|
committer | Marc Laukien <marc@zeroc.com> | 2001-12-11 04:17:39 +0000 |
commit | cfeacfd608794bda150805b318ebff315abb37fd (patch) | |
tree | df54356e9862b1f7f71e7be7b7255feb07819ba1 /cpp/src | |
parent | fix (diff) | |
download | ice-cfeacfd608794bda150805b318ebff315abb37fd.tar.bz2 ice-cfeacfd608794bda150805b318ebff315abb37fd.tar.xz ice-cfeacfd608794bda150805b318ebff315abb37fd.zip |
fixes
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/Ice/ThreadPool.cpp | 384 | ||||
-rw-r--r-- | cpp/src/Ice/ThreadPool.h | 2 |
2 files changed, 195 insertions, 191 deletions
diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp index 936f96907af..5ed4d3eaecc 100644 --- a/cpp/src/Ice/ThreadPool.cpp +++ b/cpp/src/Ice/ThreadPool.cpp @@ -251,232 +251,236 @@ IceInternal::ThreadPool::run() { _threadMutex.lock(); - EventHandlerPtr handler; - - repeatSelect: - - if (shutdown) // Shutdown has been initiated. - { - shutdown = false; - _instance->objectAdapterFactory()->shutdown(); - } - - fd_set fdSet; - memcpy(&fdSet, &_fdSet, sizeof(fd_set)); - int ret; - if (_timeout) - { - struct timeval tv; - tv.tv_sec = _timeout; - tv.tv_usec = 0; - ret = ::select(_maxFd + 1, &fdSet, 0, 0, &tv); - } - else + while (true) { - ret = ::select(_maxFd + 1, &fdSet, 0, 0, 0); - } - - if (ret == 0) // Timeout. - { - assert(_timeout); - _timeout = 0; - shutdown = true; - goto repeatSelect; - } - - if (ret == SOCKET_ERROR) - { - if (interrupted()) + if (shutdown) // Shutdown has been initiated. { - goto repeatSelect; + shutdown = false; + _instance->objectAdapterFactory()->shutdown(); } - SocketException ex(__FILE__, __LINE__); - ex.error = getSocketErrno(); - throw ex; - } - - { - JTCSyncT<JTCMonitorT<JTCMutex> > sync(*this); + fd_set fdSet; + memcpy(&fdSet, &_fdSet, sizeof(fd_set)); + int ret; + if (_timeout) + { + struct timeval tv; + tv.tv_sec = _timeout; + tv.tv_usec = 0; + ret = ::select(_maxFd + 1, &fdSet, 0, 0, &tv); + } + else + { + ret = ::select(_maxFd + 1, &fdSet, 0, 0, 0); + } - if (_destroyed) + if (ret == 0) // Timeout. { - // - // Don't clear the interrupt fd if destroyed, so that - // the other threads exit as well. - // - return; + assert(_timeout); + _timeout = 0; + shutdown = true; + continue; } - - if (!_adds.empty()) + + if (ret == SOCKET_ERROR) { - // - // New handlers have been added. - // - for (vector<pair<SOCKET, EventHandlerPtr> >::iterator p = _adds.begin(); p != _adds.end(); ++p) + if (interrupted()) { - _handlerMap.insert(*p); - FD_SET(p->first, &_fdSet); - _maxFd = max(_maxFd, p->first); - _minFd = min(_minFd, p->first); + continue; } - _adds.clear(); + + SocketException ex(__FILE__, __LINE__); + ex.error = getSocketErrno(); + throw ex; } - if (!_removes.empty()) + EventHandlerPtr handler; + { - // - // Handlers are permanently removed. - // - for (vector<pair<SOCKET, bool> >::iterator p = _removes.begin(); p != _removes.end(); ++p) + JTCSyncT<JTCMonitorT<JTCMutex> > sync(*this); + + if (_destroyed) { - map<SOCKET, EventHandlerPtr>::iterator q = _handlerMap.find(p->first); - assert(q != _handlerMap.end()); - FD_CLR(p->first, &_fdSet); - if (p->second) // Call finished() on the handler? - { - q->second->finished(); - } - if (q->second->server()) - { - --_servers; - } - _handlerMap.erase(q); + // + // Don't clear the interrupt fd if destroyed, so that + // the other threads exit as well. + // + return; } - _removes.clear(); - _maxFd = _fdIntrRead; - _minFd = _fdIntrRead; - if (!_handlerMap.empty()) + + if (!_adds.empty()) { - _maxFd = max(_maxFd, (--_handlerMap.end())->first); - _minFd = min(_minFd, _handlerMap.begin()->first); + // + // New handlers have been added. + // + for (vector<pair<SOCKET, EventHandlerPtr> >::iterator p = _adds.begin(); p != _adds.end(); ++p) + { + _handlerMap.insert(*p); + FD_SET(p->first, &_fdSet); + _maxFd = max(_maxFd, p->first); + _minFd = min(_minFd, p->first); + } + _adds.clear(); } - if (_handlerMap.empty() || _servers == 0) + + if (!_removes.empty()) { - notifyAll(); // For waitUntil...Finished() methods. + // + // Handlers are permanently removed. + // + for (vector<pair<SOCKET, bool> >::iterator p = _removes.begin(); p != _removes.end(); ++p) + { + map<SOCKET, EventHandlerPtr>::iterator q = _handlerMap.find(p->first); + assert(q != _handlerMap.end()); + FD_CLR(p->first, &_fdSet); + if (p->second) // Call finished() on the handler? + { + q->second->finished(); + } + if (q->second->server()) + { + --_servers; + } + _handlerMap.erase(q); + } + _removes.clear(); + _maxFd = _fdIntrRead; + _minFd = _fdIntrRead; + if (!_handlerMap.empty()) + { + _maxFd = max(_maxFd, (--_handlerMap.end())->first); + _minFd = min(_minFd, _handlerMap.begin()->first); + } + if (_handlerMap.empty() || _servers == 0) + { + notifyAll(); // For waitUntil...Finished() methods. + } + + // + // Selected filedescriptors may have changed, I + // therefore need to repeat the select(). + // + shutdown = clearInterrupt(); + continue; } - - // - // Selected filedescriptors may have changed, I - // therefore need to repeat the select(). - // - shutdown = clearInterrupt(); - goto repeatSelect; - } - + // // Optimization for WIN32 specific version of fd_set. Looping with a // FD_ISSET test like for Unix is very unefficient for WIN32. // #ifdef WIN32 - // - // Round robin for the filedescriptors. - // - if (fdSet.fd_count == 0) - { - ostringstream s; - s << "select() in thread pool returned " << ret << " but no filedescriptor is readable"; - _instance->logger()->error(s.str()); - goto repeatSelect; - } - - SOCKET largerFd = _maxFd + 1; - SOCKET smallestFd = _maxFd + 1; - for (u_short i = 0; i < fdSet.fd_count; ++i) - { - SOCKET fd = fdSet.fd_array[i]; - assert(fd != INVALID_SOCKET); - - if (fd > _lastFd || _lastFd == INVALID_SOCKET) + // + // Round robin for the filedescriptors. + // + if (fdSet.fd_count == 0) { - largerFd = min(largerFd, fd); + ostringstream s; + s << "select() in thread pool returned " << ret << " but no filedescriptor is readable"; + _instance->logger()->error(s.str()); + continue; + } + + SOCKET largerFd = _maxFd + 1; + SOCKET smallestFd = _maxFd + 1; + for (u_short i = 0; i < fdSet.fd_count; ++i) + { + SOCKET fd = fdSet.fd_array[i]; + assert(fd != INVALID_SOCKET); + + if (fd > _lastFd || _lastFd == INVALID_SOCKET) + { + largerFd = min(largerFd, fd); + } + + smallestFd = min(smallestFd, fd); + } + + if (largerFd <= _maxFd) + { + assert(largerFd >= _minFd); + _lastFd = largerFd; + } + else + { + assert(smallestFd >= _minFd && smallestFd <= _maxFd); + _lastFd = smallestFd; } - - smallestFd = min(smallestFd, fd); - } - - if (largerFd <= _maxFd) - { - assert(largerFd >= _minFd); - _lastFd = largerFd; - } - else - { - assert(smallestFd >= _minFd && smallestFd <= _maxFd); - _lastFd = smallestFd; - } #else - // - // Round robin for the filedescriptors. - // - if (_lastFd < _minFd - 1 || _lastFd == INVALID_SOCKET) - { - _lastFd = _minFd - 1; - } - - int loops = 0; - do - { - if (++_lastFd > _maxFd) + // + // Round robin for the filedescriptors. + // + if (_lastFd < _minFd - 1 || _lastFd == INVALID_SOCKET) { - ++loops; - _lastFd = _minFd; + _lastFd = _minFd - 1; + } + + int loops = 0; + do + { + if (++_lastFd > _maxFd) + { + ++loops; + _lastFd = _minFd; + } + } + while (!FD_ISSET(_lastFd, &fdSet) && loops <= 1); + + if (loops > 1) + { + ostringstream s; + s << "select() in thread pool returned " << ret << " but no filedescriptor is readable"; + _instance->logger()->error(s.str()); + continue; } - } - while (!FD_ISSET(_lastFd, &fdSet) && loops <= 1); - - if (loops > 1) - { - ostringstream s; - s << "select() in thread pool returned " << ret << " but no filedescriptor is readable"; - _instance->logger()->error(s.str()); - goto repeatSelect; - } #endif - - if (_lastFd == _fdIntrRead) - { - shutdown = clearInterrupt(); - goto repeatSelect; - } - - map<SOCKET, EventHandlerPtr>::iterator p = _handlerMap.find(_lastFd); - if(p == _handlerMap.end()) - { - ostringstream s; - s << "filedescriptor " << _lastFd << " not registered with the thread pool"; - _instance->logger()->error(s.str()); - goto repeatSelect; + + if (_lastFd == _fdIntrRead) + { + shutdown = clearInterrupt(); + continue; + } + + map<SOCKET, EventHandlerPtr>::iterator p = _handlerMap.find(_lastFd); + if(p == _handlerMap.end()) + { + ostringstream s; + s << "filedescriptor " << _lastFd << " not registered with the thread pool"; + _instance->logger()->error(s.str()); + continue; + } + + handler = p->second; } - handler = p->second; - } + assert(handler); - // - // If the handler is "readable", try to read a message. - // - BasicStream stream(_instance); - if (handler->readable()) - { - try - { - read(handler); - } - catch (const TimeoutException&) // Expected. - { - goto repeatSelect; - } - catch (const LocalException& ex) + // + // If the handler is "readable", try to read a message. + // + BasicStream stream(_instance); + if (handler->readable()) { - handler->exception(ex); - goto repeatSelect; + try + { + read(handler); + } + catch (const TimeoutException&) // Expected. + { + continue; + } + catch (const LocalException& ex) + { + handler->exception(ex); + continue; + } + + stream.swap(handler->_stream); + assert(stream.i == stream.b.end()); } - - stream.swap(handler->_stream); - assert(stream.i == stream.b.end()); - } - handler->message(stream); + handler->message(stream); + break; + } } } diff --git a/cpp/src/Ice/ThreadPool.h b/cpp/src/Ice/ThreadPool.h index 7a5932dc813..2c90624a865 100644 --- a/cpp/src/Ice/ThreadPool.h +++ b/cpp/src/Ice/ThreadPool.h @@ -72,7 +72,7 @@ private: { public: - EventHandlerThread(ThreadPoolPtr pool) : _pool(pool) { } + EventHandlerThread(const ThreadPoolPtr& pool) : _pool(pool) { } virtual void run(); private: |