From 45499a8e8fb73c414df663a125a96901a4d00e2e Mon Sep 17 00:00:00 2001 From: Dan Goodliffe Date: Tue, 17 Jul 2018 20:37:29 +0100 Subject: Replace select with epoll Remove the clunky and fiddly select(2) implementation with a much slicker and good god faster epoll(7) based one. --- .gitmodules | 3 - icespider/embedded/Jamfile.jam | 6 +- icespider/embedded/clientSocket.cpp | 62 ++++++++++------- icespider/embedded/clientSocket.h | 7 +- icespider/embedded/concurrentqueue | 1 - icespider/embedded/embedded.cpp | 126 ++++++++++++++++------------------- icespider/embedded/embedded.h | 18 ++--- icespider/embedded/listenSocket.cpp | 13 +++- icespider/embedded/listenSocket.h | 3 +- icespider/embedded/socketEvents.h | 19 ------ icespider/embedded/socketHandler.cpp | 18 +---- icespider/embedded/socketHandler.h | 13 +--- icespider/unittests/testEmbedded.cpp | 1 + 13 files changed, 124 insertions(+), 166 deletions(-) delete mode 160000 icespider/embedded/concurrentqueue delete mode 100644 icespider/embedded/socketEvents.h diff --git a/.gitmodules b/.gitmodules index 4322557..e0e648c 100644 --- a/.gitmodules +++ b/.gitmodules @@ -2,6 +2,3 @@ path = ice url = https://github.com/zeroc-ice/ice branch = 3.7 -[submodule "icespider/fcgi/concurrentqueue"] - path = icespider/embedded/concurrentqueue - url = https://github.com/cameron314/concurrentqueue/ diff --git a/icespider/embedded/Jamfile.jam b/icespider/embedded/Jamfile.jam index 1c52a5f..a658bd7 100644 --- a/icespider/embedded/Jamfile.jam +++ b/icespider/embedded/Jamfile.jam @@ -1,19 +1,15 @@ lib slicer ; -lib fcgi : : fcgi ; -lib fcgi++ : : fcgi++ ; lib adhocutil : : : : /usr/include/adhocutil/ ; lib icespider-embedded : - [ glob-tree *.cpp : bin concurrentqueue ] + [ glob-tree *.cpp : bin ] : slicer - concurrentqueue ..//core//icespider-core ../core//icespider-core : : ..//core//icespider-core ../core//icespider-core . - concurrentqueue ; diff --git a/icespider/embedded/clientSocket.cpp b/icespider/embedded/clientSocket.cpp index 73aa3cf..54aa7cb 100644 --- a/icespider/embedded/clientSocket.cpp +++ b/icespider/embedded/clientSocket.cpp @@ -1,55 +1,71 @@ #include "clientSocket.h" +#include #include +#include #include #include +#include namespace IceSpider::Embedded { static socklen_t clientlen = sizeof(struct sockaddr_in); ClientSocket::ClientSocket(int pfd) : - SocketHandler(accept(pfd, (struct sockaddr *) &clientaddr, &clientlen)), + SocketHandler(accept4(pfd, (struct sockaddr *) &clientaddr, &clientlen, SOCK_NONBLOCK)), buf(BUFSIZ), rec(0), state(State::reading_headers) { } - FdSocketEventResultFuture ClientSocket::read(Listener * listener) + int ClientSocket::read(Listener *) { - Work w([this]() { - if (buf.size() == rec) { - buf.resize(rec * 2); - } - auto r = ::read(fd, &buf.at(rec), buf.size() - rec); - if (r < 1) { - return FDSetChange::Remove; - } switch (state) { case State::reading_headers: - read_headers(r); + return read_headers(); break; case State::streaming_input: - stream_input(r); + return stream_input(); break; } - return FDSetChange::NoChange; - }); - return returnQueued(listener, fd, std::move(w)); + return -1; } - void ClientSocket::read_headers(int r) + int ClientSocket::read_headers() { - buf[rec + r] = 0; - if (strstr(&buf.at(rec > 3 ? rec - 2 : 0), "\r\n")) { - auto w = ::write(fd, "HTTP/1.1 204 No content\r\n\r\n", 27); - (void)w; - rec = 0; + auto readBytes = [this]() { + if (buf.size() == rec) { + buf.resize(rec * 2); + } + return ::read(fd, &buf.at(rec), buf.size() - rec); + }; + int r; + while ((r = readBytes()) > 0) { + buf[rec + r] = 0; + auto end = strstr(&buf.at(rec > 3 ? rec - 2 : 0), "\r\n"); + if (end) { + auto w = ::write(fd, "HTTP/1.1 204 No content\r\n\r\n", 27); + (void)w; + rec = 0; + buf.erase(buf.begin(), buf.begin() + (end - &buf.front())); + } + else { + rec += r; + } + } + if (r < 1 && (errno == EAGAIN || errno == EWOULDBLOCK)) { + return EPOLLET | EPOLLRDHUP | EPOLLONESHOT | EPOLLIN | EPOLLERR | EPOLLHUP; } - rec += r; + return -1; + } + + int ClientSocket::stream_input() + { + return 0; } - void ClientSocket::stream_input(int) + int ClientSocket::write(Listener *) { + return 0; } } diff --git a/icespider/embedded/clientSocket.h b/icespider/embedded/clientSocket.h index 0d6d526..72449a0 100644 --- a/icespider/embedded/clientSocket.h +++ b/icespider/embedded/clientSocket.h @@ -10,11 +10,12 @@ namespace IceSpider::Embedded { public: ClientSocket(int fd); - FdSocketEventResultFuture read(Listener * listener) override; + int read(Listener * listener) override; + int write(Listener * listener) override; private: - inline void read_headers(int bytes); - inline void stream_input(int bytes); + inline int read_headers(); + inline int stream_input(); enum class State { reading_headers, diff --git a/icespider/embedded/concurrentqueue b/icespider/embedded/concurrentqueue deleted file mode 160000 index 8f7e861..0000000 --- a/icespider/embedded/concurrentqueue +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 8f7e861dd9411a0bf77a6b9de83a47b3424fafba diff --git a/icespider/embedded/embedded.cpp b/icespider/embedded/embedded.cpp index 2b1cfcd..5246842 100644 --- a/icespider/embedded/embedded.cpp +++ b/icespider/embedded/embedded.cpp @@ -1,95 +1,77 @@ #include "embedded.h" #include "listenSocket.h" +#include +#include +#include +#include +#include namespace IceSpider::Embedded { Listener::Listener() : - work(1024), - topSock(0) + sockCount(0), + epollfd(epoll_create1(0)) { - FD_ZERO(&rfds); - FD_ZERO(&wfds); - FD_ZERO(&efds); } Listener::~Listener() { + close(epollfd); } int Listener::listen(unsigned short portno) { int fd = create(portno); - add(fd); + add(fd, EPOLLIN | EPOLLET | EPOLLONESHOT); return fd; } void Listener::unlisten(int fd) { if (sockets[fd] && dynamic_cast(sockets[fd].get())) { + sockCount--; remove(fd); - } - } - - void Listener::worker() - { - while (topSock > 0) { - try { - SocketHandler::Work w; - if (work.wait_dequeue_timed(w, 500000)) { - w(); - }; - } - catch (std::exception & e) { - } + sockets[fd].reset(); } } void Listener::run() { std::vector workers; - std::vector pending; for (auto x = std::thread::hardware_concurrency(); x; x--) { - workers.emplace_back(&Listener::worker, this); - } - - while (topSock > 0) { - auto r = rfds, w = wfds, e = efds; - struct timeval to = { 0, pending.empty() ? 500000 : 50 }; - if (auto s = select(topSock, &r, &w, &e, &to) > 0) { - pending.reserve(pending.size() + s); - - for (int fd = 0; fd < topSock; fd++) { - if (FD_ISSET(fd, &r)) { - pending.emplace_back(sockets[fd]->read(this)); - } - else if (FD_ISSET(fd, &e)) { - pending.emplace_back(sockets[fd]->except(this)); + workers.emplace_back([this]() { + while (sockCount) { + std::array events; + if (auto s = epoll_wait(epollfd, &events.front(), events.size(), 500); s > 0) { + for (int n = 0; n < s; n++) { + auto & sh = sockets[events[n].data.fd]; + int act = 0; + if (events[n].events & (EPOLLRDHUP | EPOLLERR | EPOLLHUP)) { + act = sh->except(this); + } + else { + if (events[n].events & EPOLLIN) { + act = sh->read(this); + } + if (events[n].events & EPOLLOUT) { + act = sh->write(this); + } + } + switch (act) { + case -1: + sh.reset(); + sockCount--; + break; + case 0: + break; + default: + rearm(events[n].data.fd, act); + break; + } + } } } - } - - for (auto p = pending.begin(); p != pending.end(); ) { - auto & [ fd, ser ] = *p; - if (ser.wait_for(std::chrono::seconds(0)) == std::future_status::ready) { - auto [ op ] = ser.get(); - switch (op) { - case FDSetChange::NoChange: - FD_SET(fd, &rfds); - break; - case FDSetChange::AddNew: - add(fd); - break; - case FDSetChange::Remove: - remove(fd); - break; - } - p = pending.erase(p); - } - else { - FD_CLR(fd, &rfds); - p++; - } - } + }); } for (auto & t : workers) { @@ -97,21 +79,25 @@ namespace IceSpider::Embedded { } } - void Listener::add(int fd) + void Listener::add(int fd, int flags) { - FD_SET(fd, &rfds); - FD_SET(fd, &efds); + epoll_event ev; + ev.data.fd = fd; + ev.events = flags; + BOOST_VERIFY(epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev) == 0); + } + + void Listener::rearm(int fd, int flags) + { + epoll_event ev; + ev.data.fd = fd; + ev.events = flags; + BOOST_VERIFY(epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &ev) == 0); } void Listener::remove(int fd) { - FD_CLR(fd, &rfds); - FD_CLR(fd, &wfds); - FD_CLR(fd, &efds); - sockets[fd].reset(); - while (topSock > 0 && !sockets[topSock - 1]) { - --topSock; - } + BOOST_VERIFY(epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, NULL) == 0); } } diff --git a/icespider/embedded/embedded.h b/icespider/embedded/embedded.h index 7d7fafa..0e1055a 100644 --- a/icespider/embedded/embedded.h +++ b/icespider/embedded/embedded.h @@ -5,15 +5,11 @@ #include #include #include -#include -#include #include "socketHandler.h" namespace IceSpider::Embedded { class DLL_PUBLIC Listener { public: - typedef moodycamel::BlockingConcurrentQueue WorkQueue; - Listener(); Listener(unsigned short portno); ~Listener(); @@ -26,23 +22,19 @@ namespace IceSpider::Embedded { template inline int create(const P & ... p) { auto s = std::make_unique(p...); - topSock = std::max(s->fd + 1, topSock); + sockCount++; return (sockets[s->fd] = std::move(s))->fd; } - WorkQueue work; + void add(int fd, int flags); + void rearm(int fd, int flags); + void remove(int fd); private: - inline void add(int fd); - inline void remove(int fd); - - void worker(); - typedef std::unique_ptr SocketPtr; typedef std::array Sockets; - int topSock; + int sockCount, epollfd; Sockets sockets; - fd_set rfds, wfds, efds; }; }; diff --git a/icespider/embedded/listenSocket.cpp b/icespider/embedded/listenSocket.cpp index 945ba43..0f0ad3e 100644 --- a/icespider/embedded/listenSocket.cpp +++ b/icespider/embedded/listenSocket.cpp @@ -2,6 +2,7 @@ #include "embedded.h" #include "clientSocket.h" #include +#include namespace IceSpider::Embedded { ListenSocket::ListenSocket(unsigned short portno) : @@ -24,9 +25,17 @@ namespace IceSpider::Embedded { } } - FdSocketEventResultFuture ListenSocket::read(Listener * listener) + int ListenSocket::read(Listener * listener) { - return returnNow(listener->create(fd), FDSetChange::AddNew); + int newfd = listener->create(fd); + listener->add(newfd, EPOLLET | EPOLLRDHUP | EPOLLONESHOT | EPOLLIN | EPOLLERR | EPOLLHUP); + return EPOLLIN | EPOLLET | EPOLLONESHOT; + } + + int ListenSocket::write(Listener *) + { + // Log a warning about this nonsense? + return 0; } } diff --git a/icespider/embedded/listenSocket.h b/icespider/embedded/listenSocket.h index 36a704f..786fac9 100644 --- a/icespider/embedded/listenSocket.h +++ b/icespider/embedded/listenSocket.h @@ -9,7 +9,8 @@ namespace IceSpider::Embedded { public: ListenSocket(unsigned short portno); - FdSocketEventResultFuture read(Listener * listener) override; + int read(Listener * listener) override; + int write(Listener * listener) override; private: struct sockaddr_in serveraddr; diff --git a/icespider/embedded/socketEvents.h b/icespider/embedded/socketEvents.h deleted file mode 100644 index 9f94d04..0000000 --- a/icespider/embedded/socketEvents.h +++ /dev/null @@ -1,19 +0,0 @@ -#ifndef ICESPIDER_EMBEDDED_SOCKETEVENTS_H -#define ICESPIDER_EMBEDDED_SOCKETEVENTS_H - -#include -#include - -namespace IceSpider::Embedded { - enum class FDSetChange { - NoChange, - AddNew, - Remove, - }; - typedef std::tuple SocketEventResult; - typedef std::future SocketEventResultFuture; - typedef std::tuple FdSocketEventResultFuture; -} - -#endif - diff --git a/icespider/embedded/socketHandler.cpp b/icespider/embedded/socketHandler.cpp index 7d93afd..2e5a7b3 100644 --- a/icespider/embedded/socketHandler.cpp +++ b/icespider/embedded/socketHandler.cpp @@ -17,23 +17,9 @@ namespace IceSpider::Embedded { close(fd); } - FdSocketEventResultFuture SocketHandler::returnNow(int fd, const SocketEventResult && ser) + int SocketHandler::except(Listener *) { - std::promise p; - p.set_value(ser); - return { fd, p.get_future() }; - } - - FdSocketEventResultFuture SocketHandler::returnQueued(Listener * listener, int fd, Work && work) - { - auto f = work.get_future(); - BOOST_VERIFY_MSG(listener->work.try_enqueue(std::move(work)), "try_enqueue"); - return { fd, std::move(f) }; - } - - FdSocketEventResultFuture SocketHandler::except(Listener *) - { - return returnNow(fd, FDSetChange::Remove); + return -1; } } diff --git a/icespider/embedded/socketHandler.h b/icespider/embedded/socketHandler.h index 67f5a36..1c3e2d8 100644 --- a/icespider/embedded/socketHandler.h +++ b/icespider/embedded/socketHandler.h @@ -1,24 +1,17 @@ #ifndef ICESPIDER_EMBEDDED_SOCKETHANDLER_H #define ICESPIDER_EMBEDDED_SOCKETHANDLER_H -#include -#include "socketEvents.h" - namespace IceSpider::Embedded { class Listener; class SocketHandler { public: - typedef std::packaged_task Work; - SocketHandler(int f); ~SocketHandler(); - static FdSocketEventResultFuture returnNow(int, const SocketEventResult &&); - static FdSocketEventResultFuture returnQueued(Listener *, int, Work &&); - - virtual FdSocketEventResultFuture read(Listener *) = 0; - virtual FdSocketEventResultFuture except(Listener *); + virtual int read(Listener *) = 0; + virtual int write(Listener *) = 0; + virtual int except(Listener *); const int fd; }; diff --git a/icespider/unittests/testEmbedded.cpp b/icespider/unittests/testEmbedded.cpp index ed6b381..d104977 100644 --- a/icespider/unittests/testEmbedded.cpp +++ b/icespider/unittests/testEmbedded.cpp @@ -2,6 +2,7 @@ #include #include +#include class EmbeddedIceSpiderInstance : public IceSpider::Embedded::Listener { public: -- cgit v1.2.3