diff options
author | Dan Goodliffe <dan@randomdan.homeip.net> | 2018-05-29 19:53:30 +0100 |
---|---|---|
committer | Dan Goodliffe <dan@randomdan.homeip.net> | 2018-06-04 19:00:28 +0100 |
commit | 140a6858cf8e07b4f65720ba1c24142cc287b700 (patch) | |
tree | 08425d1673a04e546005c6fc20263a18ed5d9039 | |
parent | Remove parameter setters from ihttpRequest (diff) | |
download | icespider-140a6858cf8e07b4f65720ba1c24142cc287b700.tar.bz2 icespider-140a6858cf8e07b4f65720ba1c24142cc287b700.tar.xz icespider-140a6858cf8e07b4f65720ba1c24142cc287b700.zip |
Initial commit of embedded server
First cut of a light-weight multi-threaded HTTP daemon library.
Let's be honest... it does nothing yet, but it's our starting point.
-rw-r--r-- | .gitmodules | 3 | ||||
-rw-r--r-- | icespider/embedded/Jamfile.jam | 19 | ||||
m--------- | icespider/embedded/concurrentqueue | 0 | ||||
-rw-r--r-- | icespider/embedded/embedded.cpp | 240 | ||||
-rw-r--r-- | icespider/embedded/embedded.h | 105 | ||||
-rw-r--r-- | icespider/unittests/Jamfile.jam | 17 | ||||
-rw-r--r-- | icespider/unittests/testEmbedded.cpp | 62 |
7 files changed, 446 insertions, 0 deletions
diff --git a/.gitmodules b/.gitmodules index e0e648c..4322557 100644 --- a/.gitmodules +++ b/.gitmodules @@ -2,3 +2,6 @@ path = ice url = https://github.com/zeroc-ice/ice branch = 3.7 +[submodule "icespider/fcgi/concurrentqueue"] + path = icespider/embedded/concurrentqueue + url = https://github.com/cameron314/concurrentqueue/ diff --git a/icespider/embedded/Jamfile.jam b/icespider/embedded/Jamfile.jam new file mode 100644 index 0000000..1c52a5f --- /dev/null +++ b/icespider/embedded/Jamfile.jam @@ -0,0 +1,19 @@ +lib slicer ; +lib fcgi : : <name>fcgi ; +lib fcgi++ : : <name>fcgi++ ; +lib adhocutil : : : : <include>/usr/include/adhocutil/ ; + +lib icespider-embedded : + [ glob-tree *.cpp : bin concurrentqueue ] + : + <library>slicer + <include>concurrentqueue + <library>..//core//icespider-core + <implicit-dependency>../core//icespider-core + : : + <library>..//core//icespider-core + <implicit-dependency>../core//icespider-core + <include>. + <include>concurrentqueue + ; + diff --git a/icespider/embedded/concurrentqueue b/icespider/embedded/concurrentqueue new file mode 160000 +Subproject 8f7e861dd9411a0bf77a6b9de83a47b3424fafb diff --git a/icespider/embedded/embedded.cpp b/icespider/embedded/embedded.cpp new file mode 100644 index 0000000..9b65839 --- /dev/null +++ b/icespider/embedded/embedded.cpp @@ -0,0 +1,240 @@ +#include <stdio.h> +#include <unistd.h> +#include <stdlib.h> +#include <string.h> +#include <netdb.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <arpa/inet.h> +#include "embedded.h" +#include <lockHelpers.h> +#include <functional> +#include <future> +#include <boost/assert.hpp> + +static socklen_t clientlen = sizeof(struct sockaddr_in); + +namespace IceSpider::Embedded { + SocketHandler::SocketHandler(int f) : + fd(f) + { + if (fd < 0) { + throw std::runtime_error("Invalid socket"); + } + } + + SocketHandler::~SocketHandler() + { + close(fd); + } + + FdSocketEventResultFuture SocketHandler::returnNow(int fd, const SocketEventResult && ser) + { + std::promise<SocketEventResult> p; + p.set_value(ser); + return { fd, p.get_future() }; + } + + FdSocketEventResultFuture SocketHandler::returnQueued(Listener * listener, int fd, Work && work) + { + auto f = work.get_future(); + BOOST_VERIFY_MSG(listener->work.try_enqueue(std::move(work)), "try_enqueue"); + return { fd, std::move(f) }; + } + + FdSocketEventResultFuture SocketHandler::except(Listener *) + { + return returnNow(fd, FDSetChange::Remove); + } + + ClientSocket::ClientSocket(int pfd) : + SocketHandler(accept(pfd, (struct sockaddr *) &clientaddr, &clientlen)), + buf(BUFSIZ), + rec(0), + state(State::reading_headers) + { + } + + FdSocketEventResultFuture ClientSocket::read(Listener * listener) + { + Work w([this]() { + if (buf.size() == rec) { + buf.resize(rec * 2); + } + auto r = ::read(fd, &buf.at(rec), buf.size() - rec); + if (r < 1) { + return FDSetChange::Remove; + } + switch (state) { + case State::reading_headers: + read_headers(r); + break; + case State::streaming_input: + stream_input(r); + break; + } + return FDSetChange::NoChange; + }); + return returnQueued(listener, fd, std::move(w)); + } + + void ClientSocket::read_headers(int r) + { + buf[rec + r] = 0; + if (strstr(&buf.at(rec > 3 ? rec - 2 : 0), "\r\n")) { + auto w = ::write(fd, "HTTP/1.1 204 No content\r\n\r\n", 27); + (void)w; + rec = 0; + } + rec += r; + } + + void ClientSocket::stream_input(int) + { + } + + ListenSocket::ListenSocket(unsigned short portno) : + SocketHandler(socket(AF_INET, SOCK_STREAM, 0)) + { + int optval = 1; + setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (const void *)&optval , sizeof(int)); + + bzero(&serveraddr, sizeof(serveraddr)); + serveraddr.sin_family = AF_INET; + serveraddr.sin_addr.s_addr = htonl(INADDR_ANY); + serveraddr.sin_port = htons(portno); + + if (bind(fd, (struct sockaddr *) &serveraddr, sizeof(serveraddr)) < 0) { + throw std::runtime_error("ERROR on binding"); + } + + if (listen(fd, 5) < 0) { + throw std::runtime_error("ERROR on listen"); + } + } + + FdSocketEventResultFuture ListenSocket::read(Listener * listener) + { + return returnNow(listener->create<ClientSocket>(fd), FDSetChange::AddNew); + } + + Listener::Listener() : + work(1024), + topSock(0) + { + FD_ZERO(&rfds); + FD_ZERO(&wfds); + FD_ZERO(&efds); + } + + Listener::~Listener() + { + } + + int Listener::listen(unsigned short portno) + { + int fd = create<ListenSocket>(portno); + add(fd); + return fd; + } + + void Listener::unlisten(int fd) + { + if (sockets[fd] && dynamic_cast<ListenSocket *>(sockets[fd].get())) { + remove(fd); + } + } + + void Listener::worker() + { + while (topSock > 0) { + try { + SocketHandler::Work w; + if (work.wait_dequeue_timed(w, 500000)) { + w(); + }; + } + catch (std::exception & e) { + } + } + } + + void Listener::run() + { + std::vector<std::thread> workers; + std::vector<FdSocketEventResultFuture> pending; + + for (auto x = std::thread::hardware_concurrency(); x; x--) { + workers.emplace_back(&Listener::worker, this); + } + + while (topSock > 0) { + auto r = rfds, w = wfds, e = efds; + struct timeval to = { 0, pending.empty() ? 500000 : 50 }; + if (auto s = select(topSock, &r, &w, &e, &to) > 0) { + pending.reserve(pending.size() + s); + + for (int fd = 0; fd < topSock; fd++) { + if (FD_ISSET(fd, &r)) { + pending.emplace_back(sockets[fd]->read(this)); + } + else if (FD_ISSET(fd, &e)) { + pending.emplace_back(sockets[fd]->except(this)); + } + } + } + + for (auto p = pending.begin(); p != pending.end(); ) { + auto & [ fd, ser ] = *p; + if (ser.wait_for(std::chrono::seconds(0)) == std::future_status::ready) { + auto [ op ] = ser.get(); + switch (op) { + case FDSetChange::NoChange: + FD_SET(fd, &rfds); + break; + case FDSetChange::AddNew: + add(fd); + break; + case FDSetChange::Remove: + remove(fd); + break; + } + p = pending.erase(p); + } + else { + FD_CLR(fd, &rfds); + p++; + } + } + } + + for (auto & t : workers) { + t.join(); + } + } + + template<typename T, typename ... P> inline int Listener::create(const P & ... p) + { + auto s = std::make_unique<T>(p...); + topSock = std::max(s->fd + 1, topSock); + return (sockets[s->fd] = std::move(s))->fd; + } + + void Listener::add(int fd) + { + FD_SET(fd, &rfds); + FD_SET(fd, &efds); + } + + void Listener::remove(int fd) + { + FD_CLR(fd, &rfds); + FD_CLR(fd, &wfds); + FD_CLR(fd, &efds); + sockets[fd].reset(); + while (topSock > 0 && !sockets[topSock - 1]) { + --topSock; + } + } +} + diff --git a/icespider/embedded/embedded.h b/icespider/embedded/embedded.h new file mode 100644 index 0000000..08f280a --- /dev/null +++ b/icespider/embedded/embedded.h @@ -0,0 +1,105 @@ +#ifndef ICESPIDER_EMBEDDED_H +#define ICESPIDER_EMBEDDED_H + +#include <sys/select.h> +#include <netinet/in.h> +#include <visibility.h> +#include <memory> +#include <array> +#include <vector> +#include <future> +#include </usr/include/semaphore.h> +#include <blockingconcurrentqueue.h> + +namespace IceSpider::Embedded { + class Listener; + + enum class FDSetChange { + NoChange, + AddNew, + Remove, + }; + typedef std::tuple<FDSetChange> SocketEventResult; + typedef std::future<SocketEventResult> SocketEventResultFuture; + typedef std::tuple<int, SocketEventResultFuture> FdSocketEventResultFuture; + + class SocketHandler { + public: + typedef std::packaged_task<SocketEventResult()> Work; + + SocketHandler(int f); + ~SocketHandler(); + + static inline FdSocketEventResultFuture returnNow(int, const SocketEventResult &&); + static inline FdSocketEventResultFuture returnQueued(Listener *, int, Work &&); + + virtual FdSocketEventResultFuture read(Listener *) = 0; + virtual FdSocketEventResultFuture except(Listener *); + + const int fd; + }; + + class ClientSocket : public SocketHandler { + public: + ClientSocket(int fd); + + FdSocketEventResultFuture read(Listener * listener) override; + + private: + inline void read_headers(int bytes); + inline void stream_input(int bytes); + + enum class State { + reading_headers, + streaming_input, + }; + + struct sockaddr_in clientaddr; + std::vector<char> buf; + std::size_t rec; + State state; + }; + + class ListenSocket : public SocketHandler { + public: + ListenSocket(unsigned short portno); + + FdSocketEventResultFuture read(Listener * listener) override; + + private: + struct sockaddr_in serveraddr; + }; + + class DLL_PUBLIC Listener { + public: + typedef moodycamel::BlockingConcurrentQueue<SocketHandler::Work> WorkQueue; + + Listener(); + Listener(unsigned short portno); + ~Listener(); + + int listen(unsigned short portno); + void unlisten(int fd); + + void run(); + + template<typename T, typename ... P> inline int create(const P & ... p); + + WorkQueue work; + + private: + inline void add(int fd); + inline void remove(int fd); + + void worker(); + + typedef std::unique_ptr<SocketHandler> SocketPtr; + typedef std::array<SocketPtr, 1024> Sockets; + int topSock; + Sockets sockets; + fd_set rfds, wfds, efds; + }; +}; + +#endif + diff --git a/icespider/unittests/Jamfile.jam b/icespider/unittests/Jamfile.jam index aa8d879..a0382d1 100644 --- a/icespider/unittests/Jamfile.jam +++ b/icespider/unittests/Jamfile.jam @@ -105,6 +105,23 @@ run : testFcgi ; run + testEmbedded.cpp + : : : + <slicer>yes + <define>BOOST_TEST_DYN_LINK + <library>testCommon + <library>../embedded//icespider-embedded + <library>../common//icespider-common + <library>../core//icespider-core + <implicit-dependency>../core//icespider-core + <library>boost_system + <library>boost_filesystem + <library>slicer + <library>slicer-json + <library>adhocutil + ; + +run testFileSessions.cpp : : config/ice.properties diff --git a/icespider/unittests/testEmbedded.cpp b/icespider/unittests/testEmbedded.cpp new file mode 100644 index 0000000..ed6b381 --- /dev/null +++ b/icespider/unittests/testEmbedded.cpp @@ -0,0 +1,62 @@ +#define BOOST_TEST_MODULE TestEmbedded +#include <boost/test/unit_test.hpp> + +#include <embedded.h> + +class EmbeddedIceSpiderInstance : public IceSpider::Embedded::Listener { + public: + EmbeddedIceSpiderInstance() : + fd(listen(8080)) + { + BOOST_REQUIRE_GE(fd, 0); + } + + int fd; +}; + +class EmbeddedIceSpiderRunner : public EmbeddedIceSpiderInstance { + public: + EmbeddedIceSpiderRunner() : + th([this]{ run(); }) + { + } + + ~EmbeddedIceSpiderRunner() + { + unlisten(fd); + th.join(); + } + + std::thread th; +}; + +BOOST_AUTO_TEST_CASE(construct_destruct_cycle, * boost::unit_test::timeout(1)) +{ + EmbeddedIceSpiderInstance instance; +} + +BOOST_AUTO_TEST_CASE(startup_and_shutdown_cycle, * boost::unit_test::timeout(2)) +{ + EmbeddedIceSpiderRunner runner; + sleep(1); +} + +BOOST_FIXTURE_TEST_SUITE(EmbeddedIceSpider, EmbeddedIceSpiderRunner); + +// Throw some requests at it, get a general performance overview +BOOST_AUTO_TEST_CASE(quick_siege_test, + * boost::unit_test::timeout(5)) +{ + BOOST_REQUIRE_EQUAL(0, system("wrk -t2 http://localhost:8080/ -c10 -T 15 -d 2")); +} + +// Throw lots of requests at it, get a good performance overview +BOOST_AUTO_TEST_CASE(simple_performance_test, + * boost::unit_test::disabled() // Not usually interested + * boost::unit_test::timeout(15)) +{ + BOOST_REQUIRE_EQUAL(0, system("wrk -t20 http://localhost:8080/ -c100 -T 15")); +} + +BOOST_AUTO_TEST_SUITE_END(); + |