diff options
author | Benoit Foucher <benoit@zeroc.com> | 2006-10-31 15:07:59 +0000 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2006-10-31 15:07:59 +0000 |
commit | 60e5ad760f21d676fa37ff7026ce521ebbbe3bc9 (patch) | |
tree | fc399d968e3a5de6e263248b6f0784ea5743d5a2 /cpp/src/Ice/ThreadPool.cpp | |
parent | Fix (diff) | |
download | ice-60e5ad760f21d676fa37ff7026ce521ebbbe3bc9.tar.bz2 ice-60e5ad760f21d676fa37ff7026ce521ebbbe3bc9.tar.xz ice-60e5ad760f21d676fa37ff7026ce521ebbbe3bc9.zip |
Use poll() instead of select() on Unix.
Diffstat (limited to 'cpp/src/Ice/ThreadPool.cpp')
-rw-r--r-- | cpp/src/Ice/ThreadPool.cpp | 164 |
1 files changed, 120 insertions, 44 deletions
diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp index abef235ca7d..f58467cd28d 100644 --- a/cpp/src/Ice/ThreadPool.cpp +++ b/cpp/src/Ice/ThreadPool.cpp @@ -56,11 +56,35 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& p _fdIntrRead = fds[0]; _fdIntrWrite = fds[1]; setBlock(_fdIntrRead, false); + _maxFd = _fdIntrRead; + _minFd = _fdIntrRead; +#if defined(_WIN32) FD_ZERO(&_fdSet); FD_SET(_fdIntrRead, &_fdSet); - _maxFd = _fdIntrRead; - _minFd = _fdIntrRead; +#elif defined(__linux) + _epollFd = epoll_create(1); + if(_epollFd < 0) + { + SocketException ex(__FILE__, __LINE__); + ex.error = getSocketErrno(); + throw ex; + } + _events.resize(1); + epoll_event event; + event.events = EPOLLIN; + event.data.fd = _fdIntrRead; + if(epoll_ctl(_epollFd, EPOLL_CTL_ADD, _fdIntrRead, &event) != 0) + { + SocketException ex(__FILE__, __LINE__); + ex.error = getSocketErrno(); + throw ex; + } +#else + _pollFdSet.resize(1); + _pollFdSet[0].fd = _fdIntrRead; + _pollFdSet[0].events = POLLIN; +#endif // // We use just one thread as the default. This is the fastest @@ -160,6 +184,18 @@ IceInternal::ThreadPool::~ThreadPool() Error out(_instance->initializationData().logger); out << "exception in `" << _prefix << "' while calling closeSocket():\n" << ex; } + +#ifdef __linux + try + { + closeSocket(_epollFd); + } + catch(const LocalException& ex) + { + Error out(_instance->initializationData().logger); + out << "exception in `" << _prefix << "' while calling closeSocket():\n" << ex; + } +#endif } void @@ -343,9 +379,10 @@ IceInternal::ThreadPool::run() while(true) { + int ret; +#if defined(_WIN32) fd_set fdSet; memcpy(&fdSet, &_fdSet, sizeof(fd_set)); - int ret; if(_timeout > 0) { struct timeval tv; @@ -357,7 +394,12 @@ IceInternal::ThreadPool::run() { ret = ::select(static_cast<int>(_maxFd + 1), &fdSet, 0, 0, 0); } - +#elif defined(__linux) + ret = epoll_wait(_epollFd, &_events[0], _events.size(), _timeout > 0 ? _timeout * 1000 : -1); +#else + ret = poll(&_pollFdSet[0], _pollFdSet.size(), _timeout > 0 ? _timeout * 1000 : -1); +#endif + if(ret == SOCKET_ERROR) { if(interrupted()) @@ -388,7 +430,23 @@ IceInternal::ThreadPool::run() } else { - if(FD_ISSET(_fdIntrRead, &fdSet)) + bool interrupted = false; +#if defined(_WIN32) + interrupted = FD_ISSET(_fdIntrRead, &fdSet); +#elif defined(__linux) + for(int i = 0; i < ret; ++i) + { + if(_events[i].data.fd == _fdIntrRead) + { + interrupted = true; + break; + } + } +#else + assert(_pollFdSet[0].fd == _fdIntrRead); + interrupted = _pollFdSet[0].revents != 0; +#endif + if(interrupted) { // // There are two possiblities for an interrupt: @@ -423,7 +481,26 @@ IceInternal::ThreadPool::run() if(change.second) // Addition if handler is set. { _handlerMap.insert(change); +#if defined(_WIN32) FD_SET(change.first, &_fdSet); +#elif defined(__linux) + epoll_event event; + event.events = EPOLLIN; + event.data.fd = change.first; + if(epoll_ctl(_epollFd, EPOLL_CTL_ADD, change.first, &event) != 0) + { + Error out(_instance->initializationData().logger); + out << "error while adding filedescriptor to epoll set:\n"; + out << errorToString(getSocketErrno()); + continue; + } + _events.resize(_handlerMap.size() + 1); +#else + struct pollfd pollFd; + pollFd.fd = change.first; + pollFd.events = POLLIN; + _pollFdSet.push_back(pollFd); +#endif _maxFd = max(_maxFd, change.first); _minFd = min(_minFd, change.first); continue; @@ -435,7 +512,29 @@ IceInternal::ThreadPool::run() handler = p->second; finished = true; _handlerMap.erase(p); +#if defined(_WIN32) FD_CLR(change.first, &_fdSet); +#elif defined(__linux) + epoll_event event; + event.events = 0; + if(epoll_ctl(_epollFd, EPOLL_CTL_DEL, change.first, &event) != 0) + { + Error out(_instance->initializationData().logger); + out << "error while adding filedescriptor to epoll set:\n"; + out << errorToString(getSocketErrno()); + continue; + } + _events.resize(_handlerMap.size() + 1); +#else + for(vector<struct pollfd>::iterator p = _pollFdSet.begin(); p != _pollFdSet.end(); ++p) + { + if(p->fd == change.first) + { + _pollFdSet.erase(p); + break; + } + } +#endif _maxFd = _fdIntrRead; _minFd = _fdIntrRead; if(!_handlerMap.empty()) @@ -450,14 +549,12 @@ IceInternal::ThreadPool::run() } else { -// -// Optimization for WIN32 specific version of fd_set. Looping with a -// FD_ISSET test like for Unix is very inefficient for WIN32. -// -#ifdef _WIN32 // // Round robin for the filedescriptors. // + SOCKET largerFd = _maxFd + 1; + SOCKET smallestFd = _maxFd + 1; +#if defined(_WIN32) if(fdSet.fd_count == 0) { Error out(_instance->initializationData().logger); @@ -465,14 +562,23 @@ IceInternal::ThreadPool::run() << " but no filedescriptor is readable"; continue; } - - SOCKET largerFd = _maxFd + 1; - SOCKET smallestFd = _maxFd + 1; for(u_short i = 0; i < fdSet.fd_count; ++i) { SOCKET fd = fdSet.fd_array[i]; +#elif defined(__linux) + for(int i = 0; i < ret; ++i) + { + SOCKET fd = _events[i].data.fd; +#else + for(vector<struct pollfd>::const_iterator p = _pollFdSet.begin(); p != _pollFdSet.end(); ++p) + { + if(p->revents == 0) + { + continue; + } + SOCKET fd = p->fd; +#endif assert(fd != INVALID_SOCKET); - if(fd > _lastFd || _lastFd == INVALID_SOCKET) { largerFd = min(largerFd, fd); @@ -491,37 +597,7 @@ IceInternal::ThreadPool::run() assert(smallestFd >= _minFd && smallestFd <= _maxFd); _lastFd = smallestFd; } -#else - // - // Round robin for the filedescriptors. - // - if(_lastFd < _minFd - 1 || _lastFd == INVALID_SOCKET) - { - _lastFd = _minFd - 1; - } - - int loops = 0; - do - { - if(++_lastFd > _maxFd) - { - ++loops; - _lastFd = _minFd; - } - } - while(!FD_ISSET(_lastFd, &fdSet) && loops <= 1); - - if(loops > 1) - { - Error out(_instance->initializationData().logger); - out << "select() in `" << _prefix << "' returned " << ret - << " but no filedescriptor is readable"; - continue; - } -#endif - assert(_lastFd != _fdIntrRead); - map<SOCKET, EventHandlerPtr>::iterator p = _handlerMap.find(_lastFd); if(p == _handlerMap.end()) { |