diff options
author | Benoit Foucher <benoit@zeroc.com> | 2007-11-27 11:58:35 +0100 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2007-11-27 11:58:35 +0100 |
commit | 47f800495093fd7679a315e2d730fea22f6135b7 (patch) | |
tree | a7b8d3488f3841367dd03d10cae293f36fd10481 /cpp/src/Ice/ThreadPool.cpp | |
parent | Fixed SystemException to no longer derive from LocalException (diff) | |
download | ice-47f800495093fd7679a315e2d730fea22f6135b7.tar.bz2 ice-47f800495093fd7679a315e2d730fea22f6135b7.tar.xz ice-47f800495093fd7679a315e2d730fea22f6135b7.zip |
- Added support for non-blocking AMI/batch requests, connection
creation.
- Added support for AMI oneway requests.
- Changed collocation optimization to not perform any DNS lookups.
Diffstat (limited to 'cpp/src/Ice/ThreadPool.cpp')
-rw-r--r-- | cpp/src/Ice/ThreadPool.cpp | 501 |
1 files changed, 92 insertions, 409 deletions
diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp index 9d2cd3bff6a..3f192fe2e7e 100644 --- a/cpp/src/Ice/ThreadPool.cpp +++ b/cpp/src/Ice/ThreadPool.cpp @@ -29,8 +29,7 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& p _instance(instance), _destroyed(false), _prefix(prefix), - _lastFd(INVALID_SOCKET), - _timeout(timeout), + _selector(instance, timeout), _size(0), _sizeMax(0), _sizeWarn(0), @@ -41,59 +40,6 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& p _promote(true), _warnUdp(_instance->initializationData().properties->getPropertyAsInt("Ice.Warn.Datagrams") > 0) { - SOCKET fds[2]; - createPipe(fds); - _fdIntrRead = fds[0]; - _fdIntrWrite = fds[1]; - setBlock(_fdIntrRead, false); - _maxFd = _fdIntrRead; - _minFd = _fdIntrRead; - -#if defined(_WIN32) - _fdsInUse = 1; // _fdIntrRead is always in use. - FD_ZERO(&_fdSet); - FD_SET(_fdIntrRead, &_fdSet); -#elif defined(ICE_USE_EPOLL) - _epollFd = epoll_create(1); - if(_epollFd < 0) - { - SocketException ex(__FILE__, __LINE__); - ex.error = getSocketErrno(); - throw ex; - } - _events.resize(1); - struct epoll_event event; - event.events = EPOLLIN; - event.data.fd = _fdIntrRead; - if(epoll_ctl(_epollFd, EPOLL_CTL_ADD, _fdIntrRead, &event) != 0) - { - SocketException ex(__FILE__, __LINE__); - ex.error = getSocketErrno(); - throw ex; - } -#elif defined(__APPLE__) - _kqueueFd = kqueue(); - if(_kqueueFd < 0) - { - SocketException ex(__FILE__, __LINE__); - ex.error = getSocketErrno(); - throw ex; - } - _events.resize(1); - struct kevent event; - EV_SET(&event, _fdIntrRead, EVFILT_READ, EV_ADD, 0, 0, 0); - if(kevent(_kqueueFd, &event, 1, 0, 0, 0) < 0) - { - SocketException ex(__FILE__, __LINE__); - ex.error = getSocketErrno(); - throw ex; - } -#else - _pollFdSet.resize(1); - _pollFdSet[0].fd = _fdIntrRead; - _pollFdSet[0].events = POLLIN; -#endif - // // We use just one thread as the default. This is the fastest // possible setting, still allows one level of nesting, and @@ -172,48 +118,6 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& p IceInternal::ThreadPool::~ThreadPool() { assert(_destroyed); - - try - { - closeSocket(_fdIntrWrite); - } - catch(const LocalException& ex) - { - Error out(_instance->initializationData().logger); - out << "exception in `" << _prefix << "' while calling closeSocket():\n" << ex; - } - - try - { - closeSocket(_fdIntrRead); - } - catch(const LocalException& ex) - { - Error out(_instance->initializationData().logger); - out << "exception in `" << _prefix << "' while calling closeSocket():\n" << ex; - } - -#if defined(ICE_USE_EPOLL) - try - { - closeSocket(_epollFd); - } - catch(const LocalException& ex) - { - Error out(_instance->initializationData().logger); - out << "exception in `" << _prefix << "' while calling closeSocket():\n" << ex; - } -#elif defined(__APPLE__) - try - { - closeSocket(_kqueueFd); - } - catch(const LocalException& ex) - { - Error out(_instance->initializationData().logger); - out << "exception in `" << _prefix << "' while calling closeSocket():\n" << ex; - } -#endif } void @@ -223,8 +127,9 @@ IceInternal::ThreadPool::destroy() assert(!_destroyed); assert(_handlerMap.empty()); assert(_changes.empty()); + assert(_workItems.empty()); _destroyed = true; - setInterrupt(); + _selector.setInterrupt(); } void @@ -234,20 +139,7 @@ IceInternal::ThreadPool::incFdsInUse() // that doesn't have a specific FD limit. #ifdef _WIN32 IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - assert(!_destroyed); - if(_fdsInUse + 1 > FD_SETSIZE) - { - Warning warn(_instance->initializationData().logger); - warn << "maximum number of connections exceeded"; - - // - // No appropriate errno. - // - SocketException ex(__FILE__, __LINE__); - ex.error = 0; - throw ex; - } - ++_fdsInUse; + _selector.incFdsInUse(); #endif } @@ -258,14 +150,7 @@ IceInternal::ThreadPool::decFdsInUse() // that doesn't have a specific FD limit. #ifdef _WIN32 IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - assert(!_destroyed); - if(_fdsInUse <= 1) - { - Trace trace(_instance->initializationData().logger, "ThreadPool"); - trace << _prefix << ": about to assert"; - } - assert(_fdsInUse > 1); // _fdIntrRead is always in use. - --_fdsInUse; + _selector.decFdsInUse(); #endif } @@ -275,7 +160,7 @@ IceInternal::ThreadPool::_register(SOCKET fd, const EventHandlerPtr& handler) IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); assert(!_destroyed); _changes.push_back(make_pair(fd, handler)); - setInterrupt(); + _selector.setInterrupt(); } void @@ -284,7 +169,16 @@ IceInternal::ThreadPool::unregister(SOCKET fd) IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); assert(!_destroyed); _changes.push_back(make_pair(fd, EventHandlerPtr(0))); - setInterrupt(); + _selector.setInterrupt(); +} + +void +IceInternal::ThreadPool::execute(const ThreadPoolWorkItemPtr& workItem) +{ + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + assert(!_destroyed); + _workItems.push_back(workItem); + _selector.setInterrupt(); } void @@ -352,74 +246,6 @@ IceInternal::ThreadPool::prefix() const return _prefix; } -void -IceInternal::ThreadPool::clearInterrupt() -{ - char c; - -repeat: - -#ifdef _WIN32 - if(::recv(_fdIntrRead, &c, 1, 0) == SOCKET_ERROR) - { - if(interrupted()) - { - goto repeat; - } - - SocketException ex(__FILE__, __LINE__); - ex.error = getSocketErrno(); - throw ex; - } -#else - if(::read(_fdIntrRead, &c, 1) == -1) - { - if(interrupted()) - { - goto repeat; - } - - SyscallException ex(__FILE__, __LINE__); - ex.error = getSystemErrno(); - throw ex; - } -#endif -} - -void -IceInternal::ThreadPool::setInterrupt() -{ - char c = 0; - -repeat: - -#ifdef _WIN32 - if(::send(_fdIntrWrite, &c, 1, 0) == SOCKET_ERROR) - { - if(interrupted()) - { - goto repeat; - } - - SocketException ex(__FILE__, __LINE__); - ex.error = getSocketErrno(); - throw ex; - } -#else - if(::write(_fdIntrWrite, &c, 1) == -1) - { - if(interrupted()) - { - goto repeat; - } - - SyscallException ex(__FILE__, __LINE__); - ex.error = getSystemErrno(); - throw ex; - } -#endif -} - bool IceInternal::ThreadPool::run() { @@ -440,54 +266,19 @@ IceInternal::ThreadPool::run() while(true) { int ret; -#if defined(_WIN32) - fd_set fdSet; - memcpy(&fdSet, &_fdSet, sizeof(fd_set)); - if(_timeout > 0) - { - struct timeval tv; - tv.tv_sec = _timeout; - tv.tv_usec = 0; - ret = ::select(static_cast<int>(_maxFd + 1), &fdSet, 0, 0, &tv); - } - else - { - ret = ::select(static_cast<int>(_maxFd + 1), &fdSet, 0, 0, 0); - } -#elif defined(ICE_USE_EPOLL) - ret = epoll_wait(_epollFd, &_events[0], _events.size(), _timeout > 0 ? _timeout * 1000 : -1); -#elif defined(__APPLE__) - if(_timeout > 0) + try { - struct timespec ts; - ts.tv_sec = _timeout; - ts.tv_nsec = 0; - ret = kevent(_kqueueFd, 0, 0, &_events[0], _events.size(), &ts); + ret = _selector.select(); } - else + catch(const Ice::LocalException& ex) { - ret = kevent(_kqueueFd, 0, 0, &_events[0], _events.size(), 0); - } -#else - ret = poll(&_pollFdSet[0], _pollFdSet.size(), _timeout > 0 ? _timeout * 1000 : -1); -#endif - - if(ret == SOCKET_ERROR) - { - if(interrupted()) - { - continue; - } - - SocketException ex(__FILE__, __LINE__); - ex.error = getSocketErrno(); - //throw ex; Error out(_instance->initializationData().logger); out << "exception in `" << _prefix << "':\n" << ex; continue; } - + EventHandlerPtr handler; + ThreadPoolWorkItemPtr workItem; bool finished = false; bool shutdown = false; @@ -496,38 +287,11 @@ IceInternal::ThreadPool::run() if(ret == 0) // We initiate a shutdown if there is a thread pool timeout. { - assert(_timeout > 0); - _timeout = 0; shutdown = true; } else { - bool interrupted = false; -#if defined(_WIN32) - interrupted = FD_ISSET(_fdIntrRead, &fdSet); -#elif defined(ICE_USE_EPOLL) - for(int i = 0; i < ret; ++i) - { - if(_events[i].data.fd == _fdIntrRead) - { - interrupted = true; - break; - } - } -#elif defined(__APPLE__) - for(int i = 0; i < ret; ++i) - { - if(_events[i].ident == static_cast<unsigned int>(_fdIntrRead)) - { - interrupted = true; - break; - } - } -#else - assert(_pollFdSet[0].fd == _fdIntrRead); - interrupted = _pollFdSet[0].revents != 0; -#endif - if(interrupted) + if(_selector.isInterrupted()) { // // There are two possiblities for an interrupt: @@ -536,7 +300,9 @@ IceInternal::ThreadPool::run() // // 2. An event handler was registered or unregistered. // - + // 3. A work item has been schedulded. + // + // // Thread pool destroyed? // @@ -549,169 +315,52 @@ IceInternal::ThreadPool::run() return true; } - clearInterrupt(); + _selector.clearInterrupt(); // // An event handler must have been registered or // unregistered. // - assert(!_changes.empty()); - pair<SOCKET, EventHandlerPtr> change = _changes.front(); - _changes.pop_front(); - - if(change.second) // Addition if handler is set. + if(_changes.empty()) { - _handlerMap.insert(change); -#if defined(_WIN32) - FD_SET(change.first, &_fdSet); -#elif defined(ICE_USE_EPOLL) - epoll_event event; - event.events = EPOLLIN; - event.data.fd = change.first; - if(epoll_ctl(_epollFd, EPOLL_CTL_ADD, change.first, &event) != 0) - { - Error out(_instance->initializationData().logger); - out << "error while adding filedescriptor to epoll set:\n"; - out << errorToString(getSocketErrno()); - continue; - } - _events.resize(_handlerMap.size() + 1); -#elif defined(__APPLE__) - struct kevent event; - EV_SET(&event, change.first, EVFILT_READ, EV_ADD, 0, 0, 0); - if(kevent(_kqueueFd, &event, 1, 0, 0, 0) < 0) - { - Error out(_instance->initializationData().logger); - out << "error while adding filedescriptor to kqueue:\n"; - out << errorToString(getSocketErrno()); - continue; - } - _events.resize(_handlerMap.size() + 1); -#else - struct pollfd pollFd; - pollFd.fd = change.first; - pollFd.events = POLLIN; - _pollFdSet.push_back(pollFd); -#endif - _maxFd = max(_maxFd, change.first); - _minFd = min(_minFd, change.first); - continue; + assert(!_workItems.empty()); + workItem = _workItems.front(); + _workItems.pop_front(); } - else // Removal if handler is not set. + else { - map<SOCKET, EventHandlerPtr>::iterator p = _handlerMap.find(change.first); - assert(p != _handlerMap.end()); - handler = p->second; - finished = true; - _handlerMap.erase(p); -#if defined(_WIN32) - FD_CLR(change.first, &_fdSet); -#elif defined(ICE_USE_EPOLL) - epoll_event event; - event.events = 0; - if(epoll_ctl(_epollFd, EPOLL_CTL_DEL, change.first, &event) != 0) + assert(!_changes.empty()); + pair<SOCKET, EventHandlerPtr> change = _changes.front(); + _changes.pop_front(); + + if(change.second) // Addition if handler is set. { - Error out(_instance->initializationData().logger); - out << "error while adding filedescriptor from epoll set:\n"; - out << errorToString(getSocketErrno()); + _handlerMap.insert(change); + _selector.add(change.first, NeedRead); continue; } - _events.resize(_handlerMap.size() + 1); -#elif defined(__APPLE__) - struct kevent event; - EV_SET(&event, change.first, EVFILT_READ, EV_DELETE, 0, 0, 0); - if(kevent(_kqueueFd, &event, 1, 0, 0, 0) < 0) - { - Error out(_instance->initializationData().logger); - out << "error while removing filedescriptor from kqueue:\n"; - out << errorToString(getSocketErrno()); - continue; - } - _events.resize(_handlerMap.size() + 1); -#else - for(vector<struct pollfd>::iterator p = _pollFdSet.begin(); p != _pollFdSet.end(); ++p) - { - if(p->fd == change.first) - { - _pollFdSet.erase(p); - break; - } - } -#endif - _maxFd = _fdIntrRead; - _minFd = _fdIntrRead; - if(!_handlerMap.empty()) + else // Removal if handler is not set. { - _maxFd = max(_maxFd, (--_handlerMap.end())->first); - _minFd = min(_minFd, _handlerMap.begin()->first); + map<SOCKET, EventHandlerPtr>::iterator p = _handlerMap.find(change.first); + assert(p != _handlerMap.end()); + handler = p->second; + finished = true; + _handlerMap.erase(p); + _selector.remove(change.first, NeedRead); + // Don't continue; we have to call + // finished() on the event handler below, outside + // the thread synchronization. } - // Don't continue; we have to call - // finished() on the event handler below, outside - // the thread synchronization. } } else { - // - // Round robin for the filedescriptors. - // - SOCKET largerFd = _maxFd + 1; - SOCKET smallestFd = _maxFd + 1; -#if defined(_WIN32) - if(fdSet.fd_count == 0) - { - Error out(_instance->initializationData().logger); - out << "select() in `" << _prefix << "' returned " << ret - << " but no filedescriptor is readable"; - continue; - } - for(u_short i = 0; i < fdSet.fd_count; ++i) - { - SOCKET fd = fdSet.fd_array[i]; -#elif defined(ICE_USE_EPOLL) - for(int i = 0; i < ret; ++i) - { - SOCKET fd = _events[i].data.fd; -#elif defined(__APPLE__) - for(int i = 0; i < ret; ++i) - { - SOCKET fd = _events[i].ident; -#else - for(vector<struct pollfd>::const_iterator p = _pollFdSet.begin(); p != _pollFdSet.end(); ++p) - { - if(p->revents == 0) - { - continue; - } - SOCKET fd = p->fd; -#endif - assert(fd != INVALID_SOCKET); - if(fd > _lastFd || _lastFd == INVALID_SOCKET) - { - largerFd = min(largerFd, fd); - } - - smallestFd = min(smallestFd, fd); - } -#ifdef never // To match ICE_USE_EPOLL __APPLE - }}} -#endif - if(largerFd <= _maxFd) - { - assert(largerFd >= _minFd); - _lastFd = largerFd; - } - else - { - assert(smallestFd >= _minFd && smallestFd <= _maxFd); - _lastFd = smallestFd; - } - assert(_lastFd != _fdIntrRead); - map<SOCKET, EventHandlerPtr>::iterator p = _handlerMap.find(_lastFd); + SOCKET fd = _selector.getNextSelected(); + map<SOCKET, EventHandlerPtr>::iterator p = _handlerMap.find(fd); if(p == _handlerMap.end()) { Error out(_instance->initializationData().logger); - out << "filedescriptor " << _lastFd << " not registered with `" << _prefix << "'"; + out << "filedescriptor " << fd << " not registered with `" << _prefix << "'"; continue; } @@ -747,6 +396,29 @@ IceInternal::ThreadPool::run() // promoteFollower(). // } + else if(workItem) + { + try + { + // + // "self" is faster than "this", as the reference + // count is not modified. + // + workItem->execute(self); + } + catch(const LocalException& ex) + { + Error out(_instance->initializationData().logger); + out << "exception in `" << _prefix << "' while calling execute():\n" << ex; + } + + // + // No "continue", because we want execute() to be + // called in its own thread from this pool. Note that + // this means that execute() must call + // promoteFollower(). + // + } else { assert(handler); @@ -790,10 +462,14 @@ IceInternal::ThreadPool::run() { try { - read(handler); + if(!read(handler)) + { + continue; // Can't read without blocking. + } } - catch(const TimeoutException&) // Expected. + catch(const TimeoutException&) { + assert(false); // This shouldn't occur as we only perform non-blocking reads. continue; } catch(const DatagramLimitException&) // Expected. @@ -809,8 +485,7 @@ IceInternal::ThreadPool::run() { if(handler->datagram()) { - if(_instance->initializationData().properties-> - getPropertyAsInt("Ice.Warn.Connections") > 0) + if(_instance->initializationData().properties->getPropertyAsInt("Ice.Warn.Connections") > 0) { Warning out(_instance->initializationData().logger); out << "datagram connection exception:\n" << ex << '\n' << handler->toString(); @@ -939,7 +614,7 @@ IceInternal::ThreadPool::run() } } -void +bool IceInternal::ThreadPool::read(const EventHandlerPtr& handler) { BasicStream& stream = handler->_stream; @@ -952,7 +627,10 @@ IceInternal::ThreadPool::read(const EventHandlerPtr& handler) if(stream.i != stream.b.end()) { - handler->read(stream); + if(!handler->read(stream)) + { + return false; + } assert(stream.i == stream.b.end()); } @@ -1036,10 +714,15 @@ IceInternal::ThreadPool::read(const EventHandlerPtr& handler) } else { - handler->read(stream); + if(!handler->read(stream)) + { + return false; + } assert(stream.i == stream.b.end()); } } + + return true; } IceInternal::ThreadPool::EventHandlerThread::EventHandlerThread(const ThreadPoolPtr& pool) : |