#include "muxer.h" #include #include #include #include #include #include using namespace IceTray::Logging; namespace P2PVR { IceTray::Logging::LoggerPtr Muxer::log(LOGMANAGER()->getLogger()); class MuxerFailure : public DataHandlingException { }; Muxer::Muxer(const RawDataClientPrxPtr & t, const std::string & cmd) : target(t) { std::vector params; boost::algorithm::split(params, cmd, isspace, boost::algorithm::token_compress_on); fds = ProcessPipesPtr(new AdHoc::System::ProcessPipes(params, true, true, true)); fcntl(fds->fdIn(), F_SETFL, O_NONBLOCK); log->messagebf(LogLevel::INFO, "Muxer::%p started with command '%s'", this, cmd); } Muxer::~Muxer() { close(fds->fdIn()); int status; while (waitpid(fds->pid(), &status, WNOHANG) == 0) { ReadMuxerAndSend(5); } log->messagebf(LogLevel::INFO, "Muxer::%p finished with status %d", this, status); } bool Muxer::NewData(const Data data, const Ice::Current &) { std::lock_guard g(lock); for (size_t off = 0; off < data.size(); ) { // Read output until input wouldn't block if (ReadWaiting()) return true; // Send some input auto w = write(fds->fdIn(), &data[off], data.size() - off); if (w == -1 && errno != EAGAIN && errno != EWOULDBLOCK) { log->messagebf(LogLevel::ERR, "Muxer::%p write failed (%d:%s)", this, errno, strerror(errno)); throw MuxerFailure(); } off += w; } // Read anything that's come out return ReadAvailable(); } bool Muxer::ReadWaiting() const { pollfd fd = { fds->fdIn(), POLLOUT, 0 }; while (true) { auto p = poll(&fd, 1, 0); if (p < 0) { // error throw MuxerFailure(); } else if (p == 0) { // write would block if (ReadMuxerAndSend(1)) return true; } else { // write would not block if (ReadMuxerAndSend(0)) return true; break; } } return false; } bool Muxer::ReadAvailable() const { return ReadMuxerAndSend(0); } bool Muxer::ReadMuxerAndSend(int waitTime) const { pollfd fd[2] = { { fds->fdOut(), POLLIN | POLLHUP, 0 }, { fds->fdError(), POLLIN | POLLHUP, 0 } }; while (true) { auto p = poll(fd, 2, waitTime); if (p < 0) { // error throw MuxerFailure(); } else if (p == 0) { // all ok return false; } else { bool closed = false; if (fd[0].revents & (POLLIN | POLLHUP)) { Data buf(BUFSIZ); auto len = read(fds->fdOut(), &buf.front(), buf.size()); if (len == 0) { // ok, proc exit closed = true; } if (len < 0) { // error throw MuxerFailure(); } buf.resize(len); target->NewData(buf); } if (fd[1].revents & (POLLIN | POLLHUP)) { Data buf(BUFSIZ); auto len = read(fds->fdError(), &buf.front(), buf.size()); if (len == 0) { // ok, proc exit closed = true; } if (len < 0) { // error throw MuxerFailure(); } buf.resize(len); std::vector lines; boost::algorithm::split(lines, buf, boost::algorithm::is_any_of("\r\n\f"), boost::algorithm::token_compress_on); for (const auto & line : lines) { if (line.empty()) continue; log->messagebf(LogLevel::INFO, "Muxer::%p > %s", this, line); } } if (closed) return true; } } } }