summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorrandomdan <randomdan@localhost>2014-08-05 19:09:44 +0000
committerrandomdan <randomdan@localhost>2014-08-05 19:09:44 +0000
commit248383b933b6aa31fbc401260bb87de4ea6a685a (patch)
tree419530d73d25d6b47e758b1225eaea9e7fc3e15d
parentMore friendly linkflags and some ycm configs (diff)
downloadp2pvr-248383b933b6aa31fbc401260bb87de4ea6a685a.tar.bz2
p2pvr-248383b933b6aa31fbc401260bb87de4ea6a685a.tar.xz
p2pvr-248383b933b6aa31fbc401260bb87de4ea6a685a.zip
Support muxing directly in storage
-rw-r--r--p2pvr/daemon/muxedFileSink.cpp129
-rw-r--r--p2pvr/daemon/muxedFileSink.h25
-rw-r--r--p2pvr/daemon/recorder.cpp19
-rw-r--r--p2pvr/daemon/storage.cpp23
-rw-r--r--p2pvr/daemon/storage.h1
5 files changed, 185 insertions, 12 deletions
diff --git a/p2pvr/daemon/muxedFileSink.cpp b/p2pvr/daemon/muxedFileSink.cpp
new file mode 100644
index 0000000..4245873
--- /dev/null
+++ b/p2pvr/daemon/muxedFileSink.cpp
@@ -0,0 +1,129 @@
+#include <pch.hpp>
+#include "muxedFileSink.h"
+#include <logger.h>
+#include <misc.h>
+#include <poll.h>
+#include <sys/wait.h>
+#include <fcntl.h>
+#include <boost/algorithm/string/split.hpp>
+#include <boost/algorithm/string/classification.hpp>
+
+class MuxerFailure : public P2PVR::DataHandlingException { };
+
+MuxedFileSink::MuxedFileSink(const boost::filesystem::path & t, const std::string & cmd)
+{
+ std::vector<std::string> params;
+ boost::algorithm::split(params, cmd, isspace, boost::algorithm::token_compress_on);
+ BOOST_FOREACH(auto & c, params) {
+ if (c == "${TARGET}") {
+ c = t.string();
+ }
+ }
+ muxerPid = popenrwe(params, fds);
+ fcntl(fds[0], F_SETFL, O_NONBLOCK);
+ Logger()->messagebf(LOG_INFO, "Muxer::%p started with command '%s' for %s", this, cmd, t);
+}
+
+MuxedFileSink::~MuxedFileSink()
+{
+ close(fds[0]);
+ int status;
+ while (waitpid(muxerPid, &status, WNOHANG) == 0) {
+ ReadMuxer(5);
+ }
+ Logger()->messagebf(LOG_INFO, "Muxer::%p finished with status %d", this, status);
+ close(fds[1]);
+ close(fds[2]);
+}
+
+bool
+MuxedFileSink::NewData(const P2PVR::Data & data, const Ice::Current &)
+{
+ std::lock_guard<std::mutex> g(lock);
+ for (size_t off = 0; off < data.size(); ) {
+ // Read output until input wouldn't block
+ if (ReadWaiting())
+ return true;
+ // Send some input
+ auto w = write(fds[0], &data[off], data.size() - off);
+ if (w == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
+ Logger()->messagebf(LOG_ERR, "Muxer::%p write failed (%d:%s)", this, errno, strerror(errno));
+ throw MuxerFailure();
+ }
+ off += w;
+ }
+ // Read anything that's come out
+ return ReadAvailable();
+}
+
+bool
+MuxedFileSink::ReadWaiting() const
+{
+ pollfd fd = { fds[0], POLLOUT, 0 };
+ while (true) {
+ auto p = poll(&fd, 1, 0);
+ if (p < 0) {
+ // error
+ throw MuxerFailure();
+ }
+ else if (p == 0) {
+ // write would block
+ if (ReadMuxer(1)) return true;
+ }
+ else {
+ // write would not block
+ if (ReadMuxer(0)) return true;
+ break;
+ }
+ }
+ return false;
+}
+
+bool
+MuxedFileSink::ReadAvailable() const
+{
+ return ReadMuxer(0);
+}
+
+bool
+MuxedFileSink::ReadMuxer(int waitTime) const
+{
+ pollfd fd[2] = { { fds[1], POLLIN | POLLHUP, 0 }, { fds[2], POLLIN | POLLHUP, 0 } };
+ while (true) {
+ auto p = poll(fd, 2, waitTime);
+ if (p < 0) {
+ // error
+ throw MuxerFailure();
+ }
+ else if (p == 0) {
+ // all ok
+ return false;
+ }
+ else {
+ bool closed = false;
+ for (int i = 0; i < 2; ++i) {
+ if (fd[i].revents & (POLLIN | POLLHUP)) {
+ P2PVR::Data buf(BUFSIZ);
+ auto len = read(fds[i + 1], &buf.front(), buf.size());
+ if (len == 0) {
+ // ok, proc exit
+ closed = true;
+ }
+ if (len < 0) {
+ // error
+ throw MuxerFailure();
+ }
+ buf.resize(len);
+ std::vector<std::string> lines;
+ boost::algorithm::split(lines, buf, boost::algorithm::is_any_of("\r\n\f"), boost::algorithm::token_compress_on);
+ BOOST_FOREACH(const auto & line, lines) {
+ if (line.empty()) continue;
+ Logger()->messagebf(LOG_INFO, "Muxer::%p > %s", this, line);
+ }
+ }
+ }
+ if (closed) return true;
+ }
+ }
+}
+
diff --git a/p2pvr/daemon/muxedFileSink.h b/p2pvr/daemon/muxedFileSink.h
new file mode 100644
index 0000000..9cca6fa
--- /dev/null
+++ b/p2pvr/daemon/muxedFileSink.h
@@ -0,0 +1,25 @@
+#ifndef MUXEDFILESINK_H
+#define MUXEDFILESINK_H
+
+#include <dvb.h>
+#include <mutex>
+#include <boost/filesystem/path.hpp>
+
+class MuxedFileSink : public P2PVR::RawDataClient {
+ public:
+ MuxedFileSink(const boost::filesystem::path & target, const std::string & cmd);
+ ~MuxedFileSink();
+
+ bool NewData(const P2PVR::Data &, const Ice::Current &);
+
+ private:
+ bool ReadWaiting() const;
+ bool ReadAvailable() const;
+ bool ReadMuxer(int wait) const;
+ int fds[3];
+ pid_t muxerPid;
+ mutable std::mutex lock;
+};
+
+#endif
+
diff --git a/p2pvr/daemon/recorder.cpp b/p2pvr/daemon/recorder.cpp
index ccb975e..6cd0b5f 100644
--- a/p2pvr/daemon/recorder.cpp
+++ b/p2pvr/daemon/recorder.cpp
@@ -13,8 +13,8 @@ std::string Recorder::extension;
std::string Recorder::muxerCommand;
DECLARE_OPTIONS(Recorder, "P2PVR Recorder options")
-("p2pvr.recorder.extension", Options::value(&extension, "mpg"),
- "File extension to save with (default: avi)")
+("p2pvr.recorder.extension", Options::value(&extension, "mp4"),
+ "File extension to save with (default: mp4)")
("p2pvr.recorder.muxercommand", Options::value(&muxerCommand, "/usr/bin/ffmpeg -f mpegts -i - -f dvd -codec copy -"),
"Command to perform TS->PS muxing (default: '/usr/bin/ffmpeg -f mpegts -i - -f dvd -codec copy -')")
END_OPTIONS(Recorder);
@@ -65,9 +65,14 @@ Recorder::StartRecording(P2PVR::SchedulePtr schedule, DVBSI::ServicePtr service,
auto id = storage->CreateForEventRecording(extension, schedule, service, event);
auto store = storage->OpenForWrite(id);
ScopeObject _store(NULL, NULL, [this,&store,&storage,&id]() { storage->Close(store); storage->Delete(id); });
- auto muxer = P2PVR::RawDataClientPrx::checkedCast(adapter->addWithUUID(new Muxer(store, muxerCommand)));
- ScopeObject _muxer(NULL, NULL, [this,&muxer]() { adapter->remove(muxer->ice_getIdentity()); });
- auto ss = ServiceStreamerPtr(new ServiceStreamer(service->ServiceId, muxer, adapter->getCommunicator(), adapter));
+ auto target = store;
+ P2PVR::RawDataClientPrx muxer;
+ if (!muxerCommand.empty()) {
+ muxer = P2PVR::RawDataClientPrx::checkedCast(adapter->addWithUUID(new Muxer(store, muxerCommand)));
+ target = muxer;
+ }
+ ScopeObject _muxer(NULL, NULL, [this,&muxer]() { if (muxer) adapter->remove(muxer->ice_getIdentity()); });
+ auto ss = ServiceStreamerPtr(new ServiceStreamer(service->ServiceId, target, adapter->getCommunicator(), adapter));
ss->Start();
Logger()->messagebf(LOG_INFO, "Started recording %s (%s - %s) on %s (%d)",
@@ -91,7 +96,9 @@ Recorder::StopRecording(CurrentPtr c)
std::lock_guard<std::mutex> g(lock);
Logger()->messagebf(LOG_DEBUG, "Stopping %s", c->event->Title);
c->stream->Stop();
- adapter->remove(c->muxer->ice_getIdentity());
+ if (c->muxer) {
+ adapter->remove(c->muxer->ice_getIdentity());
+ }
currentRecordings.erase(c);
auto storage = P2PVR::StoragePrx::checkedCast(adapter->createProxy(adapter->getCommunicator()->stringToIdentity("Storage")));
storage->Close(c->store);
diff --git a/p2pvr/daemon/storage.cpp b/p2pvr/daemon/storage.cpp
index 79280a1..2465afe 100644
--- a/p2pvr/daemon/storage.cpp
+++ b/p2pvr/daemon/storage.cpp
@@ -1,6 +1,7 @@
#include <pch.hpp>
#include "storage.h"
#include "fileSink.h"
+#include "muxedFileSink.h"
#include <commonHelpers.h>
#include <fcntl.h>
#include <logger.h>
@@ -11,6 +12,7 @@
namespace fs = boost::filesystem;
+std::string Storage::muxerCommand;
fs::path Storage::root;
fs::path Storage::byAll;
fs::path Storage::byTitle;
@@ -19,6 +21,8 @@ fs::path Storage::byService;
fs::path Storage::bySchedule;
DECLARE_OPTIONS(Storage, "P2PVR Storage options")
+("p2pvr.storage.muxercommand", Options::value(&muxerCommand, "/usr/bin/ffmpeg -i - -f mp4 -y ${TARGET}"),
+ "Command to perform TS->PS muxing (default: '/usr/bin/ffmpeg -i - -f mp4 -y ${TARGET}')")
("p2pvr.storage.root", Options::value(&root, "recordings"),
"Root folder in which to store recordings")
("p2pvr.storage.all", Options::value(&byAll, "all"),
@@ -223,13 +227,20 @@ Storage::OpenForWrite(const std::string & id, const Ice::Current & ice)
{
fs::path path = root / byAll / id;
path.replace_extension(FindExtension(id));
- auto fd = open(path.string().c_str(), O_WRONLY | O_APPEND | O_LARGEFILE);
- if (fd < 0) {
- Logger()->messagebf(LOG_ERR, "%s: Failed to open file for reading at %s (%d:%s)", __PRETTY_FUNCTION__,
- path, errno, strerror(errno));
- throw P2PVR::StorageException(path.string(), strerror(errno));
+ P2PVR::RawDataClient * target;
+ if (muxerCommand.empty()) {
+ auto fd = open(path.string().c_str(), O_WRONLY | O_APPEND | O_LARGEFILE);
+ if (fd < 0) {
+ Logger()->messagebf(LOG_ERR, "%s: Failed to open file for reading at %s (%d:%s)", __PRETTY_FUNCTION__,
+ path, errno, strerror(errno));
+ throw P2PVR::StorageException(path.string(), strerror(errno));
+ }
+ target = new FileSink(fd);
+ }
+ else {
+ target = new MuxedFileSink(path, muxerCommand);
}
- auto openFile = OpenFilePtr(new OpenFile(ice.adapter, new FileSink(fd)));
+ auto openFile = OpenFilePtr(new OpenFile(ice.adapter, target));
openFiles.insert(openFile);
return *openFile;
}
diff --git a/p2pvr/daemon/storage.h b/p2pvr/daemon/storage.h
index 86bc76a..89a2836 100644
--- a/p2pvr/daemon/storage.h
+++ b/p2pvr/daemon/storage.h
@@ -29,6 +29,7 @@ class Storage : public P2PVR::Storage {
typedef std::set<OpenFilePtr> OpenFiles;
OpenFiles openFiles;
+ static std::string muxerCommand;
static boost::filesystem::path root;
static boost::filesystem::path byAll;
static boost::filesystem::path byTitle;