summaryrefslogtreecommitdiff
path: root/libadhocutil/curlMultiHandle.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'libadhocutil/curlMultiHandle.cpp')
-rw-r--r--libadhocutil/curlMultiHandle.cpp92
1 files changed, 92 insertions, 0 deletions
diff --git a/libadhocutil/curlMultiHandle.cpp b/libadhocutil/curlMultiHandle.cpp
new file mode 100644
index 0000000..ff509ce
--- /dev/null
+++ b/libadhocutil/curlMultiHandle.cpp
@@ -0,0 +1,92 @@
+#include "curlMultiHandle.h"
+#include <boost/iostreams/stream.hpp>
+#include <map>
+#include "runtimeContext.h"
+#include "curlStream.h"
+
+class RunningCurl : public CurlStreamSource {
+ public:
+ RunningCurl(const std::string & url, const boost::function<void(std::istream &)> & c) :
+ CurlStreamSource(url),
+ consumer(c)
+ {
+ }
+
+ void Callback() override
+ {
+ typedef boost::reference_wrapper<RunningCurl> rc_ref;
+ boost::iostreams::stream<rc_ref> curlstrm(boost::ref(*this));
+ consumer(curlstrm);
+ }
+
+ private:
+ const boost::function<void(std::istream &)> consumer;
+};
+
+CurlMultiHandle::CurlMultiHandle()
+{
+}
+
+CurlMultiHandle::~CurlMultiHandle()
+{
+}
+
+CurlHandlePtr
+CurlMultiHandle::addCurl(const std::string & url, const boost::function<void(std::istream &)> & c)
+{
+ RunningCurl * css = new RunningCurl(url, c);
+ curls.insert(css);
+ return css;
+}
+
+void
+CurlMultiHandle::addRunner(CURLM * curlm, Running & running, CurlMultiHandle::CURLs & curls)
+{
+ auto runner = *curls.begin();
+ curl_multi_add_handle(curlm, *runner);
+ running[*runner] = runner;
+ runner->SwapContext();
+ curls.erase(runner);
+}
+
+void
+CurlMultiHandle::performAll()
+{
+ if (!curls.empty()) {
+ Running running;
+ CURLM * curlm = curl_multi_init();
+
+ while (!curls.empty() && running.size() < 5) {
+ addRunner(curlm, running, curls);
+ }
+ CURLMcode code;
+ int act = running.size();
+ while (act) {
+ while ((code = curl_multi_perform(curlm, &act)) == CURLM_CALL_MULTI_PERFORM) ;
+ // Has anything finished
+ CURLMsg * msg;
+ int msgs = 0;
+ while ((msg = curl_multi_info_read(curlm, &msgs))) {
+ if (msg->msg == CURLMSG_DONE) {
+ curl_multi_remove_handle(curlm, msg->easy_handle);
+ running.erase(msg->easy_handle);
+ if (!curls.empty()) {
+ addRunner(curlm, running, curls);
+ act += 1;
+ }
+ }
+ }
+ // Wait for something to happen
+ fd_set r, w, e;
+ int maxfd = 0;
+ struct timeval to = { 0, 100000 };
+ FD_ZERO(&r);
+ FD_ZERO(&w);
+ FD_ZERO(&e);
+ curl_multi_fdset(curlm, &r, &w, &e, &maxfd);
+ select(act, &r, &w, &e, &to);
+ }
+ curl_multi_cleanup(curlm);
+ }
+}
+