#include #include "muxer.h" #include #include #include #include #include #include Muxer::Muxer(const P2PVR::RawDataClientPrx & t, const std::string & cmd) : target(t) { std::vector params; boost::algorithm::split(params, cmd, isspace, boost::algorithm::token_compress_on); muxerPid = popenrwe(params, fds); Logger()->messagebf(LOG_INFO, "Muxer::%p started with command '%s'", this, cmd); } Muxer::~Muxer() { close(fds[0]); int status; while (waitpid(muxerPid, &status, WNOHANG) == 0) { ReadMuxerAndSend(5); } Logger()->messagebf(LOG_INFO, "Muxer::%p finished with status %d", this, status); close(fds[1]); } bool Muxer::NewData(const P2PVR::Data & data, const Ice::Current &) { if (ReadWaiting()) return true; { std::lock_guard g(wlock); if (write(fds[0], &data.front(), data.size()) < (int)data.size()) { return true; } } return ReadAvailable(); } bool Muxer::ReadWaiting() const { std::lock_guard g(rlock); pollfd fd = { fds[0], POLLOUT, 0 }; while (true) { auto p = poll(&fd, 1, 0); if (p < 0) { // error return true; } else if (p == 0) { // write would block if (ReadMuxerAndSend(1)) return true; } else { // write would not block if (ReadMuxerAndSend(0)) return true; break; } } return false; } bool Muxer::ReadAvailable() const { std::lock_guard g(rlock); return ReadMuxerAndSend(0); } bool Muxer::ReadMuxerAndSend(int waitTime) const { pollfd fd[2] = { { fds[1], POLLIN, 0 }, { fds[2], POLLIN, 0 } }; while (true) { auto p = poll(fd, 2, waitTime); if (p < 0) { // error return true; } else if (p == 0) { // all ok return false; } else { if (fd[0].revents & POLLIN) { P2PVR::Data buf(BUFSIZ); auto len = read(fds[1], &buf.front(), buf.size()); if (len == 0) { // ok, proc exit return true; } if (len < 0) { // error return true; } buf.resize(len); target->NewData(buf); } if (fd[1].revents & POLLIN) { P2PVR::Data buf(BUFSIZ); auto len = read(fds[2], &buf.front(), buf.size()); if (len == 0) { // ok, proc exit return true; } if (len < 0) { // error return true; } buf.resize(len); std::vector lines; boost::algorithm::split(lines, buf, boost::algorithm::is_any_of("\r\n\f"), boost::algorithm::token_compress_on); BOOST_FOREACH(const auto & line, lines) { if (line.empty()) continue; Logger()->messagebf(LOG_INFO, "Muxer::%p > %s", this, line); } } } } }