From be09d53501f626e0d0d8d0a1b88a7a7be0a4480d Mon Sep 17 00:00:00 2001 From: randomdan Date: Sun, 19 Jan 2014 21:19:32 +0000 Subject: Fix up pipe handling in the muxer and write stderr of muxer command into the log --- p2pvr/lib/muxer.cpp | 134 +++++++++++++++++++++++++++++----------------------- p2pvr/lib/muxer.h | 7 ++- 2 files changed, 81 insertions(+), 60 deletions(-) diff --git a/p2pvr/lib/muxer.cpp b/p2pvr/lib/muxer.cpp index bd00a8c..25f6b57 100644 --- a/p2pvr/lib/muxer.cpp +++ b/p2pvr/lib/muxer.cpp @@ -1,57 +1,19 @@ #include #include "muxer.h" #include +#include #include #include #include - -pid_t -mpopenrw(const std::vector & params, int fds[2]) -{ - int rpipes[2], wpipes[2]; - if (pipe(rpipes)) { - throw std::runtime_error("Failed to create a pipe"); - } - if (pipe(wpipes)) { - throw std::runtime_error("Failed to create another pipe"); - } - pid_t child = fork(); - switch (child) { - case -1: // fail - throw std::runtime_error("Failed to fork"); - default: // parent - close(wpipes[0]); - close(rpipes[1]); - fds[0] = wpipes[1]; - fds[1] = rpipes[0]; - break; - case 0: // in child - close(wpipes[1]); - close(rpipes[0]); - dup2(wpipes[0], 0); - dup2(rpipes[1], 1); - for (int n = 3; n < 1024; n += 1) { - close(n); - } - char * buf[100]; - char ** w = &buf[0]; - BOOST_FOREACH(const auto & p, params) { - *w++ = strdup(p.c_str()); - } - *w = NULL; - execv(buf[0], buf); - abort(); - break; - } - return child; -} +#include Muxer::Muxer(const P2PVR::RawDataClientPrx & t, const std::string & cmd) : target(t) { std::vector params; boost::algorithm::split(params, cmd, isspace, boost::algorithm::token_compress_on); - muxerPid = mpopenrw(params, fds); + muxerPid = popenrwe(params, fds); + Logger()->messagebf(LOG_INFO, "Muxer::%p started with command '%s'", this, cmd); } Muxer::~Muxer() @@ -61,26 +23,61 @@ Muxer::~Muxer() while (waitpid(muxerPid, &status, WNOHANG) == 0) { ReadMuxerAndSend(5); } + Logger()->messagebf(LOG_INFO, "Muxer::%p finished with status %d", this, status); close(fds[1]); } bool Muxer::NewData(const P2PVR::Data & data, const Ice::Current &) { - std::lock_guard g(lock); - ReadMuxerAndSend(0); - if (write(fds[0], &data.front(), data.size()) < 1) { + if (ReadWaiting()) return true; + { + std::lock_guard g(wlock); + if (write(fds[0], &data.front(), data.size()) < (int)data.size()) { + return true; + } + } + return ReadAvailable(); +} + +bool +Muxer::ReadWaiting() const +{ + std::lock_guard g(rlock); + pollfd fd = { fds[0], POLLOUT, 0 }; + while (true) { + auto p = poll(&fd, 1, 0); + if (p < 0) { + // error + return true; + } + else if (p == 0) { + // write would block + if (ReadMuxerAndSend(1)) return true; + } + else { + // write would not block + if (ReadMuxerAndSend(0)) return true; + break; + } } + return false; +} + +bool +Muxer::ReadAvailable() const +{ + std::lock_guard g(rlock); return ReadMuxerAndSend(0); } bool Muxer::ReadMuxerAndSend(int waitTime) const { - pollfd fd = { fds[1], POLLIN, 0 }; + pollfd fd[2] = { { fds[1], POLLIN, 0 }, { fds[2], POLLIN, 0 } }; while (true) { - auto p = poll(&fd, 1, waitTime); + auto p = poll(fd, 2, waitTime); if (p < 0) { // error return true; @@ -89,19 +86,40 @@ Muxer::ReadMuxerAndSend(int waitTime) const // all ok return false; } - else if (p > 0) { - P2PVR::Data buf(BUFSIZ); - auto len = read(fds[1], &buf.front(), buf.size()); - if (len == 0) { - // ok, proc exit - return true; + else { + if (fd[0].revents & POLLIN) { + P2PVR::Data buf(BUFSIZ); + auto len = read(fds[1], &buf.front(), buf.size()); + if (len == 0) { + // ok, proc exit + return true; + } + if (len < 0) { + // error + return true; + } + buf.resize(len); + target->NewData(buf); } - if (len < 0) { - // error - return true; + if (fd[1].revents & POLLIN) { + P2PVR::Data buf(BUFSIZ); + auto len = read(fds[2], &buf.front(), buf.size()); + if (len == 0) { + // ok, proc exit + return true; + } + if (len < 0) { + // error + return true; + } + buf.resize(len); + std::vector lines; + boost::algorithm::split(lines, buf, boost::algorithm::is_any_of("\r\n\f"), boost::algorithm::token_compress_on); + BOOST_FOREACH(const auto & line, lines) { + if (line.empty()) continue; + Logger()->messagebf(LOG_INFO, "Muxer::%p > %s", this, line); + } } - buf.resize(len); - target->NewData(buf); } } } diff --git a/p2pvr/lib/muxer.h b/p2pvr/lib/muxer.h index 118b1da..e4789cf 100644 --- a/p2pvr/lib/muxer.h +++ b/p2pvr/lib/muxer.h @@ -12,11 +12,14 @@ class Muxer : public P2PVR::RawDataClient { bool NewData(const P2PVR::Data &, const Ice::Current &); private: + bool ReadWaiting() const; + bool ReadAvailable() const; bool ReadMuxerAndSend(int wait) const; const P2PVR::RawDataClientPrx target; - int fds[2]; + int fds[3]; pid_t muxerPid; - std::mutex lock; + mutable std::mutex wlock; + mutable std::mutex rlock; }; #endif -- cgit v1.2.3