summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDan Goodliffe <dan@randomdan.homeip.net>2018-07-17 20:37:29 +0100
committerDan Goodliffe <dan@randomdan.homeip.net>2018-07-17 20:37:29 +0100
commit45499a8e8fb73c414df663a125a96901a4d00e2e (patch)
tree2e2a946807c943fe0d0dbc045c648f8d070b8a00
parentWIP (diff)
downloadicespider-45499a8e8fb73c414df663a125a96901a4d00e2e.tar.bz2
icespider-45499a8e8fb73c414df663a125a96901a4d00e2e.tar.xz
icespider-45499a8e8fb73c414df663a125a96901a4d00e2e.zip
Replace select with epoll
Remove the clunky and fiddly select(2) implementation with a much slicker and good god faster epoll(7) based one.
-rw-r--r--.gitmodules3
-rw-r--r--icespider/embedded/Jamfile.jam6
-rw-r--r--icespider/embedded/clientSocket.cpp62
-rw-r--r--icespider/embedded/clientSocket.h7
m---------icespider/embedded/concurrentqueue0
-rw-r--r--icespider/embedded/embedded.cpp126
-rw-r--r--icespider/embedded/embedded.h18
-rw-r--r--icespider/embedded/listenSocket.cpp13
-rw-r--r--icespider/embedded/listenSocket.h3
-rw-r--r--icespider/embedded/socketEvents.h19
-rw-r--r--icespider/embedded/socketHandler.cpp18
-rw-r--r--icespider/embedded/socketHandler.h13
-rw-r--r--icespider/unittests/testEmbedded.cpp1
13 files changed, 124 insertions, 165 deletions
diff --git a/.gitmodules b/.gitmodules
index 4322557..e0e648c 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -2,6 +2,3 @@
path = ice
url = https://github.com/zeroc-ice/ice
branch = 3.7
-[submodule "icespider/fcgi/concurrentqueue"]
- path = icespider/embedded/concurrentqueue
- url = https://github.com/cameron314/concurrentqueue/
diff --git a/icespider/embedded/Jamfile.jam b/icespider/embedded/Jamfile.jam
index 1c52a5f..a658bd7 100644
--- a/icespider/embedded/Jamfile.jam
+++ b/icespider/embedded/Jamfile.jam
@@ -1,19 +1,15 @@
lib slicer ;
-lib fcgi : : <name>fcgi ;
-lib fcgi++ : : <name>fcgi++ ;
lib adhocutil : : : : <include>/usr/include/adhocutil/ ;
lib icespider-embedded :
- [ glob-tree *.cpp : bin concurrentqueue ]
+ [ glob-tree *.cpp : bin ]
:
<library>slicer
- <include>concurrentqueue
<library>..//core//icespider-core
<implicit-dependency>../core//icespider-core
: :
<library>..//core//icespider-core
<implicit-dependency>../core//icespider-core
<include>.
- <include>concurrentqueue
;
diff --git a/icespider/embedded/clientSocket.cpp b/icespider/embedded/clientSocket.cpp
index 73aa3cf..54aa7cb 100644
--- a/icespider/embedded/clientSocket.cpp
+++ b/icespider/embedded/clientSocket.cpp
@@ -1,55 +1,71 @@
#include "clientSocket.h"
+#include <errno.h>
#include <unistd.h>
+#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
+#include <sys/epoll.h>
namespace IceSpider::Embedded {
static socklen_t clientlen = sizeof(struct sockaddr_in);
ClientSocket::ClientSocket(int pfd) :
- SocketHandler(accept(pfd, (struct sockaddr *) &clientaddr, &clientlen)),
+ SocketHandler(accept4(pfd, (struct sockaddr *) &clientaddr, &clientlen, SOCK_NONBLOCK)),
buf(BUFSIZ),
rec(0),
state(State::reading_headers)
{
}
- FdSocketEventResultFuture ClientSocket::read(Listener * listener)
+ int ClientSocket::read(Listener *)
{
- Work w([this]() {
- if (buf.size() == rec) {
- buf.resize(rec * 2);
- }
- auto r = ::read(fd, &buf.at(rec), buf.size() - rec);
- if (r < 1) {
- return FDSetChange::Remove;
- }
switch (state) {
case State::reading_headers:
- read_headers(r);
+ return read_headers();
break;
case State::streaming_input:
- stream_input(r);
+ return stream_input();
break;
}
- return FDSetChange::NoChange;
- });
- return returnQueued(listener, fd, std::move(w));
+ return -1;
}
- void ClientSocket::read_headers(int r)
+ int ClientSocket::read_headers()
{
- buf[rec + r] = 0;
- if (strstr(&buf.at(rec > 3 ? rec - 2 : 0), "\r\n")) {
- auto w = ::write(fd, "HTTP/1.1 204 No content\r\n\r\n", 27);
- (void)w;
- rec = 0;
+ auto readBytes = [this]() {
+ if (buf.size() == rec) {
+ buf.resize(rec * 2);
+ }
+ return ::read(fd, &buf.at(rec), buf.size() - rec);
+ };
+ int r;
+ while ((r = readBytes()) > 0) {
+ buf[rec + r] = 0;
+ auto end = strstr(&buf.at(rec > 3 ? rec - 2 : 0), "\r\n");
+ if (end) {
+ auto w = ::write(fd, "HTTP/1.1 204 No content\r\n\r\n", 27);
+ (void)w;
+ rec = 0;
+ buf.erase(buf.begin(), buf.begin() + (end - &buf.front()));
+ }
+ else {
+ rec += r;
+ }
+ }
+ if (r < 1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
+ return EPOLLET | EPOLLRDHUP | EPOLLONESHOT | EPOLLIN | EPOLLERR | EPOLLHUP;
}
- rec += r;
+ return -1;
+ }
+
+ int ClientSocket::stream_input()
+ {
+ return 0;
}
- void ClientSocket::stream_input(int)
+ int ClientSocket::write(Listener *)
{
+ return 0;
}
}
diff --git a/icespider/embedded/clientSocket.h b/icespider/embedded/clientSocket.h
index 0d6d526..72449a0 100644
--- a/icespider/embedded/clientSocket.h
+++ b/icespider/embedded/clientSocket.h
@@ -10,11 +10,12 @@ namespace IceSpider::Embedded {
public:
ClientSocket(int fd);
- FdSocketEventResultFuture read(Listener * listener) override;
+ int read(Listener * listener) override;
+ int write(Listener * listener) override;
private:
- inline void read_headers(int bytes);
- inline void stream_input(int bytes);
+ inline int read_headers();
+ inline int stream_input();
enum class State {
reading_headers,
diff --git a/icespider/embedded/concurrentqueue b/icespider/embedded/concurrentqueue
deleted file mode 160000
-Subproject 8f7e861dd9411a0bf77a6b9de83a47b3424fafb
diff --git a/icespider/embedded/embedded.cpp b/icespider/embedded/embedded.cpp
index 2b1cfcd..5246842 100644
--- a/icespider/embedded/embedded.cpp
+++ b/icespider/embedded/embedded.cpp
@@ -1,95 +1,77 @@
#include "embedded.h"
#include "listenSocket.h"
+#include <unistd.h>
+#include <string.h>
+#include <sys/epoll.h>
+#include <boost/assert.hpp>
+#include <thread>
namespace IceSpider::Embedded {
Listener::Listener() :
- work(1024),
- topSock(0)
+ sockCount(0),
+ epollfd(epoll_create1(0))
{
- FD_ZERO(&rfds);
- FD_ZERO(&wfds);
- FD_ZERO(&efds);
}
Listener::~Listener()
{
+ close(epollfd);
}
int Listener::listen(unsigned short portno)
{
int fd = create<ListenSocket>(portno);
- add(fd);
+ add(fd, EPOLLIN | EPOLLET | EPOLLONESHOT);
return fd;
}
void Listener::unlisten(int fd)
{
if (sockets[fd] && dynamic_cast<ListenSocket *>(sockets[fd].get())) {
+ sockCount--;
remove(fd);
- }
- }
-
- void Listener::worker()
- {
- while (topSock > 0) {
- try {
- SocketHandler::Work w;
- if (work.wait_dequeue_timed(w, 500000)) {
- w();
- };
- }
- catch (std::exception & e) {
- }
+ sockets[fd].reset();
}
}
void Listener::run()
{
std::vector<std::thread> workers;
- std::vector<FdSocketEventResultFuture> pending;
for (auto x = std::thread::hardware_concurrency(); x; x--) {
- workers.emplace_back(&Listener::worker, this);
- }
-
- while (topSock > 0) {
- auto r = rfds, w = wfds, e = efds;
- struct timeval to = { 0, pending.empty() ? 500000 : 50 };
- if (auto s = select(topSock, &r, &w, &e, &to) > 0) {
- pending.reserve(pending.size() + s);
-
- for (int fd = 0; fd < topSock; fd++) {
- if (FD_ISSET(fd, &r)) {
- pending.emplace_back(sockets[fd]->read(this));
- }
- else if (FD_ISSET(fd, &e)) {
- pending.emplace_back(sockets[fd]->except(this));
+ workers.emplace_back([this]() {
+ while (sockCount) {
+ std::array<epoll_event, 4> events;
+ if (auto s = epoll_wait(epollfd, &events.front(), events.size(), 500); s > 0) {
+ for (int n = 0; n < s; n++) {
+ auto & sh = sockets[events[n].data.fd];
+ int act = 0;
+ if (events[n].events & (EPOLLRDHUP | EPOLLERR | EPOLLHUP)) {
+ act = sh->except(this);
+ }
+ else {
+ if (events[n].events & EPOLLIN) {
+ act = sh->read(this);
+ }
+ if (events[n].events & EPOLLOUT) {
+ act = sh->write(this);
+ }
+ }
+ switch (act) {
+ case -1:
+ sh.reset();
+ sockCount--;
+ break;
+ case 0:
+ break;
+ default:
+ rearm(events[n].data.fd, act);
+ break;
+ }
+ }
}
}
- }
-
- for (auto p = pending.begin(); p != pending.end(); ) {
- auto & [ fd, ser ] = *p;
- if (ser.wait_for(std::chrono::seconds(0)) == std::future_status::ready) {
- auto [ op ] = ser.get();
- switch (op) {
- case FDSetChange::NoChange:
- FD_SET(fd, &rfds);
- break;
- case FDSetChange::AddNew:
- add(fd);
- break;
- case FDSetChange::Remove:
- remove(fd);
- break;
- }
- p = pending.erase(p);
- }
- else {
- FD_CLR(fd, &rfds);
- p++;
- }
- }
+ });
}
for (auto & t : workers) {
@@ -97,21 +79,25 @@ namespace IceSpider::Embedded {
}
}
- void Listener::add(int fd)
+ void Listener::add(int fd, int flags)
{
- FD_SET(fd, &rfds);
- FD_SET(fd, &efds);
+ epoll_event ev;
+ ev.data.fd = fd;
+ ev.events = flags;
+ BOOST_VERIFY(epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev) == 0);
+ }
+
+ void Listener::rearm(int fd, int flags)
+ {
+ epoll_event ev;
+ ev.data.fd = fd;
+ ev.events = flags;
+ BOOST_VERIFY(epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &ev) == 0);
}
void Listener::remove(int fd)
{
- FD_CLR(fd, &rfds);
- FD_CLR(fd, &wfds);
- FD_CLR(fd, &efds);
- sockets[fd].reset();
- while (topSock > 0 && !sockets[topSock - 1]) {
- --topSock;
- }
+ BOOST_VERIFY(epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, NULL) == 0);
}
}
diff --git a/icespider/embedded/embedded.h b/icespider/embedded/embedded.h
index 7d7fafa..0e1055a 100644
--- a/icespider/embedded/embedded.h
+++ b/icespider/embedded/embedded.h
@@ -5,15 +5,11 @@
#include <memory>
#include <array>
#include <vector>
-#include </usr/include/semaphore.h>
-#include <blockingconcurrentqueue.h>
#include "socketHandler.h"
namespace IceSpider::Embedded {
class DLL_PUBLIC Listener {
public:
- typedef moodycamel::BlockingConcurrentQueue<SocketHandler::Work> WorkQueue;
-
Listener();
Listener(unsigned short portno);
~Listener();
@@ -26,23 +22,19 @@ namespace IceSpider::Embedded {
template<typename T, typename ... P> inline int create(const P & ... p)
{
auto s = std::make_unique<T>(p...);
- topSock = std::max(s->fd + 1, topSock);
+ sockCount++;
return (sockets[s->fd] = std::move(s))->fd;
}
- WorkQueue work;
+ void add(int fd, int flags);
+ void rearm(int fd, int flags);
+ void remove(int fd);
private:
- inline void add(int fd);
- inline void remove(int fd);
-
- void worker();
-
typedef std::unique_ptr<SocketHandler> SocketPtr;
typedef std::array<SocketPtr, 1024> Sockets;
- int topSock;
+ int sockCount, epollfd;
Sockets sockets;
- fd_set rfds, wfds, efds;
};
};
diff --git a/icespider/embedded/listenSocket.cpp b/icespider/embedded/listenSocket.cpp
index 945ba43..0f0ad3e 100644
--- a/icespider/embedded/listenSocket.cpp
+++ b/icespider/embedded/listenSocket.cpp
@@ -2,6 +2,7 @@
#include "embedded.h"
#include "clientSocket.h"
#include <string.h>
+#include <sys/epoll.h>
namespace IceSpider::Embedded {
ListenSocket::ListenSocket(unsigned short portno) :
@@ -24,9 +25,17 @@ namespace IceSpider::Embedded {
}
}
- FdSocketEventResultFuture ListenSocket::read(Listener * listener)
+ int ListenSocket::read(Listener * listener)
{
- return returnNow(listener->create<ClientSocket>(fd), FDSetChange::AddNew);
+ int newfd = listener->create<ClientSocket>(fd);
+ listener->add(newfd, EPOLLET | EPOLLRDHUP | EPOLLONESHOT | EPOLLIN | EPOLLERR | EPOLLHUP);
+ return EPOLLIN | EPOLLET | EPOLLONESHOT;
+ }
+
+ int ListenSocket::write(Listener *)
+ {
+ // Log a warning about this nonsense?
+ return 0;
}
}
diff --git a/icespider/embedded/listenSocket.h b/icespider/embedded/listenSocket.h
index 36a704f..786fac9 100644
--- a/icespider/embedded/listenSocket.h
+++ b/icespider/embedded/listenSocket.h
@@ -9,7 +9,8 @@ namespace IceSpider::Embedded {
public:
ListenSocket(unsigned short portno);
- FdSocketEventResultFuture read(Listener * listener) override;
+ int read(Listener * listener) override;
+ int write(Listener * listener) override;
private:
struct sockaddr_in serveraddr;
diff --git a/icespider/embedded/socketEvents.h b/icespider/embedded/socketEvents.h
deleted file mode 100644
index 9f94d04..0000000
--- a/icespider/embedded/socketEvents.h
+++ /dev/null
@@ -1,19 +0,0 @@
-#ifndef ICESPIDER_EMBEDDED_SOCKETEVENTS_H
-#define ICESPIDER_EMBEDDED_SOCKETEVENTS_H
-
-#include <tuple>
-#include <future>
-
-namespace IceSpider::Embedded {
- enum class FDSetChange {
- NoChange,
- AddNew,
- Remove,
- };
- typedef std::tuple<FDSetChange> SocketEventResult;
- typedef std::future<SocketEventResult> SocketEventResultFuture;
- typedef std::tuple<int, SocketEventResultFuture> FdSocketEventResultFuture;
-}
-
-#endif
-
diff --git a/icespider/embedded/socketHandler.cpp b/icespider/embedded/socketHandler.cpp
index 7d93afd..2e5a7b3 100644
--- a/icespider/embedded/socketHandler.cpp
+++ b/icespider/embedded/socketHandler.cpp
@@ -17,23 +17,9 @@ namespace IceSpider::Embedded {
close(fd);
}
- FdSocketEventResultFuture SocketHandler::returnNow(int fd, const SocketEventResult && ser)
+ int SocketHandler::except(Listener *)
{
- std::promise<SocketEventResult> p;
- p.set_value(ser);
- return { fd, p.get_future() };
- }
-
- FdSocketEventResultFuture SocketHandler::returnQueued(Listener * listener, int fd, Work && work)
- {
- auto f = work.get_future();
- BOOST_VERIFY_MSG(listener->work.try_enqueue(std::move(work)), "try_enqueue");
- return { fd, std::move(f) };
- }
-
- FdSocketEventResultFuture SocketHandler::except(Listener *)
- {
- return returnNow(fd, FDSetChange::Remove);
+ return -1;
}
}
diff --git a/icespider/embedded/socketHandler.h b/icespider/embedded/socketHandler.h
index 67f5a36..1c3e2d8 100644
--- a/icespider/embedded/socketHandler.h
+++ b/icespider/embedded/socketHandler.h
@@ -1,24 +1,17 @@
#ifndef ICESPIDER_EMBEDDED_SOCKETHANDLER_H
#define ICESPIDER_EMBEDDED_SOCKETHANDLER_H
-#include <future>
-#include "socketEvents.h"
-
namespace IceSpider::Embedded {
class Listener;
class SocketHandler {
public:
- typedef std::packaged_task<SocketEventResult()> Work;
-
SocketHandler(int f);
~SocketHandler();
- static FdSocketEventResultFuture returnNow(int, const SocketEventResult &&);
- static FdSocketEventResultFuture returnQueued(Listener *, int, Work &&);
-
- virtual FdSocketEventResultFuture read(Listener *) = 0;
- virtual FdSocketEventResultFuture except(Listener *);
+ virtual int read(Listener *) = 0;
+ virtual int write(Listener *) = 0;
+ virtual int except(Listener *);
const int fd;
};
diff --git a/icespider/unittests/testEmbedded.cpp b/icespider/unittests/testEmbedded.cpp
index ed6b381..d104977 100644
--- a/icespider/unittests/testEmbedded.cpp
+++ b/icespider/unittests/testEmbedded.cpp
@@ -2,6 +2,7 @@
#include <boost/test/unit_test.hpp>
#include <embedded.h>
+#include <thread>
class EmbeddedIceSpiderInstance : public IceSpider::Embedded::Listener {
public: