#include #include "muxer.h" #include #include #include #include pid_t mpopenrw(const std::vector & params, int fds[2]) { int rpipes[2], wpipes[2]; if (pipe(rpipes)) { throw std::runtime_error("Failed to create a pipe"); } if (pipe(wpipes)) { throw std::runtime_error("Failed to create another pipe"); } pid_t child = fork(); switch (child) { case -1: // fail throw std::runtime_error("Failed to fork"); default: // parent close(wpipes[0]); close(rpipes[1]); fds[0] = wpipes[1]; fds[1] = rpipes[0]; break; case 0: // in child close(wpipes[1]); close(rpipes[0]); dup2(wpipes[0], 0); dup2(rpipes[1], 1); for (int n = 3; n < 1024; n += 1) { close(n); } char * buf[100]; char ** w = &buf[0]; BOOST_FOREACH(const auto & p, params) { *w++ = strdup(p.c_str()); } *w = NULL; execv(buf[0], buf); abort(); break; } return child; } Muxer::Muxer(const P2PVR::RawDataClientPrx & t, const std::string & cmd) : target(t) { std::vector params; boost::algorithm::split(params, cmd, isspace, boost::algorithm::token_compress_on); muxerPid = mpopenrw(params, fds); } Muxer::~Muxer() { close(fds[0]); int status; while (waitpid(muxerPid, &status, WNOHANG) == 0) { ReadMuxerAndSend(5); } close(fds[1]); } bool Muxer::NewData(const P2PVR::Data & data, const Ice::Current &) { std::lock_guard g(lock); ReadMuxerAndSend(0); if (write(fds[0], &data.front(), data.size()) < 1) { return true; } return ReadMuxerAndSend(0); } bool Muxer::ReadMuxerAndSend(int waitTime) const { pollfd fd = { fds[1], POLLIN, 0 }; while (true) { auto p = poll(&fd, 1, waitTime); if (p < 0) { // error return true; } else if (p == 0) { // all ok return false; } else if (p > 0) { P2PVR::Data buf(BUFSIZ); auto len = read(fds[1], &buf.front(), buf.size()); if (len == 0) { // ok, proc exit return true; } if (len < 0) { // error return true; } buf.resize(len); target->NewData(buf); } } }