diff options
author | Benoit Foucher <benoit@zeroc.com> | 2006-10-31 15:07:59 +0000 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2006-10-31 15:07:59 +0000 |
commit | 60e5ad760f21d676fa37ff7026ce521ebbbe3bc9 (patch) | |
tree | fc399d968e3a5de6e263248b6f0784ea5743d5a2 /cpp/src | |
parent | Fix (diff) | |
download | ice-60e5ad760f21d676fa37ff7026ce521ebbbe3bc9.tar.bz2 ice-60e5ad760f21d676fa37ff7026ce521ebbbe3bc9.tar.xz ice-60e5ad760f21d676fa37ff7026ce521ebbbe3bc9.zip |
Use poll() instead of select() on Unix.
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/Ice/Network.cpp | 36 | ||||
-rw-r--r-- | cpp/src/Ice/Network.h | 8 | ||||
-rw-r--r-- | cpp/src/Ice/TcpTransceiver.cpp | 16 | ||||
-rw-r--r-- | cpp/src/Ice/ThreadPool.cpp | 164 | ||||
-rw-r--r-- | cpp/src/Ice/ThreadPool.h | 17 | ||||
-rw-r--r-- | cpp/src/Ice/UdpTransceiver.cpp | 19 | ||||
-rw-r--r-- | cpp/src/Ice/UdpTransceiver.h | 2 | ||||
-rw-r--r-- | cpp/src/IceSSL/Util.cpp | 10 |
8 files changed, 193 insertions, 79 deletions
diff --git a/cpp/src/Ice/Network.cpp b/cpp/src/Ice/Network.cpp index bc917c4299c..8bd57de3d83 100644 --- a/cpp/src/Ice/Network.cpp +++ b/cpp/src/Ice/Network.cpp @@ -601,28 +601,11 @@ repeatConnect: assert(nevents.lNetworkEvents & FD_CONNECT); val = nevents.iErrorCode[FD_CONNECT_BIT]; #else - repeatSelect: - int ret; - fd_set wFdSet; - FD_ZERO(&wFdSet); - FD_SET(fd, &wFdSet); - // - // Note that although we use a different mechanism for - // WIN32, winsock notifies about connection failures - // through the exception filedescriptors - // - if(timeout >= 0) - { - struct timeval tv; - tv.tv_sec = timeout / 1000; - tv.tv_usec = (timeout - tv.tv_sec * 1000) * 1000; - ret = ::select(fd + 1, 0, &wFdSet, 0, &tv); - } - else - { - ret = ::select(fd + 1, 0, &wFdSet, 0, 0); - } - + repeatPoll: + struct pollfd pollFd[1]; + pollFd[0].fd = fd; + pollFd[0].events = POLLOUT; + int ret = ::poll(pollFd, 1, timeout); if(ret == 0) { closeSocketNoThrow(fd); @@ -632,7 +615,7 @@ repeatConnect: { if(interrupted()) { - goto repeatSelect; + goto repeatPoll; } SocketException ex(__FILE__, __LINE__); @@ -730,6 +713,7 @@ repeatAccept: { repeatSelect: int rs; +#ifdef _WIN32 fd_set fdSet; FD_ZERO(&fdSet); FD_SET(fd, &fdSet); @@ -744,6 +728,12 @@ repeatAccept: { rs = ::select(static_cast<int>(fd + 1), &fdSet, 0, 0, 0); } +#else + struct pollfd pollFd[1]; + pollFd[0].fd = fd; + pollFd[0].events = POLLIN; + rs = ::poll(pollFd, 1, timeout); +#endif if(rs == SOCKET_ERROR) { diff --git a/cpp/src/Ice/Network.h b/cpp/src/Ice/Network.h index b5cf02d8f80..ff38b2e4963 100644 --- a/cpp/src/Ice/Network.h +++ b/cpp/src/Ice/Network.h @@ -23,13 +23,7 @@ typedef int ssize_t; # include <unistd.h> # include <fcntl.h> # include <sys/socket.h> - -# if defined(__hpux) -# include <sys/time.h> -# else -# include <sys/select.h> -# endif - +# include <sys/poll.h> # include <netinet/in.h> # include <netinet/tcp.h> # include <arpa/inet.h> diff --git a/cpp/src/Ice/TcpTransceiver.cpp b/cpp/src/Ice/TcpTransceiver.cpp index 69cfafa432a..1a06809bb1a 100644 --- a/cpp/src/Ice/TcpTransceiver.cpp +++ b/cpp/src/Ice/TcpTransceiver.cpp @@ -122,6 +122,7 @@ IceInternal::TcpTransceiver::write(Buffer& buf, int timeout) int rs; assert(_fd != INVALID_SOCKET); +#ifdef _WIN32 FD_SET(_fd, &_wFdSet); if(timeout >= 0) @@ -135,7 +136,12 @@ IceInternal::TcpTransceiver::write(Buffer& buf, int timeout) { rs = ::select(static_cast<int>(_fd + 1), 0, &_wFdSet, 0, 0); } - +#else + struct pollfd pollFd[1]; + pollFd[0].fd = _fd; + pollFd[0].events = POLLOUT; + rs = ::poll(pollFd, 1, timeout); +#endif if(rs == SOCKET_ERROR) { if(interrupted()) @@ -238,6 +244,7 @@ IceInternal::TcpTransceiver::read(Buffer& buf, int timeout) int rs; assert(_fd != INVALID_SOCKET); +#ifdef _WIN32 FD_SET(_fd, &_rFdSet); if(timeout >= 0) @@ -251,7 +258,12 @@ IceInternal::TcpTransceiver::read(Buffer& buf, int timeout) { rs = ::select(static_cast<int>(_fd + 1), &_rFdSet, 0, 0, 0); } - +#else + struct pollfd pollFd[1]; + pollFd[0].fd = _fd; + pollFd[0].events = POLLIN; + rs = ::poll(pollFd, 1, timeout); +#endif if(rs == SOCKET_ERROR) { if(interrupted()) diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp index abef235ca7d..f58467cd28d 100644 --- a/cpp/src/Ice/ThreadPool.cpp +++ b/cpp/src/Ice/ThreadPool.cpp @@ -56,11 +56,35 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& p _fdIntrRead = fds[0]; _fdIntrWrite = fds[1]; setBlock(_fdIntrRead, false); + _maxFd = _fdIntrRead; + _minFd = _fdIntrRead; +#if defined(_WIN32) FD_ZERO(&_fdSet); FD_SET(_fdIntrRead, &_fdSet); - _maxFd = _fdIntrRead; - _minFd = _fdIntrRead; +#elif defined(__linux) + _epollFd = epoll_create(1); + if(_epollFd < 0) + { + SocketException ex(__FILE__, __LINE__); + ex.error = getSocketErrno(); + throw ex; + } + _events.resize(1); + epoll_event event; + event.events = EPOLLIN; + event.data.fd = _fdIntrRead; + if(epoll_ctl(_epollFd, EPOLL_CTL_ADD, _fdIntrRead, &event) != 0) + { + SocketException ex(__FILE__, __LINE__); + ex.error = getSocketErrno(); + throw ex; + } +#else + _pollFdSet.resize(1); + _pollFdSet[0].fd = _fdIntrRead; + _pollFdSet[0].events = POLLIN; +#endif // // We use just one thread as the default. This is the fastest @@ -160,6 +184,18 @@ IceInternal::ThreadPool::~ThreadPool() Error out(_instance->initializationData().logger); out << "exception in `" << _prefix << "' while calling closeSocket():\n" << ex; } + +#ifdef __linux + try + { + closeSocket(_epollFd); + } + catch(const LocalException& ex) + { + Error out(_instance->initializationData().logger); + out << "exception in `" << _prefix << "' while calling closeSocket():\n" << ex; + } +#endif } void @@ -343,9 +379,10 @@ IceInternal::ThreadPool::run() while(true) { + int ret; +#if defined(_WIN32) fd_set fdSet; memcpy(&fdSet, &_fdSet, sizeof(fd_set)); - int ret; if(_timeout > 0) { struct timeval tv; @@ -357,7 +394,12 @@ IceInternal::ThreadPool::run() { ret = ::select(static_cast<int>(_maxFd + 1), &fdSet, 0, 0, 0); } - +#elif defined(__linux) + ret = epoll_wait(_epollFd, &_events[0], _events.size(), _timeout > 0 ? _timeout * 1000 : -1); +#else + ret = poll(&_pollFdSet[0], _pollFdSet.size(), _timeout > 0 ? _timeout * 1000 : -1); +#endif + if(ret == SOCKET_ERROR) { if(interrupted()) @@ -388,7 +430,23 @@ IceInternal::ThreadPool::run() } else { - if(FD_ISSET(_fdIntrRead, &fdSet)) + bool interrupted = false; +#if defined(_WIN32) + interrupted = FD_ISSET(_fdIntrRead, &fdSet); +#elif defined(__linux) + for(int i = 0; i < ret; ++i) + { + if(_events[i].data.fd == _fdIntrRead) + { + interrupted = true; + break; + } + } +#else + assert(_pollFdSet[0].fd == _fdIntrRead); + interrupted = _pollFdSet[0].revents != 0; +#endif + if(interrupted) { // // There are two possiblities for an interrupt: @@ -423,7 +481,26 @@ IceInternal::ThreadPool::run() if(change.second) // Addition if handler is set. { _handlerMap.insert(change); +#if defined(_WIN32) FD_SET(change.first, &_fdSet); +#elif defined(__linux) + epoll_event event; + event.events = EPOLLIN; + event.data.fd = change.first; + if(epoll_ctl(_epollFd, EPOLL_CTL_ADD, change.first, &event) != 0) + { + Error out(_instance->initializationData().logger); + out << "error while adding filedescriptor to epoll set:\n"; + out << errorToString(getSocketErrno()); + continue; + } + _events.resize(_handlerMap.size() + 1); +#else + struct pollfd pollFd; + pollFd.fd = change.first; + pollFd.events = POLLIN; + _pollFdSet.push_back(pollFd); +#endif _maxFd = max(_maxFd, change.first); _minFd = min(_minFd, change.first); continue; @@ -435,7 +512,29 @@ IceInternal::ThreadPool::run() handler = p->second; finished = true; _handlerMap.erase(p); +#if defined(_WIN32) FD_CLR(change.first, &_fdSet); +#elif defined(__linux) + epoll_event event; + event.events = 0; + if(epoll_ctl(_epollFd, EPOLL_CTL_DEL, change.first, &event) != 0) + { + Error out(_instance->initializationData().logger); + out << "error while adding filedescriptor to epoll set:\n"; + out << errorToString(getSocketErrno()); + continue; + } + _events.resize(_handlerMap.size() + 1); +#else + for(vector<struct pollfd>::iterator p = _pollFdSet.begin(); p != _pollFdSet.end(); ++p) + { + if(p->fd == change.first) + { + _pollFdSet.erase(p); + break; + } + } +#endif _maxFd = _fdIntrRead; _minFd = _fdIntrRead; if(!_handlerMap.empty()) @@ -450,14 +549,12 @@ IceInternal::ThreadPool::run() } else { -// -// Optimization for WIN32 specific version of fd_set. Looping with a -// FD_ISSET test like for Unix is very inefficient for WIN32. -// -#ifdef _WIN32 // // Round robin for the filedescriptors. // + SOCKET largerFd = _maxFd + 1; + SOCKET smallestFd = _maxFd + 1; +#if defined(_WIN32) if(fdSet.fd_count == 0) { Error out(_instance->initializationData().logger); @@ -465,14 +562,23 @@ IceInternal::ThreadPool::run() << " but no filedescriptor is readable"; continue; } - - SOCKET largerFd = _maxFd + 1; - SOCKET smallestFd = _maxFd + 1; for(u_short i = 0; i < fdSet.fd_count; ++i) { SOCKET fd = fdSet.fd_array[i]; +#elif defined(__linux) + for(int i = 0; i < ret; ++i) + { + SOCKET fd = _events[i].data.fd; +#else + for(vector<struct pollfd>::const_iterator p = _pollFdSet.begin(); p != _pollFdSet.end(); ++p) + { + if(p->revents == 0) + { + continue; + } + SOCKET fd = p->fd; +#endif assert(fd != INVALID_SOCKET); - if(fd > _lastFd || _lastFd == INVALID_SOCKET) { largerFd = min(largerFd, fd); @@ -491,37 +597,7 @@ IceInternal::ThreadPool::run() assert(smallestFd >= _minFd && smallestFd <= _maxFd); _lastFd = smallestFd; } -#else - // - // Round robin for the filedescriptors. - // - if(_lastFd < _minFd - 1 || _lastFd == INVALID_SOCKET) - { - _lastFd = _minFd - 1; - } - - int loops = 0; - do - { - if(++_lastFd > _maxFd) - { - ++loops; - _lastFd = _minFd; - } - } - while(!FD_ISSET(_lastFd, &fdSet) && loops <= 1); - - if(loops > 1) - { - Error out(_instance->initializationData().logger); - out << "select() in `" << _prefix << "' returned " << ret - << " but no filedescriptor is readable"; - continue; - } -#endif - assert(_lastFd != _fdIntrRead); - map<SOCKET, EventHandlerPtr>::iterator p = _handlerMap.find(_lastFd); if(p == _handlerMap.end()) { diff --git a/cpp/src/Ice/ThreadPool.h b/cpp/src/Ice/ThreadPool.h index 81bc26b6418..4914b1b0fa5 100644 --- a/cpp/src/Ice/ThreadPool.h +++ b/cpp/src/Ice/ThreadPool.h @@ -22,12 +22,18 @@ #include <Ice/EventHandlerF.h> #include <list> -#ifdef _WIN32
-# include <winsock2.h>
+#ifdef _WIN32 +# include <winsock2.h> #else # define SOCKET int +# ifdef __linux +# include <sys/epoll.h> +# else +# include <sys/poll.h> +# endif #endif + namespace IceInternal { @@ -66,7 +72,14 @@ private: SOCKET _lastFd; SOCKET _fdIntrRead; SOCKET _fdIntrWrite; +#if defined(_WIN32) fd_set _fdSet; +#elif defined(__linux) + int _epollFd; + std::vector<struct epoll_event> _events; +#else + std::vector<struct pollfd> _pollFdSet; +#endif std::list<std::pair<SOCKET, EventHandlerPtr> > _changes; // Event handler set for addition; null for removal. diff --git a/cpp/src/Ice/UdpTransceiver.cpp b/cpp/src/Ice/UdpTransceiver.cpp index f3d6daf331a..aa38f4ffe83 100644 --- a/cpp/src/Ice/UdpTransceiver.cpp +++ b/cpp/src/Ice/UdpTransceiver.cpp @@ -141,9 +141,15 @@ repeat: repeatSelect: assert(_fd != INVALID_SOCKET); +#ifdef _WIN32 FD_SET(_fd, &_wFdSet); int rs = ::select(static_cast<int>(_fd + 1), 0, &_wFdSet, 0, 0); - +#else + struct pollfd fdSet[1]; + fdSet[0].fd = _fd; + fdSet[0].events = POLLOUT; + int rs = ::poll(fdSet, 1, -1); +#endif if(rs == SOCKET_ERROR) { if(interrupted()) @@ -257,8 +263,15 @@ repeat: repeatSelect: assert(_fd != INVALID_SOCKET); +#ifdef _WIN32 FD_SET(_fd, &_rFdSet); int rs = ::select(static_cast<int>(_fd + 1), &_rFdSet, 0, 0, 0); +#else + struct pollfd fdSet[1]; + fdSet[0].fd = _fd; + fdSet[0].events = POLLIN; + int rs = ::poll(fdSet, 1, -1); +#endif if(rs == SOCKET_ERROR) { @@ -368,8 +381,10 @@ IceInternal::UdpTransceiver::UdpTransceiver(const InstancePtr& instance, const s throw; } +#ifdef _WIN32 FD_ZERO(&_rFdSet); FD_ZERO(&_wFdSet); +#endif } IceInternal::UdpTransceiver::UdpTransceiver(const InstancePtr& instance, const string& host, int port, bool connect) : @@ -406,8 +421,10 @@ IceInternal::UdpTransceiver::UdpTransceiver(const InstancePtr& instance, const s throw; } +#ifdef _WIN32 FD_ZERO(&_rFdSet); FD_ZERO(&_wFdSet); +#endif } IceInternal::UdpTransceiver::~UdpTransceiver() diff --git a/cpp/src/Ice/UdpTransceiver.h b/cpp/src/Ice/UdpTransceiver.h index 07f5da32a03..f836b72f53c 100644 --- a/cpp/src/Ice/UdpTransceiver.h +++ b/cpp/src/Ice/UdpTransceiver.h @@ -62,8 +62,10 @@ private: SOCKET _fd; struct sockaddr_in _addr; +#ifdef _WIN32 fd_set _rFdSet; fd_set _wFdSet; +#endif bool _connect; int _rcvSize; int _sndSize; diff --git a/cpp/src/IceSSL/Util.cpp b/cpp/src/IceSSL/Util.cpp index 8595a80293e..574d570b03c 100644 --- a/cpp/src/IceSSL/Util.cpp +++ b/cpp/src/IceSSL/Util.cpp @@ -276,6 +276,7 @@ IceSSL::DHParams::get(int keyLength) static bool selectReadWrite(SOCKET fd, bool read, int timeout) { +#ifdef _WIN32 fd_set rFdSet, wFdSet; FD_ZERO(&rFdSet); FD_ZERO(&wFdSet); @@ -287,9 +288,15 @@ selectReadWrite(SOCKET fd, bool read, int timeout) { FD_SET(fd, &wFdSet); } +#else + struct pollfd pollfd[1]; + pollfd[0].fd = fd; + pollfd[0].events = read ? POLLIN : POLLOUT; +#endif repeatSelect: int ret; +#ifdef _WIN32 if(timeout >= 0) { struct timeval tv; @@ -301,6 +308,9 @@ repeatSelect: { ret = ::select(static_cast<int>(fd) + 1, &rFdSet, &wFdSet, 0, 0); } +#else + ret = ::poll(pollfd, 1, timeout); +#endif if(ret == 0) { |