summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDan Goodliffe <dan@randomdan.homeip.net>2018-05-29 19:53:30 +0100
committerDan Goodliffe <dan@randomdan.homeip.net>2018-06-04 19:00:28 +0100
commit140a6858cf8e07b4f65720ba1c24142cc287b700 (patch)
tree08425d1673a04e546005c6fc20263a18ed5d9039
parentRemove parameter setters from ihttpRequest (diff)
downloadicespider-140a6858cf8e07b4f65720ba1c24142cc287b700.tar.bz2
icespider-140a6858cf8e07b4f65720ba1c24142cc287b700.tar.xz
icespider-140a6858cf8e07b4f65720ba1c24142cc287b700.zip
Initial commit of embedded server
First cut of a light-weight multi-threaded HTTP daemon library. Let's be honest... it does nothing yet, but it's our starting point.
-rw-r--r--.gitmodules3
-rw-r--r--icespider/embedded/Jamfile.jam19
m---------icespider/embedded/concurrentqueue0
-rw-r--r--icespider/embedded/embedded.cpp240
-rw-r--r--icespider/embedded/embedded.h105
-rw-r--r--icespider/unittests/Jamfile.jam17
-rw-r--r--icespider/unittests/testEmbedded.cpp62
7 files changed, 446 insertions, 0 deletions
diff --git a/.gitmodules b/.gitmodules
index e0e648c..4322557 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -2,3 +2,6 @@
path = ice
url = https://github.com/zeroc-ice/ice
branch = 3.7
+[submodule "icespider/fcgi/concurrentqueue"]
+ path = icespider/embedded/concurrentqueue
+ url = https://github.com/cameron314/concurrentqueue/
diff --git a/icespider/embedded/Jamfile.jam b/icespider/embedded/Jamfile.jam
new file mode 100644
index 0000000..1c52a5f
--- /dev/null
+++ b/icespider/embedded/Jamfile.jam
@@ -0,0 +1,19 @@
+lib slicer ;
+lib fcgi : : <name>fcgi ;
+lib fcgi++ : : <name>fcgi++ ;
+lib adhocutil : : : : <include>/usr/include/adhocutil/ ;
+
+lib icespider-embedded :
+ [ glob-tree *.cpp : bin concurrentqueue ]
+ :
+ <library>slicer
+ <include>concurrentqueue
+ <library>..//core//icespider-core
+ <implicit-dependency>../core//icespider-core
+ : :
+ <library>..//core//icespider-core
+ <implicit-dependency>../core//icespider-core
+ <include>.
+ <include>concurrentqueue
+ ;
+
diff --git a/icespider/embedded/concurrentqueue b/icespider/embedded/concurrentqueue
new file mode 160000
+Subproject 8f7e861dd9411a0bf77a6b9de83a47b3424fafb
diff --git a/icespider/embedded/embedded.cpp b/icespider/embedded/embedded.cpp
new file mode 100644
index 0000000..9b65839
--- /dev/null
+++ b/icespider/embedded/embedded.cpp
@@ -0,0 +1,240 @@
+#include <stdio.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <string.h>
+#include <netdb.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <arpa/inet.h>
+#include "embedded.h"
+#include <lockHelpers.h>
+#include <functional>
+#include <future>
+#include <boost/assert.hpp>
+
+static socklen_t clientlen = sizeof(struct sockaddr_in);
+
+namespace IceSpider::Embedded {
+ SocketHandler::SocketHandler(int f) :
+ fd(f)
+ {
+ if (fd < 0) {
+ throw std::runtime_error("Invalid socket");
+ }
+ }
+
+ SocketHandler::~SocketHandler()
+ {
+ close(fd);
+ }
+
+ FdSocketEventResultFuture SocketHandler::returnNow(int fd, const SocketEventResult && ser)
+ {
+ std::promise<SocketEventResult> p;
+ p.set_value(ser);
+ return { fd, p.get_future() };
+ }
+
+ FdSocketEventResultFuture SocketHandler::returnQueued(Listener * listener, int fd, Work && work)
+ {
+ auto f = work.get_future();
+ BOOST_VERIFY_MSG(listener->work.try_enqueue(std::move(work)), "try_enqueue");
+ return { fd, std::move(f) };
+ }
+
+ FdSocketEventResultFuture SocketHandler::except(Listener *)
+ {
+ return returnNow(fd, FDSetChange::Remove);
+ }
+
+ ClientSocket::ClientSocket(int pfd) :
+ SocketHandler(accept(pfd, (struct sockaddr *) &clientaddr, &clientlen)),
+ buf(BUFSIZ),
+ rec(0),
+ state(State::reading_headers)
+ {
+ }
+
+ FdSocketEventResultFuture ClientSocket::read(Listener * listener)
+ {
+ Work w([this]() {
+ if (buf.size() == rec) {
+ buf.resize(rec * 2);
+ }
+ auto r = ::read(fd, &buf.at(rec), buf.size() - rec);
+ if (r < 1) {
+ return FDSetChange::Remove;
+ }
+ switch (state) {
+ case State::reading_headers:
+ read_headers(r);
+ break;
+ case State::streaming_input:
+ stream_input(r);
+ break;
+ }
+ return FDSetChange::NoChange;
+ });
+ return returnQueued(listener, fd, std::move(w));
+ }
+
+ void ClientSocket::read_headers(int r)
+ {
+ buf[rec + r] = 0;
+ if (strstr(&buf.at(rec > 3 ? rec - 2 : 0), "\r\n")) {
+ auto w = ::write(fd, "HTTP/1.1 204 No content\r\n\r\n", 27);
+ (void)w;
+ rec = 0;
+ }
+ rec += r;
+ }
+
+ void ClientSocket::stream_input(int)
+ {
+ }
+
+ ListenSocket::ListenSocket(unsigned short portno) :
+ SocketHandler(socket(AF_INET, SOCK_STREAM, 0))
+ {
+ int optval = 1;
+ setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (const void *)&optval , sizeof(int));
+
+ bzero(&serveraddr, sizeof(serveraddr));
+ serveraddr.sin_family = AF_INET;
+ serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
+ serveraddr.sin_port = htons(portno);
+
+ if (bind(fd, (struct sockaddr *) &serveraddr, sizeof(serveraddr)) < 0) {
+ throw std::runtime_error("ERROR on binding");
+ }
+
+ if (listen(fd, 5) < 0) {
+ throw std::runtime_error("ERROR on listen");
+ }
+ }
+
+ FdSocketEventResultFuture ListenSocket::read(Listener * listener)
+ {
+ return returnNow(listener->create<ClientSocket>(fd), FDSetChange::AddNew);
+ }
+
+ Listener::Listener() :
+ work(1024),
+ topSock(0)
+ {
+ FD_ZERO(&rfds);
+ FD_ZERO(&wfds);
+ FD_ZERO(&efds);
+ }
+
+ Listener::~Listener()
+ {
+ }
+
+ int Listener::listen(unsigned short portno)
+ {
+ int fd = create<ListenSocket>(portno);
+ add(fd);
+ return fd;
+ }
+
+ void Listener::unlisten(int fd)
+ {
+ if (sockets[fd] && dynamic_cast<ListenSocket *>(sockets[fd].get())) {
+ remove(fd);
+ }
+ }
+
+ void Listener::worker()
+ {
+ while (topSock > 0) {
+ try {
+ SocketHandler::Work w;
+ if (work.wait_dequeue_timed(w, 500000)) {
+ w();
+ };
+ }
+ catch (std::exception & e) {
+ }
+ }
+ }
+
+ void Listener::run()
+ {
+ std::vector<std::thread> workers;
+ std::vector<FdSocketEventResultFuture> pending;
+
+ for (auto x = std::thread::hardware_concurrency(); x; x--) {
+ workers.emplace_back(&Listener::worker, this);
+ }
+
+ while (topSock > 0) {
+ auto r = rfds, w = wfds, e = efds;
+ struct timeval to = { 0, pending.empty() ? 500000 : 50 };
+ if (auto s = select(topSock, &r, &w, &e, &to) > 0) {
+ pending.reserve(pending.size() + s);
+
+ for (int fd = 0; fd < topSock; fd++) {
+ if (FD_ISSET(fd, &r)) {
+ pending.emplace_back(sockets[fd]->read(this));
+ }
+ else if (FD_ISSET(fd, &e)) {
+ pending.emplace_back(sockets[fd]->except(this));
+ }
+ }
+ }
+
+ for (auto p = pending.begin(); p != pending.end(); ) {
+ auto & [ fd, ser ] = *p;
+ if (ser.wait_for(std::chrono::seconds(0)) == std::future_status::ready) {
+ auto [ op ] = ser.get();
+ switch (op) {
+ case FDSetChange::NoChange:
+ FD_SET(fd, &rfds);
+ break;
+ case FDSetChange::AddNew:
+ add(fd);
+ break;
+ case FDSetChange::Remove:
+ remove(fd);
+ break;
+ }
+ p = pending.erase(p);
+ }
+ else {
+ FD_CLR(fd, &rfds);
+ p++;
+ }
+ }
+ }
+
+ for (auto & t : workers) {
+ t.join();
+ }
+ }
+
+ template<typename T, typename ... P> inline int Listener::create(const P & ... p)
+ {
+ auto s = std::make_unique<T>(p...);
+ topSock = std::max(s->fd + 1, topSock);
+ return (sockets[s->fd] = std::move(s))->fd;
+ }
+
+ void Listener::add(int fd)
+ {
+ FD_SET(fd, &rfds);
+ FD_SET(fd, &efds);
+ }
+
+ void Listener::remove(int fd)
+ {
+ FD_CLR(fd, &rfds);
+ FD_CLR(fd, &wfds);
+ FD_CLR(fd, &efds);
+ sockets[fd].reset();
+ while (topSock > 0 && !sockets[topSock - 1]) {
+ --topSock;
+ }
+ }
+}
+
diff --git a/icespider/embedded/embedded.h b/icespider/embedded/embedded.h
new file mode 100644
index 0000000..08f280a
--- /dev/null
+++ b/icespider/embedded/embedded.h
@@ -0,0 +1,105 @@
+#ifndef ICESPIDER_EMBEDDED_H
+#define ICESPIDER_EMBEDDED_H
+
+#include <sys/select.h>
+#include <netinet/in.h>
+#include <visibility.h>
+#include <memory>
+#include <array>
+#include <vector>
+#include <future>
+#include </usr/include/semaphore.h>
+#include <blockingconcurrentqueue.h>
+
+namespace IceSpider::Embedded {
+ class Listener;
+
+ enum class FDSetChange {
+ NoChange,
+ AddNew,
+ Remove,
+ };
+ typedef std::tuple<FDSetChange> SocketEventResult;
+ typedef std::future<SocketEventResult> SocketEventResultFuture;
+ typedef std::tuple<int, SocketEventResultFuture> FdSocketEventResultFuture;
+
+ class SocketHandler {
+ public:
+ typedef std::packaged_task<SocketEventResult()> Work;
+
+ SocketHandler(int f);
+ ~SocketHandler();
+
+ static inline FdSocketEventResultFuture returnNow(int, const SocketEventResult &&);
+ static inline FdSocketEventResultFuture returnQueued(Listener *, int, Work &&);
+
+ virtual FdSocketEventResultFuture read(Listener *) = 0;
+ virtual FdSocketEventResultFuture except(Listener *);
+
+ const int fd;
+ };
+
+ class ClientSocket : public SocketHandler {
+ public:
+ ClientSocket(int fd);
+
+ FdSocketEventResultFuture read(Listener * listener) override;
+
+ private:
+ inline void read_headers(int bytes);
+ inline void stream_input(int bytes);
+
+ enum class State {
+ reading_headers,
+ streaming_input,
+ };
+
+ struct sockaddr_in clientaddr;
+ std::vector<char> buf;
+ std::size_t rec;
+ State state;
+ };
+
+ class ListenSocket : public SocketHandler {
+ public:
+ ListenSocket(unsigned short portno);
+
+ FdSocketEventResultFuture read(Listener * listener) override;
+
+ private:
+ struct sockaddr_in serveraddr;
+ };
+
+ class DLL_PUBLIC Listener {
+ public:
+ typedef moodycamel::BlockingConcurrentQueue<SocketHandler::Work> WorkQueue;
+
+ Listener();
+ Listener(unsigned short portno);
+ ~Listener();
+
+ int listen(unsigned short portno);
+ void unlisten(int fd);
+
+ void run();
+
+ template<typename T, typename ... P> inline int create(const P & ... p);
+
+ WorkQueue work;
+
+ private:
+ inline void add(int fd);
+ inline void remove(int fd);
+
+ void worker();
+
+ typedef std::unique_ptr<SocketHandler> SocketPtr;
+ typedef std::array<SocketPtr, 1024> Sockets;
+ int topSock;
+ Sockets sockets;
+ fd_set rfds, wfds, efds;
+ };
+};
+
+#endif
+
diff --git a/icespider/unittests/Jamfile.jam b/icespider/unittests/Jamfile.jam
index aa8d879..a0382d1 100644
--- a/icespider/unittests/Jamfile.jam
+++ b/icespider/unittests/Jamfile.jam
@@ -105,6 +105,23 @@ run
: testFcgi ;
run
+ testEmbedded.cpp
+ : : :
+ <slicer>yes
+ <define>BOOST_TEST_DYN_LINK
+ <library>testCommon
+ <library>../embedded//icespider-embedded
+ <library>../common//icespider-common
+ <library>../core//icespider-core
+ <implicit-dependency>../core//icespider-core
+ <library>boost_system
+ <library>boost_filesystem
+ <library>slicer
+ <library>slicer-json
+ <library>adhocutil
+ ;
+
+run
testFileSessions.cpp
: :
config/ice.properties
diff --git a/icespider/unittests/testEmbedded.cpp b/icespider/unittests/testEmbedded.cpp
new file mode 100644
index 0000000..ed6b381
--- /dev/null
+++ b/icespider/unittests/testEmbedded.cpp
@@ -0,0 +1,62 @@
+#define BOOST_TEST_MODULE TestEmbedded
+#include <boost/test/unit_test.hpp>
+
+#include <embedded.h>
+
+class EmbeddedIceSpiderInstance : public IceSpider::Embedded::Listener {
+ public:
+ EmbeddedIceSpiderInstance() :
+ fd(listen(8080))
+ {
+ BOOST_REQUIRE_GE(fd, 0);
+ }
+
+ int fd;
+};
+
+class EmbeddedIceSpiderRunner : public EmbeddedIceSpiderInstance {
+ public:
+ EmbeddedIceSpiderRunner() :
+ th([this]{ run(); })
+ {
+ }
+
+ ~EmbeddedIceSpiderRunner()
+ {
+ unlisten(fd);
+ th.join();
+ }
+
+ std::thread th;
+};
+
+BOOST_AUTO_TEST_CASE(construct_destruct_cycle, * boost::unit_test::timeout(1))
+{
+ EmbeddedIceSpiderInstance instance;
+}
+
+BOOST_AUTO_TEST_CASE(startup_and_shutdown_cycle, * boost::unit_test::timeout(2))
+{
+ EmbeddedIceSpiderRunner runner;
+ sleep(1);
+}
+
+BOOST_FIXTURE_TEST_SUITE(EmbeddedIceSpider, EmbeddedIceSpiderRunner);
+
+// Throw some requests at it, get a general performance overview
+BOOST_AUTO_TEST_CASE(quick_siege_test,
+ * boost::unit_test::timeout(5))
+{
+ BOOST_REQUIRE_EQUAL(0, system("wrk -t2 http://localhost:8080/ -c10 -T 15 -d 2"));
+}
+
+// Throw lots of requests at it, get a good performance overview
+BOOST_AUTO_TEST_CASE(simple_performance_test,
+ * boost::unit_test::disabled() // Not usually interested
+ * boost::unit_test::timeout(15))
+{
+ BOOST_REQUIRE_EQUAL(0, system("wrk -t20 http://localhost:8080/ -c100 -T 15"));
+}
+
+BOOST_AUTO_TEST_SUITE_END();
+