#include #include "muxer.h" #include #include #include #include #include #include #include class MuxerFailure : public P2PVR::DataHandlingException { }; Muxer::Muxer(const P2PVR::RawDataClientPrx & t, const std::string & cmd) : target(t) { std::vector params; boost::algorithm::split(params, cmd, isspace, boost::algorithm::token_compress_on); muxerPid = popenrwe(params, fds); fcntl(fds[0], F_SETFL, O_NONBLOCK); Logger()->messagebf(LOG_INFO, "Muxer::%p started with command '%s'", this, cmd); } Muxer::~Muxer() { close(fds[0]); int status; while (waitpid(muxerPid, &status, WNOHANG) == 0) { ReadMuxerAndSend(5); } Logger()->messagebf(LOG_INFO, "Muxer::%p finished with status %d", this, status); close(fds[1]); close(fds[2]); } bool Muxer::NewData(const P2PVR::Data & data, const Ice::Current &) { std::lock_guard g(lock); for (size_t off = 0; off < data.size(); ) { // Read output until input wouldn't block if (ReadWaiting()) return true; // Send some input auto w = write(fds[0], &data[off], data.size() - off); if (w == -1 && errno != EAGAIN && errno != EWOULDBLOCK) { Logger()->messagebf(LOG_ERR, "Muxer::%p write failed (%d:%s)", this, errno, strerror(errno)); throw MuxerFailure(); } off += w; } // Read anything that's come out return ReadAvailable(); } bool Muxer::ReadWaiting() const { pollfd fd = { fds[0], POLLOUT, 0 }; while (true) { auto p = poll(&fd, 1, 0); if (p < 0) { // error throw MuxerFailure(); } else if (p == 0) { // write would block if (ReadMuxerAndSend(1)) return true; } else { // write would not block if (ReadMuxerAndSend(0)) return true; break; } } return false; } bool Muxer::ReadAvailable() const { return ReadMuxerAndSend(0); } bool Muxer::ReadMuxerAndSend(int waitTime) const { pollfd fd[2] = { { fds[1], POLLIN | POLLHUP, 0 }, { fds[2], POLLIN | POLLHUP, 0 } }; while (true) { auto p = poll(fd, 2, waitTime); if (p < 0) { // error throw MuxerFailure(); } else if (p == 0) { // all ok return false; } else { bool closed = false; if (fd[0].revents & (POLLIN | POLLHUP)) { P2PVR::Data buf(BUFSIZ); auto len = read(fds[1], &buf.front(), buf.size()); if (len == 0) { // ok, proc exit closed = true; } if (len < 0) { // error throw MuxerFailure(); } buf.resize(len); target->NewData(buf); } if (fd[1].revents & (POLLIN | POLLHUP)) { P2PVR::Data buf(BUFSIZ); auto len = read(fds[2], &buf.front(), buf.size()); if (len == 0) { // ok, proc exit closed = true; } if (len < 0) { // error throw MuxerFailure(); } buf.resize(len); std::vector lines; boost::algorithm::split(lines, buf, boost::algorithm::is_any_of("\r\n\f"), boost::algorithm::token_compress_on); BOOST_FOREACH(const auto & line, lines) { if (line.empty()) continue; Logger()->messagebf(LOG_INFO, "Muxer::%p > %s", this, line); } } if (closed) return true; } } }