summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/ThreadPool.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/Ice/ThreadPool.cpp')
-rw-r--r--cpp/src/Ice/ThreadPool.cpp501
1 files changed, 92 insertions, 409 deletions
diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp
index 9d2cd3bff6a..3f192fe2e7e 100644
--- a/cpp/src/Ice/ThreadPool.cpp
+++ b/cpp/src/Ice/ThreadPool.cpp
@@ -29,8 +29,7 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& p
_instance(instance),
_destroyed(false),
_prefix(prefix),
- _lastFd(INVALID_SOCKET),
- _timeout(timeout),
+ _selector(instance, timeout),
_size(0),
_sizeMax(0),
_sizeWarn(0),
@@ -41,59 +40,6 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& p
_promote(true),
_warnUdp(_instance->initializationData().properties->getPropertyAsInt("Ice.Warn.Datagrams") > 0)
{
- SOCKET fds[2];
- createPipe(fds);
- _fdIntrRead = fds[0];
- _fdIntrWrite = fds[1];
- setBlock(_fdIntrRead, false);
- _maxFd = _fdIntrRead;
- _minFd = _fdIntrRead;
-
-#if defined(_WIN32)
- _fdsInUse = 1; // _fdIntrRead is always in use.
- FD_ZERO(&_fdSet);
- FD_SET(_fdIntrRead, &_fdSet);
-#elif defined(ICE_USE_EPOLL)
- _epollFd = epoll_create(1);
- if(_epollFd < 0)
- {
- SocketException ex(__FILE__, __LINE__);
- ex.error = getSocketErrno();
- throw ex;
- }
- _events.resize(1);
- struct epoll_event event;
- event.events = EPOLLIN;
- event.data.fd = _fdIntrRead;
- if(epoll_ctl(_epollFd, EPOLL_CTL_ADD, _fdIntrRead, &event) != 0)
- {
- SocketException ex(__FILE__, __LINE__);
- ex.error = getSocketErrno();
- throw ex;
- }
-#elif defined(__APPLE__)
- _kqueueFd = kqueue();
- if(_kqueueFd < 0)
- {
- SocketException ex(__FILE__, __LINE__);
- ex.error = getSocketErrno();
- throw ex;
- }
- _events.resize(1);
- struct kevent event;
- EV_SET(&event, _fdIntrRead, EVFILT_READ, EV_ADD, 0, 0, 0);
- if(kevent(_kqueueFd, &event, 1, 0, 0, 0) < 0)
- {
- SocketException ex(__FILE__, __LINE__);
- ex.error = getSocketErrno();
- throw ex;
- }
-#else
- _pollFdSet.resize(1);
- _pollFdSet[0].fd = _fdIntrRead;
- _pollFdSet[0].events = POLLIN;
-#endif
-
//
// We use just one thread as the default. This is the fastest
// possible setting, still allows one level of nesting, and
@@ -172,48 +118,6 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& p
IceInternal::ThreadPool::~ThreadPool()
{
assert(_destroyed);
-
- try
- {
- closeSocket(_fdIntrWrite);
- }
- catch(const LocalException& ex)
- {
- Error out(_instance->initializationData().logger);
- out << "exception in `" << _prefix << "' while calling closeSocket():\n" << ex;
- }
-
- try
- {
- closeSocket(_fdIntrRead);
- }
- catch(const LocalException& ex)
- {
- Error out(_instance->initializationData().logger);
- out << "exception in `" << _prefix << "' while calling closeSocket():\n" << ex;
- }
-
-#if defined(ICE_USE_EPOLL)
- try
- {
- closeSocket(_epollFd);
- }
- catch(const LocalException& ex)
- {
- Error out(_instance->initializationData().logger);
- out << "exception in `" << _prefix << "' while calling closeSocket():\n" << ex;
- }
-#elif defined(__APPLE__)
- try
- {
- closeSocket(_kqueueFd);
- }
- catch(const LocalException& ex)
- {
- Error out(_instance->initializationData().logger);
- out << "exception in `" << _prefix << "' while calling closeSocket():\n" << ex;
- }
-#endif
}
void
@@ -223,8 +127,9 @@ IceInternal::ThreadPool::destroy()
assert(!_destroyed);
assert(_handlerMap.empty());
assert(_changes.empty());
+ assert(_workItems.empty());
_destroyed = true;
- setInterrupt();
+ _selector.setInterrupt();
}
void
@@ -234,20 +139,7 @@ IceInternal::ThreadPool::incFdsInUse()
// that doesn't have a specific FD limit.
#ifdef _WIN32
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- assert(!_destroyed);
- if(_fdsInUse + 1 > FD_SETSIZE)
- {
- Warning warn(_instance->initializationData().logger);
- warn << "maximum number of connections exceeded";
-
- //
- // No appropriate errno.
- //
- SocketException ex(__FILE__, __LINE__);
- ex.error = 0;
- throw ex;
- }
- ++_fdsInUse;
+ _selector.incFdsInUse();
#endif
}
@@ -258,14 +150,7 @@ IceInternal::ThreadPool::decFdsInUse()
// that doesn't have a specific FD limit.
#ifdef _WIN32
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- assert(!_destroyed);
- if(_fdsInUse <= 1)
- {
- Trace trace(_instance->initializationData().logger, "ThreadPool");
- trace << _prefix << ": about to assert";
- }
- assert(_fdsInUse > 1); // _fdIntrRead is always in use.
- --_fdsInUse;
+ _selector.decFdsInUse();
#endif
}
@@ -275,7 +160,7 @@ IceInternal::ThreadPool::_register(SOCKET fd, const EventHandlerPtr& handler)
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
assert(!_destroyed);
_changes.push_back(make_pair(fd, handler));
- setInterrupt();
+ _selector.setInterrupt();
}
void
@@ -284,7 +169,16 @@ IceInternal::ThreadPool::unregister(SOCKET fd)
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
assert(!_destroyed);
_changes.push_back(make_pair(fd, EventHandlerPtr(0)));
- setInterrupt();
+ _selector.setInterrupt();
+}
+
+void
+IceInternal::ThreadPool::execute(const ThreadPoolWorkItemPtr& workItem)
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ assert(!_destroyed);
+ _workItems.push_back(workItem);
+ _selector.setInterrupt();
}
void
@@ -352,74 +246,6 @@ IceInternal::ThreadPool::prefix() const
return _prefix;
}
-void
-IceInternal::ThreadPool::clearInterrupt()
-{
- char c;
-
-repeat:
-
-#ifdef _WIN32
- if(::recv(_fdIntrRead, &c, 1, 0) == SOCKET_ERROR)
- {
- if(interrupted())
- {
- goto repeat;
- }
-
- SocketException ex(__FILE__, __LINE__);
- ex.error = getSocketErrno();
- throw ex;
- }
-#else
- if(::read(_fdIntrRead, &c, 1) == -1)
- {
- if(interrupted())
- {
- goto repeat;
- }
-
- SyscallException ex(__FILE__, __LINE__);
- ex.error = getSystemErrno();
- throw ex;
- }
-#endif
-}
-
-void
-IceInternal::ThreadPool::setInterrupt()
-{
- char c = 0;
-
-repeat:
-
-#ifdef _WIN32
- if(::send(_fdIntrWrite, &c, 1, 0) == SOCKET_ERROR)
- {
- if(interrupted())
- {
- goto repeat;
- }
-
- SocketException ex(__FILE__, __LINE__);
- ex.error = getSocketErrno();
- throw ex;
- }
-#else
- if(::write(_fdIntrWrite, &c, 1) == -1)
- {
- if(interrupted())
- {
- goto repeat;
- }
-
- SyscallException ex(__FILE__, __LINE__);
- ex.error = getSystemErrno();
- throw ex;
- }
-#endif
-}
-
bool
IceInternal::ThreadPool::run()
{
@@ -440,54 +266,19 @@ IceInternal::ThreadPool::run()
while(true)
{
int ret;
-#if defined(_WIN32)
- fd_set fdSet;
- memcpy(&fdSet, &_fdSet, sizeof(fd_set));
- if(_timeout > 0)
- {
- struct timeval tv;
- tv.tv_sec = _timeout;
- tv.tv_usec = 0;
- ret = ::select(static_cast<int>(_maxFd + 1), &fdSet, 0, 0, &tv);
- }
- else
- {
- ret = ::select(static_cast<int>(_maxFd + 1), &fdSet, 0, 0, 0);
- }
-#elif defined(ICE_USE_EPOLL)
- ret = epoll_wait(_epollFd, &_events[0], _events.size(), _timeout > 0 ? _timeout * 1000 : -1);
-#elif defined(__APPLE__)
- if(_timeout > 0)
+ try
{
- struct timespec ts;
- ts.tv_sec = _timeout;
- ts.tv_nsec = 0;
- ret = kevent(_kqueueFd, 0, 0, &_events[0], _events.size(), &ts);
+ ret = _selector.select();
}
- else
+ catch(const Ice::LocalException& ex)
{
- ret = kevent(_kqueueFd, 0, 0, &_events[0], _events.size(), 0);
- }
-#else
- ret = poll(&_pollFdSet[0], _pollFdSet.size(), _timeout > 0 ? _timeout * 1000 : -1);
-#endif
-
- if(ret == SOCKET_ERROR)
- {
- if(interrupted())
- {
- continue;
- }
-
- SocketException ex(__FILE__, __LINE__);
- ex.error = getSocketErrno();
- //throw ex;
Error out(_instance->initializationData().logger);
out << "exception in `" << _prefix << "':\n" << ex;
continue;
}
-
+
EventHandlerPtr handler;
+ ThreadPoolWorkItemPtr workItem;
bool finished = false;
bool shutdown = false;
@@ -496,38 +287,11 @@ IceInternal::ThreadPool::run()
if(ret == 0) // We initiate a shutdown if there is a thread pool timeout.
{
- assert(_timeout > 0);
- _timeout = 0;
shutdown = true;
}
else
{
- bool interrupted = false;
-#if defined(_WIN32)
- interrupted = FD_ISSET(_fdIntrRead, &fdSet);
-#elif defined(ICE_USE_EPOLL)
- for(int i = 0; i < ret; ++i)
- {
- if(_events[i].data.fd == _fdIntrRead)
- {
- interrupted = true;
- break;
- }
- }
-#elif defined(__APPLE__)
- for(int i = 0; i < ret; ++i)
- {
- if(_events[i].ident == static_cast<unsigned int>(_fdIntrRead))
- {
- interrupted = true;
- break;
- }
- }
-#else
- assert(_pollFdSet[0].fd == _fdIntrRead);
- interrupted = _pollFdSet[0].revents != 0;
-#endif
- if(interrupted)
+ if(_selector.isInterrupted())
{
//
// There are two possiblities for an interrupt:
@@ -536,7 +300,9 @@ IceInternal::ThreadPool::run()
//
// 2. An event handler was registered or unregistered.
//
-
+ // 3. A work item has been schedulded.
+ //
+
//
// Thread pool destroyed?
//
@@ -549,169 +315,52 @@ IceInternal::ThreadPool::run()
return true;
}
- clearInterrupt();
+ _selector.clearInterrupt();
//
// An event handler must have been registered or
// unregistered.
//
- assert(!_changes.empty());
- pair<SOCKET, EventHandlerPtr> change = _changes.front();
- _changes.pop_front();
-
- if(change.second) // Addition if handler is set.
+ if(_changes.empty())
{
- _handlerMap.insert(change);
-#if defined(_WIN32)
- FD_SET(change.first, &_fdSet);
-#elif defined(ICE_USE_EPOLL)
- epoll_event event;
- event.events = EPOLLIN;
- event.data.fd = change.first;
- if(epoll_ctl(_epollFd, EPOLL_CTL_ADD, change.first, &event) != 0)
- {
- Error out(_instance->initializationData().logger);
- out << "error while adding filedescriptor to epoll set:\n";
- out << errorToString(getSocketErrno());
- continue;
- }
- _events.resize(_handlerMap.size() + 1);
-#elif defined(__APPLE__)
- struct kevent event;
- EV_SET(&event, change.first, EVFILT_READ, EV_ADD, 0, 0, 0);
- if(kevent(_kqueueFd, &event, 1, 0, 0, 0) < 0)
- {
- Error out(_instance->initializationData().logger);
- out << "error while adding filedescriptor to kqueue:\n";
- out << errorToString(getSocketErrno());
- continue;
- }
- _events.resize(_handlerMap.size() + 1);
-#else
- struct pollfd pollFd;
- pollFd.fd = change.first;
- pollFd.events = POLLIN;
- _pollFdSet.push_back(pollFd);
-#endif
- _maxFd = max(_maxFd, change.first);
- _minFd = min(_minFd, change.first);
- continue;
+ assert(!_workItems.empty());
+ workItem = _workItems.front();
+ _workItems.pop_front();
}
- else // Removal if handler is not set.
+ else
{
- map<SOCKET, EventHandlerPtr>::iterator p = _handlerMap.find(change.first);
- assert(p != _handlerMap.end());
- handler = p->second;
- finished = true;
- _handlerMap.erase(p);
-#if defined(_WIN32)
- FD_CLR(change.first, &_fdSet);
-#elif defined(ICE_USE_EPOLL)
- epoll_event event;
- event.events = 0;
- if(epoll_ctl(_epollFd, EPOLL_CTL_DEL, change.first, &event) != 0)
+ assert(!_changes.empty());
+ pair<SOCKET, EventHandlerPtr> change = _changes.front();
+ _changes.pop_front();
+
+ if(change.second) // Addition if handler is set.
{
- Error out(_instance->initializationData().logger);
- out << "error while adding filedescriptor from epoll set:\n";
- out << errorToString(getSocketErrno());
+ _handlerMap.insert(change);
+ _selector.add(change.first, NeedRead);
continue;
}
- _events.resize(_handlerMap.size() + 1);
-#elif defined(__APPLE__)
- struct kevent event;
- EV_SET(&event, change.first, EVFILT_READ, EV_DELETE, 0, 0, 0);
- if(kevent(_kqueueFd, &event, 1, 0, 0, 0) < 0)
- {
- Error out(_instance->initializationData().logger);
- out << "error while removing filedescriptor from kqueue:\n";
- out << errorToString(getSocketErrno());
- continue;
- }
- _events.resize(_handlerMap.size() + 1);
-#else
- for(vector<struct pollfd>::iterator p = _pollFdSet.begin(); p != _pollFdSet.end(); ++p)
- {
- if(p->fd == change.first)
- {
- _pollFdSet.erase(p);
- break;
- }
- }
-#endif
- _maxFd = _fdIntrRead;
- _minFd = _fdIntrRead;
- if(!_handlerMap.empty())
+ else // Removal if handler is not set.
{
- _maxFd = max(_maxFd, (--_handlerMap.end())->first);
- _minFd = min(_minFd, _handlerMap.begin()->first);
+ map<SOCKET, EventHandlerPtr>::iterator p = _handlerMap.find(change.first);
+ assert(p != _handlerMap.end());
+ handler = p->second;
+ finished = true;
+ _handlerMap.erase(p);
+ _selector.remove(change.first, NeedRead);
+ // Don't continue; we have to call
+ // finished() on the event handler below, outside
+ // the thread synchronization.
}
- // Don't continue; we have to call
- // finished() on the event handler below, outside
- // the thread synchronization.
}
}
else
{
- //
- // Round robin for the filedescriptors.
- //
- SOCKET largerFd = _maxFd + 1;
- SOCKET smallestFd = _maxFd + 1;
-#if defined(_WIN32)
- if(fdSet.fd_count == 0)
- {
- Error out(_instance->initializationData().logger);
- out << "select() in `" << _prefix << "' returned " << ret
- << " but no filedescriptor is readable";
- continue;
- }
- for(u_short i = 0; i < fdSet.fd_count; ++i)
- {
- SOCKET fd = fdSet.fd_array[i];
-#elif defined(ICE_USE_EPOLL)
- for(int i = 0; i < ret; ++i)
- {
- SOCKET fd = _events[i].data.fd;
-#elif defined(__APPLE__)
- for(int i = 0; i < ret; ++i)
- {
- SOCKET fd = _events[i].ident;
-#else
- for(vector<struct pollfd>::const_iterator p = _pollFdSet.begin(); p != _pollFdSet.end(); ++p)
- {
- if(p->revents == 0)
- {
- continue;
- }
- SOCKET fd = p->fd;
-#endif
- assert(fd != INVALID_SOCKET);
- if(fd > _lastFd || _lastFd == INVALID_SOCKET)
- {
- largerFd = min(largerFd, fd);
- }
-
- smallestFd = min(smallestFd, fd);
- }
-#ifdef never // To match ICE_USE_EPOLL __APPLE
- }}}
-#endif
- if(largerFd <= _maxFd)
- {
- assert(largerFd >= _minFd);
- _lastFd = largerFd;
- }
- else
- {
- assert(smallestFd >= _minFd && smallestFd <= _maxFd);
- _lastFd = smallestFd;
- }
- assert(_lastFd != _fdIntrRead);
- map<SOCKET, EventHandlerPtr>::iterator p = _handlerMap.find(_lastFd);
+ SOCKET fd = _selector.getNextSelected();
+ map<SOCKET, EventHandlerPtr>::iterator p = _handlerMap.find(fd);
if(p == _handlerMap.end())
{
Error out(_instance->initializationData().logger);
- out << "filedescriptor " << _lastFd << " not registered with `" << _prefix << "'";
+ out << "filedescriptor " << fd << " not registered with `" << _prefix << "'";
continue;
}
@@ -747,6 +396,29 @@ IceInternal::ThreadPool::run()
// promoteFollower().
//
}
+ else if(workItem)
+ {
+ try
+ {
+ //
+ // "self" is faster than "this", as the reference
+ // count is not modified.
+ //
+ workItem->execute(self);
+ }
+ catch(const LocalException& ex)
+ {
+ Error out(_instance->initializationData().logger);
+ out << "exception in `" << _prefix << "' while calling execute():\n" << ex;
+ }
+
+ //
+ // No "continue", because we want execute() to be
+ // called in its own thread from this pool. Note that
+ // this means that execute() must call
+ // promoteFollower().
+ //
+ }
else
{
assert(handler);
@@ -790,10 +462,14 @@ IceInternal::ThreadPool::run()
{
try
{
- read(handler);
+ if(!read(handler))
+ {
+ continue; // Can't read without blocking.
+ }
}
- catch(const TimeoutException&) // Expected.
+ catch(const TimeoutException&)
{
+ assert(false); // This shouldn't occur as we only perform non-blocking reads.
continue;
}
catch(const DatagramLimitException&) // Expected.
@@ -809,8 +485,7 @@ IceInternal::ThreadPool::run()
{
if(handler->datagram())
{
- if(_instance->initializationData().properties->
- getPropertyAsInt("Ice.Warn.Connections") > 0)
+ if(_instance->initializationData().properties->getPropertyAsInt("Ice.Warn.Connections") > 0)
{
Warning out(_instance->initializationData().logger);
out << "datagram connection exception:\n" << ex << '\n' << handler->toString();
@@ -939,7 +614,7 @@ IceInternal::ThreadPool::run()
}
}
-void
+bool
IceInternal::ThreadPool::read(const EventHandlerPtr& handler)
{
BasicStream& stream = handler->_stream;
@@ -952,7 +627,10 @@ IceInternal::ThreadPool::read(const EventHandlerPtr& handler)
if(stream.i != stream.b.end())
{
- handler->read(stream);
+ if(!handler->read(stream))
+ {
+ return false;
+ }
assert(stream.i == stream.b.end());
}
@@ -1036,10 +714,15 @@ IceInternal::ThreadPool::read(const EventHandlerPtr& handler)
}
else
{
- handler->read(stream);
+ if(!handler->read(stream))
+ {
+ return false;
+ }
assert(stream.i == stream.b.end());
}
}
+
+ return true;
}
IceInternal::ThreadPool::EventHandlerThread::EventHandlerThread(const ThreadPoolPtr& pool) :