diff options
author | randomdan <randomdan@localhost> | 2014-08-05 19:09:44 +0000 |
---|---|---|
committer | randomdan <randomdan@localhost> | 2014-08-05 19:09:44 +0000 |
commit | 248383b933b6aa31fbc401260bb87de4ea6a685a (patch) | |
tree | 419530d73d25d6b47e758b1225eaea9e7fc3e15d | |
parent | More friendly linkflags and some ycm configs (diff) | |
download | p2pvr-248383b933b6aa31fbc401260bb87de4ea6a685a.tar.bz2 p2pvr-248383b933b6aa31fbc401260bb87de4ea6a685a.tar.xz p2pvr-248383b933b6aa31fbc401260bb87de4ea6a685a.zip |
Support muxing directly in storage
-rw-r--r-- | p2pvr/daemon/muxedFileSink.cpp | 129 | ||||
-rw-r--r-- | p2pvr/daemon/muxedFileSink.h | 25 | ||||
-rw-r--r-- | p2pvr/daemon/recorder.cpp | 19 | ||||
-rw-r--r-- | p2pvr/daemon/storage.cpp | 23 | ||||
-rw-r--r-- | p2pvr/daemon/storage.h | 1 |
5 files changed, 185 insertions, 12 deletions
diff --git a/p2pvr/daemon/muxedFileSink.cpp b/p2pvr/daemon/muxedFileSink.cpp new file mode 100644 index 0000000..4245873 --- /dev/null +++ b/p2pvr/daemon/muxedFileSink.cpp @@ -0,0 +1,129 @@ +#include <pch.hpp> +#include "muxedFileSink.h" +#include <logger.h> +#include <misc.h> +#include <poll.h> +#include <sys/wait.h> +#include <fcntl.h> +#include <boost/algorithm/string/split.hpp> +#include <boost/algorithm/string/classification.hpp> + +class MuxerFailure : public P2PVR::DataHandlingException { }; + +MuxedFileSink::MuxedFileSink(const boost::filesystem::path & t, const std::string & cmd) +{ + std::vector<std::string> params; + boost::algorithm::split(params, cmd, isspace, boost::algorithm::token_compress_on); + BOOST_FOREACH(auto & c, params) { + if (c == "${TARGET}") { + c = t.string(); + } + } + muxerPid = popenrwe(params, fds); + fcntl(fds[0], F_SETFL, O_NONBLOCK); + Logger()->messagebf(LOG_INFO, "Muxer::%p started with command '%s' for %s", this, cmd, t); +} + +MuxedFileSink::~MuxedFileSink() +{ + close(fds[0]); + int status; + while (waitpid(muxerPid, &status, WNOHANG) == 0) { + ReadMuxer(5); + } + Logger()->messagebf(LOG_INFO, "Muxer::%p finished with status %d", this, status); + close(fds[1]); + close(fds[2]); +} + +bool +MuxedFileSink::NewData(const P2PVR::Data & data, const Ice::Current &) +{ + std::lock_guard<std::mutex> g(lock); + for (size_t off = 0; off < data.size(); ) { + // Read output until input wouldn't block + if (ReadWaiting()) + return true; + // Send some input + auto w = write(fds[0], &data[off], data.size() - off); + if (w == -1 && errno != EAGAIN && errno != EWOULDBLOCK) { + Logger()->messagebf(LOG_ERR, "Muxer::%p write failed (%d:%s)", this, errno, strerror(errno)); + throw MuxerFailure(); + } + off += w; + } + // Read anything that's come out + return ReadAvailable(); +} + +bool +MuxedFileSink::ReadWaiting() const +{ + pollfd fd = { fds[0], POLLOUT, 0 }; + while (true) { + auto p = poll(&fd, 1, 0); + if (p < 0) { + // error + throw MuxerFailure(); + } + else if (p == 0) { + // write would block + if (ReadMuxer(1)) return true; + } + else { + // write would not block + if (ReadMuxer(0)) return true; + break; + } + } + return false; +} + +bool +MuxedFileSink::ReadAvailable() const +{ + return ReadMuxer(0); +} + +bool +MuxedFileSink::ReadMuxer(int waitTime) const +{ + pollfd fd[2] = { { fds[1], POLLIN | POLLHUP, 0 }, { fds[2], POLLIN | POLLHUP, 0 } }; + while (true) { + auto p = poll(fd, 2, waitTime); + if (p < 0) { + // error + throw MuxerFailure(); + } + else if (p == 0) { + // all ok + return false; + } + else { + bool closed = false; + for (int i = 0; i < 2; ++i) { + if (fd[i].revents & (POLLIN | POLLHUP)) { + P2PVR::Data buf(BUFSIZ); + auto len = read(fds[i + 1], &buf.front(), buf.size()); + if (len == 0) { + // ok, proc exit + closed = true; + } + if (len < 0) { + // error + throw MuxerFailure(); + } + buf.resize(len); + std::vector<std::string> lines; + boost::algorithm::split(lines, buf, boost::algorithm::is_any_of("\r\n\f"), boost::algorithm::token_compress_on); + BOOST_FOREACH(const auto & line, lines) { + if (line.empty()) continue; + Logger()->messagebf(LOG_INFO, "Muxer::%p > %s", this, line); + } + } + } + if (closed) return true; + } + } +} + diff --git a/p2pvr/daemon/muxedFileSink.h b/p2pvr/daemon/muxedFileSink.h new file mode 100644 index 0000000..9cca6fa --- /dev/null +++ b/p2pvr/daemon/muxedFileSink.h @@ -0,0 +1,25 @@ +#ifndef MUXEDFILESINK_H +#define MUXEDFILESINK_H + +#include <dvb.h> +#include <mutex> +#include <boost/filesystem/path.hpp> + +class MuxedFileSink : public P2PVR::RawDataClient { + public: + MuxedFileSink(const boost::filesystem::path & target, const std::string & cmd); + ~MuxedFileSink(); + + bool NewData(const P2PVR::Data &, const Ice::Current &); + + private: + bool ReadWaiting() const; + bool ReadAvailable() const; + bool ReadMuxer(int wait) const; + int fds[3]; + pid_t muxerPid; + mutable std::mutex lock; +}; + +#endif + diff --git a/p2pvr/daemon/recorder.cpp b/p2pvr/daemon/recorder.cpp index ccb975e..6cd0b5f 100644 --- a/p2pvr/daemon/recorder.cpp +++ b/p2pvr/daemon/recorder.cpp @@ -13,8 +13,8 @@ std::string Recorder::extension; std::string Recorder::muxerCommand; DECLARE_OPTIONS(Recorder, "P2PVR Recorder options") -("p2pvr.recorder.extension", Options::value(&extension, "mpg"), - "File extension to save with (default: avi)") +("p2pvr.recorder.extension", Options::value(&extension, "mp4"), + "File extension to save with (default: mp4)") ("p2pvr.recorder.muxercommand", Options::value(&muxerCommand, "/usr/bin/ffmpeg -f mpegts -i - -f dvd -codec copy -"), "Command to perform TS->PS muxing (default: '/usr/bin/ffmpeg -f mpegts -i - -f dvd -codec copy -')") END_OPTIONS(Recorder); @@ -65,9 +65,14 @@ Recorder::StartRecording(P2PVR::SchedulePtr schedule, DVBSI::ServicePtr service, auto id = storage->CreateForEventRecording(extension, schedule, service, event); auto store = storage->OpenForWrite(id); ScopeObject _store(NULL, NULL, [this,&store,&storage,&id]() { storage->Close(store); storage->Delete(id); }); - auto muxer = P2PVR::RawDataClientPrx::checkedCast(adapter->addWithUUID(new Muxer(store, muxerCommand))); - ScopeObject _muxer(NULL, NULL, [this,&muxer]() { adapter->remove(muxer->ice_getIdentity()); }); - auto ss = ServiceStreamerPtr(new ServiceStreamer(service->ServiceId, muxer, adapter->getCommunicator(), adapter)); + auto target = store; + P2PVR::RawDataClientPrx muxer; + if (!muxerCommand.empty()) { + muxer = P2PVR::RawDataClientPrx::checkedCast(adapter->addWithUUID(new Muxer(store, muxerCommand))); + target = muxer; + } + ScopeObject _muxer(NULL, NULL, [this,&muxer]() { if (muxer) adapter->remove(muxer->ice_getIdentity()); }); + auto ss = ServiceStreamerPtr(new ServiceStreamer(service->ServiceId, target, adapter->getCommunicator(), adapter)); ss->Start(); Logger()->messagebf(LOG_INFO, "Started recording %s (%s - %s) on %s (%d)", @@ -91,7 +96,9 @@ Recorder::StopRecording(CurrentPtr c) std::lock_guard<std::mutex> g(lock); Logger()->messagebf(LOG_DEBUG, "Stopping %s", c->event->Title); c->stream->Stop(); - adapter->remove(c->muxer->ice_getIdentity()); + if (c->muxer) { + adapter->remove(c->muxer->ice_getIdentity()); + } currentRecordings.erase(c); auto storage = P2PVR::StoragePrx::checkedCast(adapter->createProxy(adapter->getCommunicator()->stringToIdentity("Storage"))); storage->Close(c->store); diff --git a/p2pvr/daemon/storage.cpp b/p2pvr/daemon/storage.cpp index 79280a1..2465afe 100644 --- a/p2pvr/daemon/storage.cpp +++ b/p2pvr/daemon/storage.cpp @@ -1,6 +1,7 @@ #include <pch.hpp> #include "storage.h" #include "fileSink.h" +#include "muxedFileSink.h" #include <commonHelpers.h> #include <fcntl.h> #include <logger.h> @@ -11,6 +12,7 @@ namespace fs = boost::filesystem; +std::string Storage::muxerCommand; fs::path Storage::root; fs::path Storage::byAll; fs::path Storage::byTitle; @@ -19,6 +21,8 @@ fs::path Storage::byService; fs::path Storage::bySchedule; DECLARE_OPTIONS(Storage, "P2PVR Storage options") +("p2pvr.storage.muxercommand", Options::value(&muxerCommand, "/usr/bin/ffmpeg -i - -f mp4 -y ${TARGET}"), + "Command to perform TS->PS muxing (default: '/usr/bin/ffmpeg -i - -f mp4 -y ${TARGET}')") ("p2pvr.storage.root", Options::value(&root, "recordings"), "Root folder in which to store recordings") ("p2pvr.storage.all", Options::value(&byAll, "all"), @@ -223,13 +227,20 @@ Storage::OpenForWrite(const std::string & id, const Ice::Current & ice) { fs::path path = root / byAll / id; path.replace_extension(FindExtension(id)); - auto fd = open(path.string().c_str(), O_WRONLY | O_APPEND | O_LARGEFILE); - if (fd < 0) { - Logger()->messagebf(LOG_ERR, "%s: Failed to open file for reading at %s (%d:%s)", __PRETTY_FUNCTION__, - path, errno, strerror(errno)); - throw P2PVR::StorageException(path.string(), strerror(errno)); + P2PVR::RawDataClient * target; + if (muxerCommand.empty()) { + auto fd = open(path.string().c_str(), O_WRONLY | O_APPEND | O_LARGEFILE); + if (fd < 0) { + Logger()->messagebf(LOG_ERR, "%s: Failed to open file for reading at %s (%d:%s)", __PRETTY_FUNCTION__, + path, errno, strerror(errno)); + throw P2PVR::StorageException(path.string(), strerror(errno)); + } + target = new FileSink(fd); + } + else { + target = new MuxedFileSink(path, muxerCommand); } - auto openFile = OpenFilePtr(new OpenFile(ice.adapter, new FileSink(fd))); + auto openFile = OpenFilePtr(new OpenFile(ice.adapter, target)); openFiles.insert(openFile); return *openFile; } diff --git a/p2pvr/daemon/storage.h b/p2pvr/daemon/storage.h index 86bc76a..89a2836 100644 --- a/p2pvr/daemon/storage.h +++ b/p2pvr/daemon/storage.h @@ -29,6 +29,7 @@ class Storage : public P2PVR::Storage { typedef std::set<OpenFilePtr> OpenFiles; OpenFiles openFiles; + static std::string muxerCommand; static boost::filesystem::path root; static boost::filesystem::path byAll; static boost::filesystem::path byTitle; |