summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/ThreadPool.cpp
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2006-10-31 15:07:59 +0000
committerBenoit Foucher <benoit@zeroc.com>2006-10-31 15:07:59 +0000
commit60e5ad760f21d676fa37ff7026ce521ebbbe3bc9 (patch)
treefc399d968e3a5de6e263248b6f0784ea5743d5a2 /cpp/src/Ice/ThreadPool.cpp
parentFix (diff)
downloadice-60e5ad760f21d676fa37ff7026ce521ebbbe3bc9.tar.bz2
ice-60e5ad760f21d676fa37ff7026ce521ebbbe3bc9.tar.xz
ice-60e5ad760f21d676fa37ff7026ce521ebbbe3bc9.zip
Use poll() instead of select() on Unix.
Diffstat (limited to 'cpp/src/Ice/ThreadPool.cpp')
-rw-r--r--cpp/src/Ice/ThreadPool.cpp164
1 files changed, 120 insertions, 44 deletions
diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp
index abef235ca7d..f58467cd28d 100644
--- a/cpp/src/Ice/ThreadPool.cpp
+++ b/cpp/src/Ice/ThreadPool.cpp
@@ -56,11 +56,35 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& p
_fdIntrRead = fds[0];
_fdIntrWrite = fds[1];
setBlock(_fdIntrRead, false);
+ _maxFd = _fdIntrRead;
+ _minFd = _fdIntrRead;
+#if defined(_WIN32)
FD_ZERO(&_fdSet);
FD_SET(_fdIntrRead, &_fdSet);
- _maxFd = _fdIntrRead;
- _minFd = _fdIntrRead;
+#elif defined(__linux)
+ _epollFd = epoll_create(1);
+ if(_epollFd < 0)
+ {
+ SocketException ex(__FILE__, __LINE__);
+ ex.error = getSocketErrno();
+ throw ex;
+ }
+ _events.resize(1);
+ epoll_event event;
+ event.events = EPOLLIN;
+ event.data.fd = _fdIntrRead;
+ if(epoll_ctl(_epollFd, EPOLL_CTL_ADD, _fdIntrRead, &event) != 0)
+ {
+ SocketException ex(__FILE__, __LINE__);
+ ex.error = getSocketErrno();
+ throw ex;
+ }
+#else
+ _pollFdSet.resize(1);
+ _pollFdSet[0].fd = _fdIntrRead;
+ _pollFdSet[0].events = POLLIN;
+#endif
//
// We use just one thread as the default. This is the fastest
@@ -160,6 +184,18 @@ IceInternal::ThreadPool::~ThreadPool()
Error out(_instance->initializationData().logger);
out << "exception in `" << _prefix << "' while calling closeSocket():\n" << ex;
}
+
+#ifdef __linux
+ try
+ {
+ closeSocket(_epollFd);
+ }
+ catch(const LocalException& ex)
+ {
+ Error out(_instance->initializationData().logger);
+ out << "exception in `" << _prefix << "' while calling closeSocket():\n" << ex;
+ }
+#endif
}
void
@@ -343,9 +379,10 @@ IceInternal::ThreadPool::run()
while(true)
{
+ int ret;
+#if defined(_WIN32)
fd_set fdSet;
memcpy(&fdSet, &_fdSet, sizeof(fd_set));
- int ret;
if(_timeout > 0)
{
struct timeval tv;
@@ -357,7 +394,12 @@ IceInternal::ThreadPool::run()
{
ret = ::select(static_cast<int>(_maxFd + 1), &fdSet, 0, 0, 0);
}
-
+#elif defined(__linux)
+ ret = epoll_wait(_epollFd, &_events[0], _events.size(), _timeout > 0 ? _timeout * 1000 : -1);
+#else
+ ret = poll(&_pollFdSet[0], _pollFdSet.size(), _timeout > 0 ? _timeout * 1000 : -1);
+#endif
+
if(ret == SOCKET_ERROR)
{
if(interrupted())
@@ -388,7 +430,23 @@ IceInternal::ThreadPool::run()
}
else
{
- if(FD_ISSET(_fdIntrRead, &fdSet))
+ bool interrupted = false;
+#if defined(_WIN32)
+ interrupted = FD_ISSET(_fdIntrRead, &fdSet);
+#elif defined(__linux)
+ for(int i = 0; i < ret; ++i)
+ {
+ if(_events[i].data.fd == _fdIntrRead)
+ {
+ interrupted = true;
+ break;
+ }
+ }
+#else
+ assert(_pollFdSet[0].fd == _fdIntrRead);
+ interrupted = _pollFdSet[0].revents != 0;
+#endif
+ if(interrupted)
{
//
// There are two possiblities for an interrupt:
@@ -423,7 +481,26 @@ IceInternal::ThreadPool::run()
if(change.second) // Addition if handler is set.
{
_handlerMap.insert(change);
+#if defined(_WIN32)
FD_SET(change.first, &_fdSet);
+#elif defined(__linux)
+ epoll_event event;
+ event.events = EPOLLIN;
+ event.data.fd = change.first;
+ if(epoll_ctl(_epollFd, EPOLL_CTL_ADD, change.first, &event) != 0)
+ {
+ Error out(_instance->initializationData().logger);
+ out << "error while adding filedescriptor to epoll set:\n";
+ out << errorToString(getSocketErrno());
+ continue;
+ }
+ _events.resize(_handlerMap.size() + 1);
+#else
+ struct pollfd pollFd;
+ pollFd.fd = change.first;
+ pollFd.events = POLLIN;
+ _pollFdSet.push_back(pollFd);
+#endif
_maxFd = max(_maxFd, change.first);
_minFd = min(_minFd, change.first);
continue;
@@ -435,7 +512,29 @@ IceInternal::ThreadPool::run()
handler = p->second;
finished = true;
_handlerMap.erase(p);
+#if defined(_WIN32)
FD_CLR(change.first, &_fdSet);
+#elif defined(__linux)
+ epoll_event event;
+ event.events = 0;
+ if(epoll_ctl(_epollFd, EPOLL_CTL_DEL, change.first, &event) != 0)
+ {
+ Error out(_instance->initializationData().logger);
+ out << "error while adding filedescriptor to epoll set:\n";
+ out << errorToString(getSocketErrno());
+ continue;
+ }
+ _events.resize(_handlerMap.size() + 1);
+#else
+ for(vector<struct pollfd>::iterator p = _pollFdSet.begin(); p != _pollFdSet.end(); ++p)
+ {
+ if(p->fd == change.first)
+ {
+ _pollFdSet.erase(p);
+ break;
+ }
+ }
+#endif
_maxFd = _fdIntrRead;
_minFd = _fdIntrRead;
if(!_handlerMap.empty())
@@ -450,14 +549,12 @@ IceInternal::ThreadPool::run()
}
else
{
-//
-// Optimization for WIN32 specific version of fd_set. Looping with a
-// FD_ISSET test like for Unix is very inefficient for WIN32.
-//
-#ifdef _WIN32
//
// Round robin for the filedescriptors.
//
+ SOCKET largerFd = _maxFd + 1;
+ SOCKET smallestFd = _maxFd + 1;
+#if defined(_WIN32)
if(fdSet.fd_count == 0)
{
Error out(_instance->initializationData().logger);
@@ -465,14 +562,23 @@ IceInternal::ThreadPool::run()
<< " but no filedescriptor is readable";
continue;
}
-
- SOCKET largerFd = _maxFd + 1;
- SOCKET smallestFd = _maxFd + 1;
for(u_short i = 0; i < fdSet.fd_count; ++i)
{
SOCKET fd = fdSet.fd_array[i];
+#elif defined(__linux)
+ for(int i = 0; i < ret; ++i)
+ {
+ SOCKET fd = _events[i].data.fd;
+#else
+ for(vector<struct pollfd>::const_iterator p = _pollFdSet.begin(); p != _pollFdSet.end(); ++p)
+ {
+ if(p->revents == 0)
+ {
+ continue;
+ }
+ SOCKET fd = p->fd;
+#endif
assert(fd != INVALID_SOCKET);
-
if(fd > _lastFd || _lastFd == INVALID_SOCKET)
{
largerFd = min(largerFd, fd);
@@ -491,37 +597,7 @@ IceInternal::ThreadPool::run()
assert(smallestFd >= _minFd && smallestFd <= _maxFd);
_lastFd = smallestFd;
}
-#else
- //
- // Round robin for the filedescriptors.
- //
- if(_lastFd < _minFd - 1 || _lastFd == INVALID_SOCKET)
- {
- _lastFd = _minFd - 1;
- }
-
- int loops = 0;
- do
- {
- if(++_lastFd > _maxFd)
- {
- ++loops;
- _lastFd = _minFd;
- }
- }
- while(!FD_ISSET(_lastFd, &fdSet) && loops <= 1);
-
- if(loops > 1)
- {
- Error out(_instance->initializationData().logger);
- out << "select() in `" << _prefix << "' returned " << ret
- << " but no filedescriptor is readable";
- continue;
- }
-#endif
-
assert(_lastFd != _fdIntrRead);
-
map<SOCKET, EventHandlerPtr>::iterator p = _handlerMap.find(_lastFd);
if(p == _handlerMap.end())
{