diff options
| -rw-r--r-- | p2pvr/lib/muxer.cpp | 134 | ||||
| -rw-r--r-- | p2pvr/lib/muxer.h | 7 | 
2 files changed, 81 insertions, 60 deletions
| diff --git a/p2pvr/lib/muxer.cpp b/p2pvr/lib/muxer.cpp index bd00a8c..25f6b57 100644 --- a/p2pvr/lib/muxer.cpp +++ b/p2pvr/lib/muxer.cpp @@ -1,57 +1,19 @@  #include <pch.hpp>  #include "muxer.h"  #include <logger.h> +#include <misc.h>  #include <poll.h>  #include <sys/wait.h>  #include <boost/algorithm/string/split.hpp> - -pid_t -mpopenrw(const std::vector<std::string> & params, int fds[2]) -{ -	int rpipes[2], wpipes[2]; -	if (pipe(rpipes)) { -		throw std::runtime_error("Failed to create a pipe"); -	} -	if (pipe(wpipes)) { -		throw std::runtime_error("Failed to create another pipe"); -	} -	pid_t child = fork(); -	switch (child) { -		case -1: // fail -			throw std::runtime_error("Failed to fork"); -		default: // parent -			close(wpipes[0]); -			close(rpipes[1]); -			fds[0] = wpipes[1]; -			fds[1] = rpipes[0]; -			break; -		case 0: // in child -			close(wpipes[1]); -			close(rpipes[0]); -			dup2(wpipes[0], 0); -			dup2(rpipes[1], 1); -			for (int n = 3; n < 1024; n += 1) { -				close(n); -			} -			char * buf[100]; -			char ** w = &buf[0]; -			BOOST_FOREACH(const auto & p, params) { -				*w++ = strdup(p.c_str()); -			} -			*w = NULL; -			execv(buf[0], buf); -			abort(); -			break; -	} -	return child; -} +#include <boost/algorithm/string/classification.hpp>  Muxer::Muxer(const P2PVR::RawDataClientPrx & t, const std::string & cmd) :  	target(t)  {  	std::vector<std::string> params;  	boost::algorithm::split(params, cmd, isspace, boost::algorithm::token_compress_on); -	muxerPid = mpopenrw(params, fds); +	muxerPid = popenrwe(params, fds); +	Logger()->messagebf(LOG_INFO, "Muxer::%p started with command '%s'", this, cmd);  }  Muxer::~Muxer() @@ -61,26 +23,61 @@ Muxer::~Muxer()  	while (waitpid(muxerPid, &status, WNOHANG) == 0) {  		ReadMuxerAndSend(5);  	} +	Logger()->messagebf(LOG_INFO, "Muxer::%p finished with status %d", this, status);  	close(fds[1]);  }  bool  Muxer::NewData(const P2PVR::Data & data, const Ice::Current &)  { -	std::lock_guard<std::mutex> g(lock); -	ReadMuxerAndSend(0); -	if (write(fds[0], &data.front(), data.size()) < 1) { +	if (ReadWaiting())  		return true; +	{ +		std::lock_guard<std::mutex> g(wlock); +		if (write(fds[0], &data.front(), data.size()) < (int)data.size()) { +			return true; +		} +	} +	return ReadAvailable(); +} + +bool +Muxer::ReadWaiting() const +{ +	std::lock_guard<std::mutex> g(rlock); +	pollfd fd = { fds[0], POLLOUT, 0 }; +	while (true) { +		auto p = poll(&fd, 1, 0); +		if (p < 0) { +			// error +			return true; +		} +		else if (p == 0) { +			// write would block +			if (ReadMuxerAndSend(1)) return true; +		} +		else { +			// write would not block +			if (ReadMuxerAndSend(0)) return true; +			break; +		}  	} +	return false; +} + +bool +Muxer::ReadAvailable() const +{ +	std::lock_guard<std::mutex> g(rlock);  	return ReadMuxerAndSend(0);  }  bool  Muxer::ReadMuxerAndSend(int waitTime) const  { -	pollfd fd = { fds[1], POLLIN, 0 }; +	pollfd fd[2] = { { fds[1], POLLIN, 0 }, { fds[2], POLLIN, 0 } };  	while (true) { -		auto p = poll(&fd, 1, waitTime); +		auto p = poll(fd, 2, waitTime);  		if (p < 0) {  			// error  			return true; @@ -89,19 +86,40 @@ Muxer::ReadMuxerAndSend(int waitTime) const  			// all ok  			return false;  		} -		else if (p > 0) { -			P2PVR::Data buf(BUFSIZ); -			auto len = read(fds[1], &buf.front(), buf.size()); -			if (len == 0) { -				// ok, proc exit -				return true; +		else { +			if (fd[0].revents & POLLIN) { +				P2PVR::Data buf(BUFSIZ); +				auto len = read(fds[1], &buf.front(), buf.size()); +				if (len == 0) { +					// ok, proc exit +					return true; +				} +				if (len < 0) { +					// error +					return true; +				} +				buf.resize(len); +				target->NewData(buf);  			} -			if (len < 0) { -				// error -				return true; +			if (fd[1].revents & POLLIN) { +				P2PVR::Data buf(BUFSIZ); +				auto len = read(fds[2], &buf.front(), buf.size()); +				if (len == 0) { +					// ok, proc exit +					return true; +				} +				if (len < 0) { +					// error +					return true; +				} +				buf.resize(len); +				std::vector<std::string> lines; +				boost::algorithm::split(lines, buf, boost::algorithm::is_any_of("\r\n\f"), boost::algorithm::token_compress_on); +				BOOST_FOREACH(const auto & line, lines) { +					if (line.empty()) continue; +					Logger()->messagebf(LOG_INFO, "Muxer::%p > %s", this, line); +				}  			} -			buf.resize(len); -			target->NewData(buf);  		}  	}  } diff --git a/p2pvr/lib/muxer.h b/p2pvr/lib/muxer.h index 118b1da..e4789cf 100644 --- a/p2pvr/lib/muxer.h +++ b/p2pvr/lib/muxer.h @@ -12,11 +12,14 @@ class Muxer : public P2PVR::RawDataClient {  		bool NewData(const P2PVR::Data &, const Ice::Current &);  	private: +		bool ReadWaiting() const; +		bool ReadAvailable() const;  		bool ReadMuxerAndSend(int wait) const;  		const P2PVR::RawDataClientPrx target; -		int fds[2]; +		int fds[3];  		pid_t muxerPid; -		std::mutex lock; +		mutable std::mutex wlock; +		mutable std::mutex rlock;  };  #endif | 
