diff options
| -rw-r--r-- | p2pvr/daemon/muxedFileSink.cpp | 129 | ||||
| -rw-r--r-- | p2pvr/daemon/muxedFileSink.h | 25 | ||||
| -rw-r--r-- | p2pvr/daemon/recorder.cpp | 19 | ||||
| -rw-r--r-- | p2pvr/daemon/storage.cpp | 23 | ||||
| -rw-r--r-- | p2pvr/daemon/storage.h | 1 | 
5 files changed, 185 insertions, 12 deletions
diff --git a/p2pvr/daemon/muxedFileSink.cpp b/p2pvr/daemon/muxedFileSink.cpp new file mode 100644 index 0000000..4245873 --- /dev/null +++ b/p2pvr/daemon/muxedFileSink.cpp @@ -0,0 +1,129 @@ +#include <pch.hpp> +#include "muxedFileSink.h" +#include <logger.h> +#include <misc.h> +#include <poll.h> +#include <sys/wait.h> +#include <fcntl.h> +#include <boost/algorithm/string/split.hpp> +#include <boost/algorithm/string/classification.hpp> + +class MuxerFailure : public P2PVR::DataHandlingException { }; + +MuxedFileSink::MuxedFileSink(const boost::filesystem::path & t, const std::string & cmd) +{ +	std::vector<std::string> params; +	boost::algorithm::split(params, cmd, isspace, boost::algorithm::token_compress_on); +	BOOST_FOREACH(auto & c, params) { +		if (c == "${TARGET}") { +			c = t.string(); +		} +	} +	muxerPid = popenrwe(params, fds); +	fcntl(fds[0], F_SETFL, O_NONBLOCK); +	Logger()->messagebf(LOG_INFO, "Muxer::%p started with command '%s' for %s", this, cmd, t); +} + +MuxedFileSink::~MuxedFileSink() +{ +	close(fds[0]); +	int status; +	while (waitpid(muxerPid, &status, WNOHANG) == 0) { +		ReadMuxer(5); +	} +	Logger()->messagebf(LOG_INFO, "Muxer::%p finished with status %d", this, status); +	close(fds[1]); +	close(fds[2]); +} + +bool +MuxedFileSink::NewData(const P2PVR::Data & data, const Ice::Current &) +{ +	std::lock_guard<std::mutex> g(lock); +	for (size_t off = 0; off < data.size(); ) { +		// Read output until input wouldn't block +		if (ReadWaiting()) +			return true; +		// Send some input +		auto w = write(fds[0], &data[off], data.size() - off); +		if (w == -1 && errno != EAGAIN && errno != EWOULDBLOCK) { +			Logger()->messagebf(LOG_ERR, "Muxer::%p write failed (%d:%s)", this, errno, strerror(errno)); +			throw MuxerFailure(); +		} +		off += w; +	} +	// Read anything that's come out +	return ReadAvailable(); +} + +bool +MuxedFileSink::ReadWaiting() const +{ +	pollfd fd = { fds[0], POLLOUT, 0 }; +	while (true) { +		auto p = poll(&fd, 1, 0); +		if (p < 0) { +			// error +			throw MuxerFailure(); +		} +		else if (p == 0) { +			// write would block +			if (ReadMuxer(1)) return true; +		} +		else { +			// write would not block +			if (ReadMuxer(0)) return true; +			break; +		} +	} +	return false; +} + +bool +MuxedFileSink::ReadAvailable() const +{ +	return ReadMuxer(0); +} + +bool +MuxedFileSink::ReadMuxer(int waitTime) const +{ +	pollfd fd[2] = { { fds[1], POLLIN | POLLHUP, 0 }, { fds[2], POLLIN | POLLHUP, 0 } }; +	while (true) { +		auto p = poll(fd, 2, waitTime); +		if (p < 0) { +			// error +			throw MuxerFailure(); +		} +		else if (p == 0) { +			// all ok +			return false; +		} +		else { +			bool closed = false; +			for (int i = 0; i < 2; ++i) { +				if (fd[i].revents & (POLLIN | POLLHUP)) { +					P2PVR::Data buf(BUFSIZ); +					auto len = read(fds[i + 1], &buf.front(), buf.size()); +					if (len == 0) { +						// ok, proc exit +						closed = true; +					} +					if (len < 0) { +						// error +						throw MuxerFailure(); +					} +					buf.resize(len); +					std::vector<std::string> lines; +					boost::algorithm::split(lines, buf, boost::algorithm::is_any_of("\r\n\f"), boost::algorithm::token_compress_on); +					BOOST_FOREACH(const auto & line, lines) { +						if (line.empty()) continue; +						Logger()->messagebf(LOG_INFO, "Muxer::%p > %s", this, line); +					} +				} +			} +			if (closed) return true; +		} +	} +} + diff --git a/p2pvr/daemon/muxedFileSink.h b/p2pvr/daemon/muxedFileSink.h new file mode 100644 index 0000000..9cca6fa --- /dev/null +++ b/p2pvr/daemon/muxedFileSink.h @@ -0,0 +1,25 @@ +#ifndef MUXEDFILESINK_H +#define MUXEDFILESINK_H + +#include <dvb.h> +#include <mutex> +#include <boost/filesystem/path.hpp> + +class MuxedFileSink : public P2PVR::RawDataClient { +	public: +		MuxedFileSink(const boost::filesystem::path & target, const std::string & cmd); +		~MuxedFileSink(); + +		bool NewData(const P2PVR::Data &, const Ice::Current &); + +	private: +		bool ReadWaiting() const; +		bool ReadAvailable() const; +		bool ReadMuxer(int wait) const; +		int fds[3]; +		pid_t muxerPid; +		mutable std::mutex lock; +}; + +#endif + diff --git a/p2pvr/daemon/recorder.cpp b/p2pvr/daemon/recorder.cpp index ccb975e..6cd0b5f 100644 --- a/p2pvr/daemon/recorder.cpp +++ b/p2pvr/daemon/recorder.cpp @@ -13,8 +13,8 @@ std::string Recorder::extension;  std::string Recorder::muxerCommand;  DECLARE_OPTIONS(Recorder, "P2PVR Recorder options") -("p2pvr.recorder.extension", Options::value(&extension, "mpg"), - "File extension to save with (default: avi)") +("p2pvr.recorder.extension", Options::value(&extension, "mp4"), + "File extension to save with (default: mp4)")  ("p2pvr.recorder.muxercommand", Options::value(&muxerCommand, "/usr/bin/ffmpeg -f mpegts -i - -f dvd -codec copy -"),   "Command to perform TS->PS muxing (default: '/usr/bin/ffmpeg -f mpegts -i - -f dvd -codec copy -')")  END_OPTIONS(Recorder); @@ -65,9 +65,14 @@ Recorder::StartRecording(P2PVR::SchedulePtr schedule, DVBSI::ServicePtr service,  	auto id = storage->CreateForEventRecording(extension, schedule, service, event);  	auto store = storage->OpenForWrite(id);  	ScopeObject _store(NULL, NULL, [this,&store,&storage,&id]() { storage->Close(store); storage->Delete(id); }); -	auto muxer = P2PVR::RawDataClientPrx::checkedCast(adapter->addWithUUID(new Muxer(store, muxerCommand))); -	ScopeObject _muxer(NULL, NULL, [this,&muxer]() { adapter->remove(muxer->ice_getIdentity()); }); -	auto ss = ServiceStreamerPtr(new ServiceStreamer(service->ServiceId, muxer, adapter->getCommunicator(), adapter)); +	auto target = store; +	P2PVR::RawDataClientPrx muxer; +	if (!muxerCommand.empty()) { +		muxer = P2PVR::RawDataClientPrx::checkedCast(adapter->addWithUUID(new Muxer(store, muxerCommand))); +		target = muxer; +	} +	ScopeObject _muxer(NULL, NULL, [this,&muxer]() { if (muxer) adapter->remove(muxer->ice_getIdentity()); }); +	auto ss = ServiceStreamerPtr(new ServiceStreamer(service->ServiceId, target, adapter->getCommunicator(), adapter));  	ss->Start();  	Logger()->messagebf(LOG_INFO, "Started recording %s (%s - %s) on %s (%d)", @@ -91,7 +96,9 @@ Recorder::StopRecording(CurrentPtr c)  	std::lock_guard<std::mutex> g(lock);  	Logger()->messagebf(LOG_DEBUG, "Stopping %s", c->event->Title);  	c->stream->Stop(); -	adapter->remove(c->muxer->ice_getIdentity()); +	if (c->muxer) { +		adapter->remove(c->muxer->ice_getIdentity()); +	}  	currentRecordings.erase(c);  	auto storage = P2PVR::StoragePrx::checkedCast(adapter->createProxy(adapter->getCommunicator()->stringToIdentity("Storage")));  	storage->Close(c->store); diff --git a/p2pvr/daemon/storage.cpp b/p2pvr/daemon/storage.cpp index 79280a1..2465afe 100644 --- a/p2pvr/daemon/storage.cpp +++ b/p2pvr/daemon/storage.cpp @@ -1,6 +1,7 @@  #include <pch.hpp>  #include "storage.h"  #include "fileSink.h" +#include "muxedFileSink.h"  #include <commonHelpers.h>  #include <fcntl.h>  #include <logger.h> @@ -11,6 +12,7 @@  namespace fs = boost::filesystem; +std::string Storage::muxerCommand;  fs::path Storage::root;  fs::path Storage::byAll;  fs::path Storage::byTitle; @@ -19,6 +21,8 @@ fs::path Storage::byService;  fs::path Storage::bySchedule;  DECLARE_OPTIONS(Storage, "P2PVR Storage options") +("p2pvr.storage.muxercommand", Options::value(&muxerCommand, "/usr/bin/ffmpeg -i - -f mp4 -y ${TARGET}"), + "Command to perform TS->PS muxing (default: '/usr/bin/ffmpeg -i - -f mp4 -y ${TARGET}')")  ("p2pvr.storage.root", Options::value(&root, "recordings"),   "Root folder in which to store recordings")  ("p2pvr.storage.all", Options::value(&byAll, "all"), @@ -223,13 +227,20 @@ Storage::OpenForWrite(const std::string & id, const Ice::Current & ice)  {  	fs::path path = root / byAll / id;  	path.replace_extension(FindExtension(id)); -	auto fd = open(path.string().c_str(), O_WRONLY | O_APPEND | O_LARGEFILE); -	if (fd < 0) { -		Logger()->messagebf(LOG_ERR, "%s: Failed to open file for reading at %s (%d:%s)", __PRETTY_FUNCTION__, -				path, errno, strerror(errno)); -		throw P2PVR::StorageException(path.string(), strerror(errno)); +	P2PVR::RawDataClient * target; +	if (muxerCommand.empty()) { +		auto fd = open(path.string().c_str(), O_WRONLY | O_APPEND | O_LARGEFILE); +		if (fd < 0) { +			Logger()->messagebf(LOG_ERR, "%s: Failed to open file for reading at %s (%d:%s)", __PRETTY_FUNCTION__, +					path, errno, strerror(errno)); +			throw P2PVR::StorageException(path.string(), strerror(errno)); +		} +		target = new FileSink(fd); +	} +	else { +		target = new MuxedFileSink(path, muxerCommand);  	} -	auto openFile = OpenFilePtr(new OpenFile(ice.adapter, new FileSink(fd))); +	auto openFile = OpenFilePtr(new OpenFile(ice.adapter, target));  	openFiles.insert(openFile);  	return *openFile;  } diff --git a/p2pvr/daemon/storage.h b/p2pvr/daemon/storage.h index 86bc76a..89a2836 100644 --- a/p2pvr/daemon/storage.h +++ b/p2pvr/daemon/storage.h @@ -29,6 +29,7 @@ class Storage : public P2PVR::Storage {  		typedef std::set<OpenFilePtr> OpenFiles;  		OpenFiles openFiles; +		static std::string muxerCommand;  		static boost::filesystem::path root;  		static boost::filesystem::path byAll;  		static boost::filesystem::path byTitle;  | 
