#include <pch.hpp>
#include "muxer.h"
#include <logger.h>
#include <misc.h>
#include <poll.h>
#include <sys/wait.h>
#include <fcntl.h>
#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string/classification.hpp>

class MuxerFailure : public P2PVR::DataHandlingException { };

Muxer::Muxer(const P2PVR::RawDataClientPrx & t, const std::string & cmd) :
	target(t)
{
	std::vector<std::string> params;
	boost::algorithm::split(params, cmd, isspace, boost::algorithm::token_compress_on);
	muxerPid = popenrwe(params, fds);
	fcntl(fds[0], F_SETFL, O_NONBLOCK);
	Logger()->messagebf(LOG_INFO, "Muxer::%p started with command '%s'", this, cmd);
}

Muxer::~Muxer()
{
	close(fds[0]);
	int status;
	while (waitpid(muxerPid, &status, WNOHANG) == 0) {
		ReadMuxerAndSend(5);
	}
	Logger()->messagebf(LOG_INFO, "Muxer::%p finished with status %d", this, status);
	close(fds[1]);
	close(fds[2]);
}

bool
Muxer::NewData(const P2PVR::Data & data, const Ice::Current &)
{
	std::lock_guard<std::mutex> g(lock);
	for (size_t off = 0; off < data.size(); ) {
		// Read output until input wouldn't block
		if (ReadWaiting())
			return true;
		// Send some input
		auto w = write(fds[0], &data[off], data.size() - off);
		if (w == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
			Logger()->messagebf(LOG_ERR, "Muxer::%p write failed (%d:%s)", this, errno, strerror(errno));
			throw MuxerFailure();
		}
		off += w;
	}
	// Read anything that's come out
	return ReadAvailable();
}

bool
Muxer::ReadWaiting() const
{
	pollfd fd = { fds[0], POLLOUT, 0 };
	while (true) {
		auto p = poll(&fd, 1, 0);
		if (p < 0) {
			// error
			throw MuxerFailure();
		}
		else if (p == 0) {
			// write would block
			if (ReadMuxerAndSend(1)) return true;
		}
		else {
			// write would not block
			if (ReadMuxerAndSend(0)) return true;
			break;
		}
	}
	return false;
}

bool
Muxer::ReadAvailable() const
{
	return ReadMuxerAndSend(0);
}

bool
Muxer::ReadMuxerAndSend(int waitTime) const
{
	pollfd fd[2] = { { fds[1], POLLIN | POLLHUP, 0 }, { fds[2], POLLIN | POLLHUP, 0 } };
	while (true) {
		auto p = poll(fd, 2, waitTime);
		if (p < 0) {
			// error
			throw MuxerFailure();
		}
		else if (p == 0) {
			// all ok
			return false;
		}
		else {
			bool closed = false;
			if (fd[0].revents & (POLLIN | POLLHUP)) {
				P2PVR::Data buf(BUFSIZ);
				auto len = read(fds[1], &buf.front(), buf.size());
				if (len == 0) {
					// ok, proc exit
					closed = true;
				}
				if (len < 0) {
					// error
					throw MuxerFailure();
				}
				buf.resize(len);
				target->NewData(buf);
			}
			if (fd[1].revents & (POLLIN | POLLHUP)) {
				P2PVR::Data buf(BUFSIZ);
				auto len = read(fds[2], &buf.front(), buf.size());
				if (len == 0) {
					// ok, proc exit
					closed = true;
				}
				if (len < 0) {
					// error
					throw MuxerFailure();
				}
				buf.resize(len);
				std::vector<std::string> lines;
				boost::algorithm::split(lines, buf, boost::algorithm::is_any_of("\r\n\f"), boost::algorithm::token_compress_on);
				BOOST_FOREACH(const auto & line, lines) {
					if (line.empty()) continue;
					Logger()->messagebf(LOG_INFO, "Muxer::%p > %s", this, line);
				}
			}
			if (closed) return true;
		}
	}
}