diff options
-rw-r--r-- | src/ingestor.cpp | 17 | ||||
-rw-r--r-- | src/ingestor.hpp | 2 | ||||
-rw-r--r-- | src/uaLookup.hpp | 1 |
3 files changed, 16 insertions, 4 deletions
diff --git a/src/ingestor.cpp b/src/ingestor.cpp index 87167b5..6dd369e 100644 --- a/src/ingestor.cpp +++ b/src/ingestor.cpp @@ -58,7 +58,7 @@ namespace WebStat { } Ingestor::Ingestor(const utsname & host, DB::ConnectionPoolPtr dbpl) : - hostnameId {crc32(host.nodename)}, dbpool {std::move(dbpl)} + hostnameId {crc32(host.nodename)}, dbpool {std::move(dbpl)}, curl {curl_multi_init()} { auto dbconn = dbpool->get(); auto ins = dbconn->modify(SQL::HOST_UPSERT, SQL::HOST_UPSERT_OPTS); @@ -89,9 +89,18 @@ namespace WebStat { void Ingestor::ingestLog(std::FILE * input) { - while (auto line = scn::scan<std::string>(input, "{:[^\n]}\n")) { - linesRead++; - ingestLogLine(dbpool->get().get(), line->value()); + curl_waitfd logIn {.fd = fileno(input), .events = CURL_WAIT_POLLIN, .revents = 0}; + + while (curl_multi_poll(curl.get(), &logIn, 1, INT_MAX, nullptr) == CURLM_OK) { + if (logIn.revents) { + if (auto line = scn::scan<std::string>(input, "{:[^\n]}\n")) { + linesRead++; + ingestLogLine(dbpool->get().get(), line->value()); + } + else { + break; + } + } } } diff --git a/src/ingestor.hpp b/src/ingestor.hpp index b2a0803..d9d49a3 100644 --- a/src/ingestor.hpp +++ b/src/ingestor.hpp @@ -1,6 +1,7 @@ #pragma once #include "logTypes.hpp" +#include "uaLookup.hpp" #include <c++11Helpers.h> #include <connectionPool.h> #include <connection_fwd.h> @@ -44,5 +45,6 @@ namespace WebStat { mutable std::flat_set<Crc32Value> existingEntities; uint32_t hostnameId; DB::ConnectionPoolPtr dbpool; + CurlMultiPtr curl; }; } diff --git a/src/uaLookup.hpp b/src/uaLookup.hpp index 843283e..3c43371 100644 --- a/src/uaLookup.hpp +++ b/src/uaLookup.hpp @@ -17,6 +17,7 @@ namespace WebStat { using CurlPtr = std::unique_ptr<CURL, DeleteWith<&curl_easy_cleanup>>; using CurlMimePtr = std::unique_ptr<curl_mime, DeleteWith<&curl_mime_free>>; using CurlErrorBuf = std::array<char, CURL_ERROR_SIZE>; + using CurlMultiPtr = std::unique_ptr<CURLM, DeleteWith<&curl_multi_cleanup>>; class CurlOperation { public: |