summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/Selector.h
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2008-03-06 10:13:42 +0100
committerBenoit Foucher <benoit@zeroc.com>2008-03-06 10:13:42 +0100
commitc6dbd090d9691cc0116a2967b2827b858b184dfe (patch)
tree6d2ad80c98665c9090b16f97c400ab4b33c7ab73 /cpp/src/Ice/Selector.h
parentMerge branch 'master' of ssh://cvs.zeroc.com/home/git/ice (diff)
downloadice-c6dbd090d9691cc0116a2967b2827b858b184dfe.tar.bz2
ice-c6dbd090d9691cc0116a2967b2827b858b184dfe.tar.xz
ice-c6dbd090d9691cc0116a2967b2827b858b184dfe.zip
Removed thread-per-connection and added serialize mode
Diffstat (limited to 'cpp/src/Ice/Selector.h')
-rw-r--r--cpp/src/Ice/Selector.h675
1 files changed, 616 insertions, 59 deletions
diff --git a/cpp/src/Ice/Selector.h b/cpp/src/Ice/Selector.h
index 2f9bfee6141..a189728b26e 100644
--- a/cpp/src/Ice/Selector.h
+++ b/cpp/src/Ice/Selector.h
@@ -10,97 +10,660 @@
#ifndef ICE_SELECTOR_H
#define ICE_SELECTOR_H
+#include <IceUtil/StringUtil.h>
+
#include <Ice/Config.h>
+#include <Ice/Network.h>
+#include <Ice/SelectorF.h>
#include <Ice/InstanceF.h>
+#include <Ice/LoggerUtil.h>
+#include <Ice/LocalException.h>
+#include <Ice/Instance.h>
-#if defined(_WIN32)
-# include <winsock2.h>
-#else
-# define SOCKET int
-# if defined(ICE_USE_EPOLL)
-# include <sys/epoll.h>
-# elif defined(__APPLE__)
-# include <sys/event.h>
-# else
-# include <sys/poll.h>
-# endif
+#if defined(__linux) && !defined(ICE_NO_EPOLL)
+# define ICE_USE_EPOLL 1
+#elif defined(__APPLE__) && !defined(ICE_NO_KQUEUE)
+# define ICE_USE_KQUEUE 1
+#elif defined(_WIN32)
+# define ICE_USE_SELECT 1
#endif
+#if defined(ICE_USE_EPOLL)
+# include <sys/epoll.h>
+#elif defined(ICE_USE_KQUEUE)
+# include <sys/event.h>
+#elif !defined(ICE_USE_SELECT)
+# include <sys/poll.h>
+#endif
namespace IceInternal
{
-enum SocketStatus
-{
- Finished,
- NeedConnect,
- NeedRead,
- NeedWrite
-};
-
-class Selector
+template<class T> class Selector
{
public:
- Selector(const InstancePtr&, int = 0);
- ~Selector();
+ Selector(const InstancePtr& instance, int timeout = 0) :
+ _instance(instance),
+ _timeout(timeout),
+ _interruptCount(0)
+ {
+ SOCKET fds[2];
+ createPipe(fds);
+ _fdIntrRead = fds[0];
+ _fdIntrWrite = fds[1];
+ _lastFd = _fdIntrRead;
- void add(SOCKET, SocketStatus);
- void remove(SOCKET, SocketStatus);
+#if defined(ICE_USE_EPOLL) || defined(ICE_USE_KQUEUE)
+ _count = 0;
+ _events.resize(32);
+#if defined(ICE_USE_EPOLL)
+ _queueFd = epoll_create(1);
+ if(_queueFd < 0)
+ {
+ Ice::SocketException ex(__FILE__, __LINE__);
+ ex.error = IceInternal::getSocketErrno();
+ throw ex;
+ }
+ epoll_event event;
+ event.events = EPOLLIN;
+ event.data.ptr = 0;
+ if(epoll_ctl(_queueFd, EPOLL_CTL_ADD, _fdIntrRead, &event) != 0)
+ {
+ Ice::SocketException ex(__FILE__, __LINE__);
+ ex.error = IceInternal::getSocketErrno();
+ throw ex;
+ }
+#else
+ _queueFd = kqueue();
+ if(_queueFd < 0)
+ {
+ Ice::SocketException ex(__FILE__, __LINE__);
+ ex.error = getSocketErrno();
+ throw ex;
+ }
+ struct kevent event;
+ EV_SET(&event, _fdIntrRead, EVFILT_READ, EV_ADD, 0, 0, 0);
+ if(kevent(_queueFd, &event, 1, 0, 0, 0) < 0)
+ {
+ Ice::SocketException ex(__FILE__, __LINE__);
+ ex.error = IceInternal::getSocketErrno();
+ throw ex;
+ }
+#endif
+#elif defined(ICE_USE_SELECT)
+ _fdsInUse = 1;
+ FD_ZERO(&_readFdSet);
+ FD_ZERO(&_writeFdSet);
+ FD_ZERO(&_errorFdSet);
+ FD_SET(_fdIntrRead, &_readFdSet);
+#else
+ struct pollfd pollFd;
+ pollFd.fd = _fdIntrRead;
+ pollFd.events = POLLIN;
+ _pollFdSet.push_back(pollFd);
+#endif
+ }
- int select();
- SOCKET getNextSelected();
-
- bool isInterrupted()
+ ~Selector()
+ {
+#if defined(ICE_USE_EPOLL) || defined(ICE_USE_KQUEUE)
+ try
+ {
+ closeSocket(_queueFd);
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ Ice::Error out(_instance->initializationData().logger);
+ out << "exception in selector while calling closeSocket():\n" << ex;
+ }
+#endif
+
+ try
+ {
+ closeSocket(_fdIntrWrite);
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ Ice::Error out(_instance->initializationData().logger);
+ out << "exception in selector while calling closeSocket():\n" << ex;
+ }
+
+ try
+ {
+ closeSocket(_fdIntrRead);
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ Ice::Error out(_instance->initializationData().logger);
+ out << "exception in selector while calling closeSocket():\n" << ex;
+ }
+ }
+
+ void add(T* handler, SocketStatus status, bool noInterrupt = false)
+ {
+ assert(status != Finished);
+#if defined(ICE_USE_EPOLL) || defined(ICE_USE_KQUEUE)
+#if defined(ICE_USE_EPOLL)
+ epoll_event event;
+ event.events = status == NeedRead ? EPOLLIN : EPOLLOUT;
+ event.data.ptr = handler;
+ if(epoll_ctl(_queueFd, EPOLL_CTL_ADD, handler->_fd, &event) != 0)
+#else // ICE_USE_KQUEUE
+ struct kevent event;
+ if(status == NeedRead)
+ {
+ EV_SET(&event, handler->_fd, EVFILT_READ, EV_ADD, 0, 0, handler);
+ }
+ else
+ {
+ EV_SET(&event, handler->_fd, EVFILT_WRITE, EV_ADD, 0, 0, handler);
+ }
+ if(kevent(_queueFd, &event, 1, 0, 0, 0) < 0)
+#endif
+ {
+ Ice::Error out(_instance->initializationData().logger);
+ out << "error while adding filedescriptor to selector:\n";
+ out << IceUtilInternal::errorToString(IceInternal::getSocketErrno());
+ }
+ ++_count;
+#else // ICE_USE_SELECT or ICE_USE_POLL
+ if(noInterrupt)
+ {
+ processInterrupt();
+ addImpl(handler, status);
+ }
+ else
+ {
+ _changes.push_back(ChangeInfo(handler, status, false));
+ setInterrupt();
+ }
+#endif
+ }
+
+ void update(T* handler, SocketStatus oldStatus, SocketStatus newStatus)
+ {
+ // Note: can only be called from the select() thread (remove/add don't use interrupts)
+ assert(newStatus != Finished);
+#if defined(ICE_USE_EPOLL)
+ epoll_event event;
+ event.events = newStatus == NeedRead ? EPOLLIN : EPOLLOUT;
+ event.data.ptr = handler;
+ if(epoll_ctl(_queueFd, EPOLL_CTL_MOD, handler->_fd, &event) != 0)
+ {
+ Ice::Error out(_instance->initializationData().logger);
+ out << "error while updating filedescriptor from selector:\n";
+ out << IceUtilInternal::errorToString(IceInternal::getSocketErrno());
+ }
+#else
+ remove(handler, oldStatus, true);
+ add(handler, newStatus, true);
+#endif
+ }
+
+ void remove(T* handler, SocketStatus status, bool noInterrupt = false)
+ {
+#if defined(ICE_USE_EPOLL) || defined(ICE_USE_KQUEUE)
+#if defined(ICE_USE_EPOLL)
+ epoll_event event;
+ event.events = 0;
+ int rs = epoll_ctl(_queueFd, EPOLL_CTL_DEL, handler->_fd, &event);
+#else // ICE_USE_KQUEUE
+ struct kevent event;
+ if(status == NeedRead)
+ {
+ EV_SET(&event, handler->_fd, EVFILT_READ, EV_DELETE, 0, 0, 0);
+ }
+ else
+ {
+ EV_SET(&event, handler->_fd, EVFILT_WRITE, EV_DELETE, 0, 0, 0);
+ }
+ int rs = kevent(_queueFd, &event, 1, 0, 0, 0);
+#endif
+ if(rs < 0)
+ {
+ //
+ // It's possible for the socket to already be closed at this point.
+ //
+ //Ice::Error out(_instance->initializationData().logger);
+ //out << "error while removing filedescriptor from epoll set:\n";
+ //out << IceUtilInternal::errorToString(IceInternal::getSocketErrno());
+ }
+ --_count;
+#else
+ if(noInterrupt)
+ {
+ processInterrupt();
+ removeImpl(handler, status);
+ }
+ else
+ {
+ _changes.push_back(ChangeInfo(handler, status, true));
+ setInterrupt();
+ }
+#endif
+ }
+
+ int select()
+ {
+ while(true)
+ {
+ int ret;
+ _nSelectedReturned = 0;
+ _nSelected = 0;
+#if defined(ICE_USE_EPOLL)
+ ret = epoll_wait(_queueFd, &_events[0], _events.size(), _timeout > 0 ? _timeout * 1000 : -1);
+#elif defined(ICE_USE_KQUEUE)
+ assert(!_events.empty());
+ if(_timeout > 0)
+ {
+ struct timespec ts;
+ ts.tv_sec = _timeout;
+ ts.tv_nsec = 0;
+ ret = kevent(_queueFd, 0, 0, &_events[0], _events.size(), &ts);
+ }
+ else
+ {
+ ret = kevent(_queueFd, 0, 0, &_events[0], _events.size(), 0);
+ }
+#elif defined(ICE_USE_SELECT)
+ fd_set* rFdSet = fdSetCopy(_selectedReadFdSet, _readFdSet);
+ fd_set* wFdSet = fdSetCopy(_selectedWriteFdSet, _writeFdSet);
+ fd_set* eFdSet = fdSetCopy(_selectedErrorFdSet, _errorFdSet);
+ if(_timeout > 0)
+ {
+ struct timeval tv;
+ tv.tv_sec = _timeout;
+ tv.tv_usec = 0;
+ ret = ::select(0, rFdSet, wFdSet, eFdSet, &tv); // The first parameter is ignored on Windows
+ }
+ else
+ {
+ ret = ::select(0, rFdSet, wFdSet, eFdSet, 0); // The first parameter is ignored on Windows
+ }
+#else
+ ret = poll(&_pollFdSet[0], _pollFdSet.size(), _timeout > 0 ? _timeout * 1000 : -1);
+#endif
+ if(ret == SOCKET_ERROR)
+ {
+ if(interrupted())
+ {
+ continue;
+ }
+
+ assert(false);
+ Ice::SocketException ex(__FILE__, __LINE__);
+ ex.error = IceInternal::getSocketErrno();
+ throw ex;
+ }
+
+ assert(ret >= 0);
+ _nSelected = static_cast<unsigned int>(ret);
+ if(_nSelected == 0)
+ {
+ assert(_timeout > 0);
+ _timeout = 0;
+ }
+ return _nSelected;
+ }
+ }
+
+ T* getNextSelected()
{
assert(_nSelected > 0);
-#if defined(_WIN32)
- return FD_ISSET(_fdIntrRead, &_selectedReadFdSet);
-#elif defined(ICE_USE_EPOLL)
- for(unsigned int i = 0; i < _nSelected; ++i)
+
+#if defined(ICE_USE_EPOLL) || defined(ICE_USE_KQUEUE)
+ if(_nSelectedReturned == _nSelected)
{
- if(_events[i].data.fd == _fdIntrRead)
+ if(_count != _events.size())
{
- return true;
+ _events.resize(_count);
}
+ return 0;
}
- return false;
-#elif defined(__APPLE__)
+
+ //
+ // Round robin for the filedescriptors.
+ //
+ T* larger = 0;
+ T* smallest = 0;
for(unsigned int i = 0; i < _nSelected; ++i)
{
- if(_events[i].ident == static_cast<unsigned int>(_fdIntrRead))
+#if defined(ICE_USE_EPOLL)
+ T* handler = reinterpret_cast<T*>(_events[i].data.ptr);
+#else
+ T* handler = reinterpret_cast<T*>(_events[i].udata);
+#endif
+ if(!handler) // _fdIntrRead
+ {
+ assert(_nSelectedReturned > 0 && _interruptCount == 0);
+ continue;
+ }
+
+ if(handler->_fd > _lastFd && (larger == 0 || handler->_fd < larger->_fd))
+ {
+ larger = handler;
+ }
+
+ if(smallest == 0 || handler->_fd < smallest->_fd)
+ {
+ smallest = handler;
+ }
+ }
+
+ ++_nSelectedReturned;
+ if(larger)
+ {
+ _lastFd = larger->_fd;
+ return larger;
+ }
+ else
+ {
+ assert(smallest);
+ _lastFd = smallest->_fd;
+ return smallest;
+ }
+#else
+ if(_nSelectedReturned == _nSelected)
+ {
+ return 0;
+ }
+
+ //
+ // Round robin for the filedescriptors.
+ //
+ SOCKET largerFd = INVALID_SOCKET;
+ SOCKET smallestFd = INVALID_SOCKET;
+#if defined(ICE_USE_SELECT)
+ if(_selectedReadFdSet.fd_count == 0 && _selectedWriteFdSet.fd_count == 0 && _selectedErrorFdSet.fd_count == 0)
+ {
+ Error out(_instance->initializationData().logger);
+ out << "select() in selector returned " << _nSelected << " but no filedescriptor is ready";
+ return 0;
+ }
+
+ const fd_set* fdSet;
+ if(_nSelectedReturned < _selectedReadFdSet.fd_count)
+ {
+ fdSet = &_selectedReadFdSet;
+ }
+ else if(_nSelectedReturned < _selectedWriteFdSet.fd_count + _selectedReadFdSet.fd_count)
+ {
+ fdSet = &_selectedWriteFdSet;
+ }
+ else
+ {
+ fdSet = &_selectedErrorFdSet;
+ }
+
+ for(u_short i = 0; i < fdSet->fd_count; ++i)
+ {
+ SOCKET fd = fdSet->fd_array[i];
+#else
+ for(std::vector<struct pollfd>::const_iterator p = _pollFdSet.begin(); p != _pollFdSet.end(); ++p)
+ {
+ if(p->revents == 0)
+ {
+ continue;
+ }
+ SOCKET fd = p->fd;
+#endif
+ if(fd == _fdIntrRead)
+ {
+ assert(_nSelectedReturned > 0 && _interruptCount == 0);
+ continue;
+ }
+
+ assert(fd != INVALID_SOCKET);
+ if(fd > _lastFd && (largerFd == INVALID_SOCKET || largerFd > fd))
+ {
+ largerFd = fd;
+ }
+
+ if(smallestFd == INVALID_SOCKET || fd < smallestFd)
+ {
+ smallestFd = fd;
+ }
+ }
+
+ if(largerFd != INVALID_SOCKET)
+ {
+ _lastFd = largerFd;
+ }
+ else
+ {
+ assert(smallestFd != INVALID_SOCKET);
+ _lastFd = smallestFd;
+ }
+
+ typename std::map<SOCKET, T*>::const_iterator q = _handlerMap.find(_lastFd);
+ if(q == _handlerMap.end())
+ {
+ Ice::Error out(_instance->initializationData().logger);
+ out << "filedescriptor " << _lastFd << " not registered with selector";
+ return 0;
+ }
+ ++_nSelectedReturned;
+ return q->second;
+#endif
+ }
+
+ bool processInterrupt()
+ {
+#if !defined(ICE_USE_EPOLL) && !defined(ICE_USE_KQUEUE)
+ assert(_changes.size() <= _interruptCount);
+ while(!_changes.empty())
+ {
+ clearInterrupt();
+ ChangeInfo& change = _changes.front();
+ if(change.remove)
+ {
+ removeImpl(change.handler, change.status);
+ }
+ else
+ {
+ addImpl(change.handler, change.status);
+ }
+ _changes.pop_front();
+ }
+#endif
+ return _interruptCount == 0; // No more interrupts to process.
+ }
+
+ bool isInterrupted()
+ {
+ return _interruptCount > 0;
+ }
+
+ void setInterrupt()
+ {
+ if(++_interruptCount == 1)
+ {
+ char c = 0;
+ while(true)
{
- return true;
+#ifdef _WIN32
+ if(::send(_fdIntrWrite, &c, 1, 0) == SOCKET_ERROR)
+#else
+ if(::write(_fdIntrWrite, &c, 1) == SOCKET_ERROR)
+#endif
+ {
+ if(interrupted())
+ {
+ continue;
+ }
+
+ Ice::SocketException ex(__FILE__, __LINE__);
+ ex.error = IceInternal::getSocketErrno();
+ throw ex;
+ }
+ break;
}
}
- return false;
+ }
+
+ bool clearInterrupt()
+ {
+ assert(_interruptCount > 0);
+ if(--_interruptCount == 0)
+ {
+ char c;
+
+ while(true)
+ {
+ ssize_t ret;
+#ifdef _WIN32
+ ret = ::recv(_fdIntrRead, &c, 1, 0);
#else
- assert(_pollFdSet[0].fd == _fdIntrRead);
- return _pollFdSet[0].revents != 0;
+ ret = ::read(_fdIntrRead, &c, 1);
#endif
- }
- void setInterrupt();
- void clearInterrupt();
+ if(ret == SOCKET_ERROR)
+ {
+ if(interrupted())
+ {
+ continue;
+ }
- void updateMinMax(SOCKET minFd, SOCKET maxFd);
- void clearMinMax();
+ Ice::SocketException ex(__FILE__, __LINE__);
+ ex.error = IceInternal::getSocketErrno();
+ throw ex;
+ }
+ break;
+ }
+ ++_nSelectedReturned;
+ return false;
+ }
+ return true;
+ }
-#if defined(_WIN32)
- void incFdsInUse();
- void decFdsInUse();
+#if defined(ICE_USE_SELECT)
+ void incFdsInUse()
+ {
+ if(_fdsInUse + 1 > FD_SETSIZE)
+ {
+ Ice::Warning warn(_instance->initializationData().logger);
+ warn << "maximum number of connections exceeded";
+
+ //
+ // No appropriate errno.
+ //
+ Ice::SocketException ex(__FILE__, __LINE__);
+ ex.error = 0;
+ throw ex;
+ }
+ ++_fdsInUse;
+ }
+
+ void decFdsInUse()
+ {
+ // This is windows specific since every other platform uses an API
+ // that doesn't have a specific FD limit.
+ if(_fdsInUse <= 1)
+ {
+ Trace trace(_instance->initializationData().logger, "ThreadPool");
+ trace << "selector: about to assert";
+ }
+ --_fdsInUse;
+ }
#endif
private:
+#if !defined(ICE_USE_EPOLL) && !defined(ICE_USE_KQUEUE)
+ void addImpl(T* handler, SocketStatus status)
+ {
+ assert(_handlerMap.find(handler->_fd) == _handlerMap.end());
+ _handlerMap.insert(make_pair(handler->_fd, handler));
+#if defined(ICE_USE_SELECT)
+ switch(status)
+ {
+ case NeedRead:
+ FD_SET(handler->_fd, &_readFdSet);
+ break;
+ case NeedWrite:
+ FD_SET(handler->_fd, &_writeFdSet);
+ break;
+ case NeedConnect:
+ FD_SET(handler->_fd, &_writeFdSet);
+ FD_SET(handler->_fd, &_errorFdSet);
+ break;
+ case Finished:
+ assert(false);
+ }
+#else
+ struct pollfd pollFd;
+ pollFd.fd = fd;
+ pollFd.events = status == NeedRead ? POLLIN : POLLOUT;
+ _pollFdSet.push_back(pollFd);
+#endif
+ }
+
+ void removeImpl(T* handler, SocketStatus status)
+ {
+ typename std::map<SOCKET, T*>::iterator p = _handlerMap.find(handler->_fd);
+ assert(p != _handlerMap.end());
+ _handlerMap.erase(p);
+#if defined(ICE_USE_SELECT)
+ switch(status)
+ {
+ case NeedRead:
+ FD_CLR(handler->_fd, &_readFdSet);
+ break;
+ case NeedWrite:
+ FD_CLR(handler->_fd, &_writeFdSet);
+ break;
+ case NeedConnect:
+ FD_CLR(handler->_fd, &_writeFdSet);
+ FD_CLR(handler->_fd, &_errorFdSet);
+ break;
+ case Finished:
+ assert(false);
+ }
+#else
+ for(std::vector<struct pollfd>::iterator p = _pollFdSet.begin(); p != _pollFdSet.end(); ++p)
+ {
+ if(p->fd == handler->_fd)
+ {
+ _pollFdSet.erase(p);
+ break;
+ }
+ }
+#endif
+ }
+#endif
+
InstancePtr _instance;
int _timeout;
- SOCKET _maxFd;
SOCKET _lastFd;
unsigned int _nSelected;
unsigned int _nSelectedReturned;
SOCKET _fdIntrRead;
SOCKET _fdIntrWrite;
-#if defined(_WIN32)
+ unsigned int _interruptCount;
+
+#if defined(ICE_USE_EPOLL) || defined(ICE_USE_KQUEUE)
+#if defined(ICE_USE_EPOLL)
+ std::vector<struct epoll_event> _events;
+#else
+ std::vector<struct kevent> _events;
+#endif
+ int _queueFd;
+ unsigned int _count;
+#else
+
+ struct ChangeInfo
+ {
+ ChangeInfo(T* h, SocketStatus s, bool r) : handler(h), status(s), remove(r)
+ {
+ }
+ T* handler;
+ SocketStatus status;
+ bool remove;
+ };
+ typename std::list<ChangeInfo> _changes; // handler set for addition; null for removal.
+ typename std::map<SOCKET, T*> _handlerMap;
+
+#if defined(ICE_USE_SELECT)
fd_set _readFdSet;
fd_set _writeFdSet;
fd_set _errorFdSet;
@@ -108,7 +671,7 @@ private:
fd_set _selectedWriteFdSet;
fd_set _selectedErrorFdSet;
int _fdsInUse;
-
+
fd_set*
fdSetCopy(fd_set& dest, fd_set& src)
{
@@ -120,16 +683,10 @@ private:
}
return 0;
}
-
-#elif defined(ICE_USE_EPOLL)
- int _epollFd;
- std::vector<struct epoll_event> _events;
-#elif defined(__APPLE__)
- int _kqueueFd;
- std::vector<struct kevent> _events;
#else
std::vector<struct pollfd> _pollFdSet;
#endif
+#endif
};
}