diff options
-rw-r--r-- | p2pvr/Jamfile.jam | 3 | ||||
-rw-r--r-- | p2pvr/lib/daemonBase.cpp | 1 | ||||
-rw-r--r-- | p2pvr/lib/muxer.cpp | 20 | ||||
-rw-r--r-- | p2pvr/lib/muxer.h | 3 | ||||
-rw-r--r-- | p2pvr/lib/p2LoggerWrapper.cpp | 1 | ||||
-rw-r--r-- | p2pvr/lib/recorder.cpp | 1 | ||||
-rw-r--r-- | p2pvr/lib/tuner.cpp | 130 | ||||
-rw-r--r-- | p2pvr/lib/tuner.h | 18 | ||||
-rw-r--r-- | p2pvr/lib/tunerSendSi.cpp | 81 | ||||
-rw-r--r-- | p2pvr/lib/tunerSendSi.h | 24 | ||||
-rw-r--r-- | p2pvr/lib/tunerSendTs.cpp | 73 | ||||
-rw-r--r-- | p2pvr/lib/tunerSendTs.h | 22 | ||||
-rw-r--r-- | p2pvr/streamer/Jamfile.jam | 10 | ||||
-rw-r--r-- | p2pvr/streamer/streamer.cpp | 52 |
14 files changed, 329 insertions, 110 deletions
diff --git a/p2pvr/Jamfile.jam b/p2pvr/Jamfile.jam index 8f82f35..53070ac 100644 --- a/p2pvr/Jamfile.jam +++ b/p2pvr/Jamfile.jam @@ -21,10 +21,11 @@ alias p2daemonlib : glibmm : : : <cflags>"-I /usr/include/project2/daemon/lib" <linkflags>"-lp2daemonlib" ; +build-project streamer ; build-project daemon ; build-project carddaemon ; -install debuginstall : lib//p2pvrlib util//p2pvrutil p2//p2pvrp2 carddaemon daemon ice : <location>./testing ; +install debuginstall : lib//p2pvrlib util//p2pvrutil p2//p2pvrp2 carddaemon daemon ice streamer//streamer : <location>./testing ; package.install install : : : carddaemon daemon p2//p2pvrp2 ; import type ; diff --git a/p2pvr/lib/daemonBase.cpp b/p2pvr/lib/daemonBase.cpp index ac429ea..1e946d8 100644 --- a/p2pvr/lib/daemonBase.cpp +++ b/p2pvr/lib/daemonBase.cpp @@ -1,3 +1,4 @@ +#include <pch.hpp> #include "daemonBase.h" #include "p2LoggerWrapper.h" #include <logger.h> diff --git a/p2pvr/lib/muxer.cpp b/p2pvr/lib/muxer.cpp index 25f6b57..16bb410 100644 --- a/p2pvr/lib/muxer.cpp +++ b/p2pvr/lib/muxer.cpp @@ -4,6 +4,7 @@ #include <misc.h> #include <poll.h> #include <sys/wait.h> +#include <fcntl.h> #include <boost/algorithm/string/split.hpp> #include <boost/algorithm/string/classification.hpp> @@ -13,6 +14,7 @@ Muxer::Muxer(const P2PVR::RawDataClientPrx & t, const std::string & cmd) : std::vector<std::string> params; boost::algorithm::split(params, cmd, isspace, boost::algorithm::token_compress_on); muxerPid = popenrwe(params, fds); + fcntl(fds[0], F_SETFL, O_NONBLOCK); Logger()->messagebf(LOG_INFO, "Muxer::%p started with command '%s'", this, cmd); } @@ -30,21 +32,26 @@ Muxer::~Muxer() bool Muxer::NewData(const P2PVR::Data & data, const Ice::Current &) { - if (ReadWaiting()) - return true; - { - std::lock_guard<std::mutex> g(wlock); - if (write(fds[0], &data.front(), data.size()) < (int)data.size()) { + std::lock_guard<std::mutex> g(lock); + for (size_t off = 0; off < data.size(); ) { + // Read output until input wouldn't block + if (ReadWaiting()) + return true; + // Send some input + auto w = write(fds[0], &data[off], data.size() - off); + if (w == -1 && errno != EAGAIN && errno != EWOULDBLOCK) { + Logger()->messagebf(LOG_ERR, "Muxer::%p write failed (%d:%s)", this, errno, strerror(errno)); return true; } + off += w; } + // Read anything that's come out return ReadAvailable(); } bool Muxer::ReadWaiting() const { - std::lock_guard<std::mutex> g(rlock); pollfd fd = { fds[0], POLLOUT, 0 }; while (true) { auto p = poll(&fd, 1, 0); @@ -68,7 +75,6 @@ Muxer::ReadWaiting() const bool Muxer::ReadAvailable() const { - std::lock_guard<std::mutex> g(rlock); return ReadMuxerAndSend(0); } diff --git a/p2pvr/lib/muxer.h b/p2pvr/lib/muxer.h index e4789cf..3a492bd 100644 --- a/p2pvr/lib/muxer.h +++ b/p2pvr/lib/muxer.h @@ -18,8 +18,7 @@ class Muxer : public P2PVR::RawDataClient { const P2PVR::RawDataClientPrx target; int fds[3]; pid_t muxerPid; - mutable std::mutex wlock; - mutable std::mutex rlock; + mutable std::mutex lock; }; #endif diff --git a/p2pvr/lib/p2LoggerWrapper.cpp b/p2pvr/lib/p2LoggerWrapper.cpp index 42d4757..60ec191 100644 --- a/p2pvr/lib/p2LoggerWrapper.cpp +++ b/p2pvr/lib/p2LoggerWrapper.cpp @@ -1,3 +1,4 @@ +#include <pch.hpp> #include "p2LoggerWrapper.h" #include "logger.h" diff --git a/p2pvr/lib/recorder.cpp b/p2pvr/lib/recorder.cpp index bc13ee2..98c7c4e 100644 --- a/p2pvr/lib/recorder.cpp +++ b/p2pvr/lib/recorder.cpp @@ -1,3 +1,4 @@ +#include <pch.hpp> #include "recorder.h" #include "bindTimerTask.h" #include <boost/bind.hpp> diff --git a/p2pvr/lib/tuner.cpp b/p2pvr/lib/tuner.cpp index bb19a3c..8ebf1a2 100644 --- a/p2pvr/lib/tuner.cpp +++ b/p2pvr/lib/tuner.cpp @@ -9,11 +9,11 @@ #include <plugable.h> #include <linux/dvb/frontend.h> #include <linux/dvb/dmx.h> -#include <boost/crc.hpp> #include <boost/tuple/tuple.hpp> #include "fileHandle.h" -#include "siParsers/table.h" #include <cxxabi.h> +#include "tunerSendSi.h" +#include "tunerSendTs.h" class FrontendNotSupported : public NotSupported { public: @@ -177,14 +177,11 @@ uint64_t Tuner::ReadDemuxAndSend(int demux, const P2PVR::RawDataClientPrx & _client) const { Logger()->messagebf(LOG_DEBUG, "%s: begin", __PRETTY_FUNCTION__); - std::vector<Ice::AsyncResultPtr> asyncs; struct pollfd ufd; memset(&ufd, 0, sizeof(pollfd)); ufd.fd = demux; ufd.events = POLLIN | POLLPRI; - uint64_t packetsSent = 0; - bool exitFlag = false; - auto client = _client->ice_collocationOptimized(false); + BackgroundClient client = BackgroundClient(new SendSi(_client)); do { // Wait for data to appear switch (poll(&ufd, 1, DemuxReadTimeout)) { @@ -208,54 +205,16 @@ Tuner::ReadDemuxAndSend(int demux, const P2PVR::RawDataClientPrx & _client) cons size_t n = nr; buf.resize(n); - if (!IsValidSection(buf)) { - continue; - } - - asyncs.push_back(client->begin_NewData(buf)); - packetsSent += 1; + client->NewData(buf); time(&lastUsedTime); - asyncs.erase(std::remove_if(asyncs.begin(), asyncs.end(), [&exitFlag, &client](const Ice::AsyncResultPtr & a) { - if (a->isCompleted()) { - exitFlag = client->end_NewData(a); - return true; - } - return false; - }), asyncs.end()); - } while (!exitFlag); - BOOST_FOREACH(const auto & a, asyncs) { - client->end_NewData(a); - } - Logger()->messagebf(LOG_DEBUG, "%s: end", __PRETTY_FUNCTION__); + } while (!client->IsFinished()); + auto packetsSent = client->PacketsSent(); + client.reset(); + Logger()->messagebf(LOG_DEBUG, "%s: end (sent %d packets)", __PRETTY_FUNCTION__, packetsSent); return packetsSent; } -bool -Tuner::IsValidSection(const P2PVR::Data & buf) -{ - auto n = buf.size(); - if (n < sizeof(SiTableHeader)) { - Logger()->messagebf(LOG_WARNING, "Received data too small to be an SI table."); - return false; - } - auto * tab = (const SiTableHeader *)(&buf.front()); - size_t l = sizeof(SiTableHeaderBase) + HILO(tab->section_length); - if (n < l) { - Logger()->messagebf(LOG_WARNING, "Received data shorter than its defined length."); - return false; - } - if (n > l) { - Logger()->messagebf(LOG_WARNING, "Received data longer than its defined length."); - return false; - } - if (!crc32(buf)) { - Logger()->messagebf(LOG_WARNING, "Received data is corrupted (crc32 failed)."); - return false; - } - return true; -} - int Tuner::StartSendingSection(int pid, const P2PVR::RawDataClientPrx & client, const Ice::Current &) { @@ -264,7 +223,7 @@ Tuner::StartSendingSection(int pid, const P2PVR::RawDataClientPrx & client, cons std::lock_guard<std::mutex> g(lock); int demux = backgroundClients.insert(BackgroundClients::value_type(OpenDemux(), - BackgroundClient(client->ice_collocationOptimized(false), &Tuner::IsValidSection))).first->first; + BackgroundClient(new SendSi(client)))).first->first; RequestPID(pid, demux); startSenderThread(); return demux; @@ -283,7 +242,8 @@ Tuner::StartSendingTS(const P2PVR::PacketIds & pids, const P2PVR::RawDataClientP ice.con->createProxy(client->ice_getIdentity()); } std::lock_guard<std::mutex> g(lock); - int demux = backgroundClients.insert(BackgroundClients::value_type(OpenDemux(), client->ice_collocationOptimized(false))).first->first; + int demux = backgroundClients.insert(BackgroundClients::value_type(OpenDemux(), + BackgroundClient(new SendTs(client)))).first->first; struct dmx_pes_filter_params pesFilterParams; memset(&pesFilterParams, 0, sizeof(struct dmx_pes_filter_params)); @@ -355,8 +315,6 @@ void Tuner::senderThread() { lock.lock(); - typedef boost::tuple<P2PVR::RawDataClientPrx, Ice::AsyncResultPtr, int> AsyncCall; - std::vector<AsyncCall> asyncs; while (!backgroundClients.empty()) { int n = backgroundClients.rbegin()->first + 1; fd_set rfds; @@ -389,52 +347,22 @@ Tuner::senderThread() } size_t n = nr; buf.resize(n); - if (!boost::get<1>(c.second) || boost::get<1>(c.second)(buf)) { - // Send it - asyncs.push_back(AsyncCall(boost::get<0>(c.second), boost::get<0>(c.second)->begin_NewData(buf), c.first)); - } + c.second->NewData(buf); } } } break; } // Clean up finished async requests - asyncs.erase(std::remove_if(asyncs.begin(), asyncs.end(), [this](const AsyncCall & a) { - time(&lastUsedTime); - try { - if (a.get<1>()->isCompleted()) { - if (a.get<0>()->end_NewData(a.get<1>())) { - close(a.get<2>()); - std::lock_guard<std::mutex> g(lock); - backgroundClients.erase(a.get<2>()); - } - return true; - } - return false; - } - catch (const std::exception & ex) { - Logger()->messagebf(LOG_DEBUG, "%s: Client transmit error (%s)", __PRETTY_FUNCTION__, ex.what()); - close(a.get<2>()); - std::lock_guard<std::mutex> g(lock); - backgroundClients.erase(a.get<2>()); - return true; - } - catch (...) { - Logger()->messagebf(LOG_DEBUG, "%s: Client transmit error", __PRETTY_FUNCTION__); - close(a.get<2>()); - std::lock_guard<std::mutex> g(lock); - backgroundClients.erase(a.get<2>()); - return true; - } - }), asyncs.end()); lock.lock(); - } - Logger()->messagebf(LOG_DEBUG, "%s: Cleaning up", __PRETTY_FUNCTION__); - BOOST_FOREACH(const auto & a, asyncs) { - try { - a.get<0>()->end_NewData(a.get<1>()); - } - catch (...) { + for (auto client = backgroundClients.begin(); client != backgroundClients.end(); ) { + if (client->second->IsFinished()) { + close(client->first); + client = backgroundClients.erase(client); + } + else { + client++; + } } } backgroundThread = NULL; @@ -448,12 +376,20 @@ Tuner::GetLastUsedTime(const Ice::Current &) return lastUsedTime; } -bool -Tuner::crc32(const P2PVR::Data & buf) +Tuner::IDataSender::IDataSender(const P2PVR::RawDataClientPrx & c) : + _packetsSent(0), + client(c) +{ +} + +Tuner::IDataSender::~IDataSender() +{ +} + +uint64_t +Tuner::IDataSender::PacketsSent() const { - boost::crc_optimal<32, 0x0, 0xFFFFFFFF, 0x0, true, false> crc; - crc.process_bytes(&buf.front(), buf.size()); - return crc.checksum() == 0; + return _packetsSent; } int Tuner::TuningTimeout; diff --git a/p2pvr/lib/tuner.h b/p2pvr/lib/tuner.h index 381c804..57bdd82 100644 --- a/p2pvr/lib/tuner.h +++ b/p2pvr/lib/tuner.h @@ -14,6 +14,21 @@ class Tuner : public P2PVR::PrivateTuner { public: + class IDataSender { + public: + IDataSender(const P2PVR::RawDataClientPrx &); + virtual ~IDataSender() = 0; + + virtual void NewData(const P2PVR::Data &) = 0; + virtual bool IsFinished() = 0; + uint64_t PacketsSent() const; + + protected: + uint64_t _packetsSent; + const P2PVR::RawDataClientPrx client; + }; + typedef boost::shared_ptr<IDataSender> BackgroundClient; + Tuner(const boost::filesystem::path & deviceFrontend); ~Tuner(); @@ -38,12 +53,10 @@ class Tuner : public P2PVR::PrivateTuner { INITOPTIONS; private: - static bool crc32(const P2PVR::Data &); int OpenDemux() const; uint64_t SendPID(int pid, const P2PVR::RawDataClientPrx & client, const Ice::Current &) const; static void RequestPID(int pid, int fd); uint64_t ReadDemuxAndSend(int fd, const P2PVR::RawDataClientPrx & client) const; - static bool IsValidSection(const P2PVR::Data &); void startSenderThread(); void senderThread(); static void setBufferSize(int fd, unsigned long bytes); @@ -51,7 +64,6 @@ class Tuner : public P2PVR::PrivateTuner { const boost::filesystem::path deviceFrontend; const boost::filesystem::path deviceRoot; typedef boost::function<bool(const P2PVR::Data &)> PacketCheckFunction; - typedef boost::tuple<P2PVR::RawDataClientPrx, PacketCheckFunction> BackgroundClient; typedef std::map<int, BackgroundClient> BackgroundClients; BackgroundClients backgroundClients; std::thread * backgroundThread; diff --git a/p2pvr/lib/tunerSendSi.cpp b/p2pvr/lib/tunerSendSi.cpp new file mode 100644 index 0000000..e1f4637 --- /dev/null +++ b/p2pvr/lib/tunerSendSi.cpp @@ -0,0 +1,81 @@ +#include <pch.hpp> +#include "tunerSendSi.h" +#include <logger.h> +#include <boost/crc.hpp> +#include "siParsers/table.h" + +SendSi::SendSi(const P2PVR::RawDataClientPrx & c) : + Tuner::IDataSender(c->ice_collocationOptimized(false)) +{ +} + +SendSi::~SendSi() +{ +} + +void +SendSi::NewData(const P2PVR::Data & buf) +{ + if (!IsValidSection(buf)) { + return; + } + _packetsSent += 1; + asyncs.insert(client->begin_NewData(buf)); +} + +bool +SendSi::IsFinished() +{ + try { + for (auto c = asyncs.begin(); c != asyncs.end(); ) { + if ((*c)->isCompleted()) { + if (client->end_NewData(*c)) { + return true; + } + c = asyncs.erase(c); + } + else { + c++; + } + } + return false; + } + catch (const std::exception & ex) { + Logger()->messagebf(LOG_DEBUG, "%s: Client transmit error (%s)", __PRETTY_FUNCTION__, ex.what()); + return true; + } +} + +bool +SendSi::IsValidSection(const P2PVR::Data & buf) +{ + auto n = buf.size(); + if (n < sizeof(SiTableHeader)) { + Logger()->messagebf(LOG_WARNING, "Received data too small to be an SI table."); + return false; + } + auto * tab = (const SiTableHeader *)(&buf.front()); + size_t l = sizeof(SiTableHeaderBase) + HILO(tab->section_length); + if (n < l) { + Logger()->messagebf(LOG_WARNING, "Received data shorter than its defined length."); + return false; + } + if (n > l) { + Logger()->messagebf(LOG_WARNING, "Received data longer than its defined length."); + return false; + } + if (!crc32(buf)) { + Logger()->messagebf(LOG_WARNING, "Received data is corrupted (crc32 failed)."); + return false; + } + return true; +} + +bool +SendSi::crc32(const P2PVR::Data & buf) +{ + boost::crc_optimal<32, 0x0, 0xFFFFFFFF, 0x0, true, false> crc; + crc.process_bytes(&buf.front(), buf.size()); + return crc.checksum() == 0; +} + diff --git a/p2pvr/lib/tunerSendSi.h b/p2pvr/lib/tunerSendSi.h new file mode 100644 index 0000000..df48f43 --- /dev/null +++ b/p2pvr/lib/tunerSendSi.h @@ -0,0 +1,24 @@ +#ifndef TUNER_SENDSI_H +#define TUNER_SENDSI_H + +#include "tuner.h" + +class SendSi : public Tuner::IDataSender { + public: + SendSi(const P2PVR::RawDataClientPrx &); + ~SendSi(); + + void NewData(const P2PVR::Data &); + bool IsFinished(); + + private: + static bool crc32(const P2PVR::Data &); + static bool IsValidSection(const P2PVR::Data &); + + std::set<Ice::AsyncResultPtr> asyncs; + bool finish; +}; + +#endif + + diff --git a/p2pvr/lib/tunerSendTs.cpp b/p2pvr/lib/tunerSendTs.cpp new file mode 100644 index 0000000..d5ea1bb --- /dev/null +++ b/p2pvr/lib/tunerSendTs.cpp @@ -0,0 +1,73 @@ +#include <pch.hpp> +#include "tunerSendTs.h" +#include <logger.h> + +// ~64kb of TS packets +#define TARGET_BUFFER_SIZE (350 * 188) +// About the ICE message size limit +#define TARGET_BUFFER_LIMIT 512 * 1024 + +SendTs::SendTs(const P2PVR::RawDataClientPrx & c) : + Tuner::IDataSender(c->ice_collocationOptimized(false)) +{ + buffer.reserve(TARGET_BUFFER_SIZE); +} + +SendTs::~SendTs() +{ + if (async) { + client->end_NewData(async); + } + while (!buffer.empty()) { + sendBufferChunk(); + client->end_NewData(async); + } +} + +void +SendTs::NewData(const P2PVR::Data & buf) +{ + buffer.insert(buffer.end(), buf.begin(), buf.end()); + if (!async && buffer.size() >= TARGET_BUFFER_SIZE) { + sendBufferChunk(); + } +} + +void +SendTs::sendBufferChunk() +{ + if (buffer.size() > TARGET_BUFFER_LIMIT) { + auto breakPoint = buffer.begin() + TARGET_BUFFER_LIMIT; + async = client->begin_NewData(P2PVR::Data(buffer.begin(), breakPoint)); + buffer.erase(buffer.begin(), breakPoint); + } + else { + async = client->begin_NewData(buffer); + buffer.clear(); + buffer.reserve(TARGET_BUFFER_SIZE); + } + _packetsSent += 1; +} + +bool +SendTs::IsFinished() +{ + try { + if (async && async->isCompleted()) { + auto finished = client->end_NewData(async); + async = NULL; + if (finished) { + buffer.clear(); + } + return finished; + } + return false; + } + catch (const std::exception & ex) { + async = NULL; + Logger()->messagebf(LOG_DEBUG, "%s: Client transmit error (%s)", __PRETTY_FUNCTION__, ex.what()); + return true; + } +} + + diff --git a/p2pvr/lib/tunerSendTs.h b/p2pvr/lib/tunerSendTs.h new file mode 100644 index 0000000..ecf32fd --- /dev/null +++ b/p2pvr/lib/tunerSendTs.h @@ -0,0 +1,22 @@ +#ifndef TUNER_SENDTS_H +#define TUNER_SENDTS_H + +#include "tuner.h" + +class SendTs : public Tuner::IDataSender { + public: + SendTs(const P2PVR::RawDataClientPrx &); + ~SendTs(); + + void NewData(const P2PVR::Data &); + bool IsFinished(); + + private: + void sendBufferChunk(); + + Ice::AsyncResultPtr async; + P2PVR::Data buffer; +}; + +#endif + diff --git a/p2pvr/streamer/Jamfile.jam b/p2pvr/streamer/Jamfile.jam new file mode 100644 index 0000000..7f4d2b4 --- /dev/null +++ b/p2pvr/streamer/Jamfile.jam @@ -0,0 +1,10 @@ + + +lib streamer : + [ glob-tree *.cpp ] + : : + <library>../ice//p2pvrice + <library>../lib//p2pvrlib + <library>../util//p2pvrutil + <implicit-dependency>../ice//p2pvrice + ; diff --git a/p2pvr/streamer/streamer.cpp b/p2pvr/streamer/streamer.cpp new file mode 100644 index 0000000..2722266 --- /dev/null +++ b/p2pvr/streamer/streamer.cpp @@ -0,0 +1,52 @@ +#include <daemonBase.h> +#include <p2pvr.h> +#include "globalDevices.h" +#include "si.h" +#include <serviceStreamer.h> +#include <fileSink.h> +#include <muxer.h> + +class P2PvrStreamer : public DaemonBase { + public: + P2PvrStreamer(int argc, char ** argv) : + DaemonBase(argc, argv) + { + } + + void addServants(const Ice::ObjectAdapterPtr & adapter, const IceUtil::TimerPtr &) const + { + adapter->add(new GlobalDevices(), ic->stringToIdentity("GlobalDevices")); + adapter->add(new SI(), ic->stringToIdentity("SI")); + auto output = P2PVR::RawDataClientPrx::checkedCast(adapter->addWithUUID(new FileSink(1))); + assert(output); + auto muxer = P2PVR::RawDataClientPrx::checkedCast(adapter->addWithUUID(new Muxer(output, "/usr/bin/ffmpeg -f mpegts -i - -f dvd -codec copy -"))); + assert(muxer); + ss = ServiceStreamerPtr(new ServiceStreamer(4287, muxer, ic, adapter)); + assert(ss); + } + + void run() const + { + IceUtil::TimerPtr timer = new IceUtil::Timer(); + Logger()->messagebf(LOG_INFO, "Creating adapter (%s, %s)", Adapter, Endpoint); + auto adapter = ic->createObjectAdapterWithEndpoints(Adapter, Endpoint); + addServants(adapter, timer); + adapter->activate(); + + ss->Start(); + + ic->waitForShutdown(); + timer->destroy(); + } + + void shutdown() const + { + ss->Stop(); + DaemonBase::shutdown(); + } + private: + mutable ServiceStreamerPtr ss; +}; + +DECLARE_GENERIC_LOADER("p2pvrstreamer", DaemonLoader, P2PvrStreamer); + |