From 5c9738977aa14bd6e76922aa724628eb22b71a7d Mon Sep 17 00:00:00 2001 From: randomdan Date: Wed, 22 Jan 2014 21:17:43 +0000 Subject: Refactor tuner to use specific implementations of new interface for send different (TS and SI) data to clients Refactor muxer to not block and read output data whilst writing input data Add a (service) streamer for testing --- p2pvr/Jamfile.jam | 3 +- p2pvr/lib/daemonBase.cpp | 1 + p2pvr/lib/muxer.cpp | 20 ++++--- p2pvr/lib/muxer.h | 3 +- p2pvr/lib/p2LoggerWrapper.cpp | 1 + p2pvr/lib/recorder.cpp | 1 + p2pvr/lib/tuner.cpp | 130 +++++++++++------------------------------- p2pvr/lib/tuner.h | 18 +++++- p2pvr/lib/tunerSendSi.cpp | 81 ++++++++++++++++++++++++++ p2pvr/lib/tunerSendSi.h | 24 ++++++++ p2pvr/lib/tunerSendTs.cpp | 73 ++++++++++++++++++++++++ p2pvr/lib/tunerSendTs.h | 22 +++++++ p2pvr/streamer/Jamfile.jam | 10 ++++ p2pvr/streamer/streamer.cpp | 52 +++++++++++++++++ 14 files changed, 329 insertions(+), 110 deletions(-) create mode 100644 p2pvr/lib/tunerSendSi.cpp create mode 100644 p2pvr/lib/tunerSendSi.h create mode 100644 p2pvr/lib/tunerSendTs.cpp create mode 100644 p2pvr/lib/tunerSendTs.h create mode 100644 p2pvr/streamer/Jamfile.jam create mode 100644 p2pvr/streamer/streamer.cpp diff --git a/p2pvr/Jamfile.jam b/p2pvr/Jamfile.jam index 8f82f35..53070ac 100644 --- a/p2pvr/Jamfile.jam +++ b/p2pvr/Jamfile.jam @@ -21,10 +21,11 @@ alias p2daemonlib : glibmm : : : "-I /usr/include/project2/daemon/lib" "-lp2daemonlib" ; +build-project streamer ; build-project daemon ; build-project carddaemon ; -install debuginstall : lib//p2pvrlib util//p2pvrutil p2//p2pvrp2 carddaemon daemon ice : ./testing ; +install debuginstall : lib//p2pvrlib util//p2pvrutil p2//p2pvrp2 carddaemon daemon ice streamer//streamer : ./testing ; package.install install : : : carddaemon daemon p2//p2pvrp2 ; import type ; diff --git a/p2pvr/lib/daemonBase.cpp b/p2pvr/lib/daemonBase.cpp index ac429ea..1e946d8 100644 --- a/p2pvr/lib/daemonBase.cpp +++ b/p2pvr/lib/daemonBase.cpp @@ -1,3 +1,4 @@ +#include #include "daemonBase.h" #include "p2LoggerWrapper.h" #include diff --git a/p2pvr/lib/muxer.cpp b/p2pvr/lib/muxer.cpp index 25f6b57..16bb410 100644 --- a/p2pvr/lib/muxer.cpp +++ b/p2pvr/lib/muxer.cpp @@ -4,6 +4,7 @@ #include #include #include +#include #include #include @@ -13,6 +14,7 @@ Muxer::Muxer(const P2PVR::RawDataClientPrx & t, const std::string & cmd) : std::vector params; boost::algorithm::split(params, cmd, isspace, boost::algorithm::token_compress_on); muxerPid = popenrwe(params, fds); + fcntl(fds[0], F_SETFL, O_NONBLOCK); Logger()->messagebf(LOG_INFO, "Muxer::%p started with command '%s'", this, cmd); } @@ -30,21 +32,26 @@ Muxer::~Muxer() bool Muxer::NewData(const P2PVR::Data & data, const Ice::Current &) { - if (ReadWaiting()) - return true; - { - std::lock_guard g(wlock); - if (write(fds[0], &data.front(), data.size()) < (int)data.size()) { + std::lock_guard g(lock); + for (size_t off = 0; off < data.size(); ) { + // Read output until input wouldn't block + if (ReadWaiting()) + return true; + // Send some input + auto w = write(fds[0], &data[off], data.size() - off); + if (w == -1 && errno != EAGAIN && errno != EWOULDBLOCK) { + Logger()->messagebf(LOG_ERR, "Muxer::%p write failed (%d:%s)", this, errno, strerror(errno)); return true; } + off += w; } + // Read anything that's come out return ReadAvailable(); } bool Muxer::ReadWaiting() const { - std::lock_guard g(rlock); pollfd fd = { fds[0], POLLOUT, 0 }; while (true) { auto p = poll(&fd, 1, 0); @@ -68,7 +75,6 @@ Muxer::ReadWaiting() const bool Muxer::ReadAvailable() const { - std::lock_guard g(rlock); return ReadMuxerAndSend(0); } diff --git a/p2pvr/lib/muxer.h b/p2pvr/lib/muxer.h index e4789cf..3a492bd 100644 --- a/p2pvr/lib/muxer.h +++ b/p2pvr/lib/muxer.h @@ -18,8 +18,7 @@ class Muxer : public P2PVR::RawDataClient { const P2PVR::RawDataClientPrx target; int fds[3]; pid_t muxerPid; - mutable std::mutex wlock; - mutable std::mutex rlock; + mutable std::mutex lock; }; #endif diff --git a/p2pvr/lib/p2LoggerWrapper.cpp b/p2pvr/lib/p2LoggerWrapper.cpp index 42d4757..60ec191 100644 --- a/p2pvr/lib/p2LoggerWrapper.cpp +++ b/p2pvr/lib/p2LoggerWrapper.cpp @@ -1,3 +1,4 @@ +#include #include "p2LoggerWrapper.h" #include "logger.h" diff --git a/p2pvr/lib/recorder.cpp b/p2pvr/lib/recorder.cpp index bc13ee2..98c7c4e 100644 --- a/p2pvr/lib/recorder.cpp +++ b/p2pvr/lib/recorder.cpp @@ -1,3 +1,4 @@ +#include #include "recorder.h" #include "bindTimerTask.h" #include diff --git a/p2pvr/lib/tuner.cpp b/p2pvr/lib/tuner.cpp index bb19a3c..8ebf1a2 100644 --- a/p2pvr/lib/tuner.cpp +++ b/p2pvr/lib/tuner.cpp @@ -9,11 +9,11 @@ #include #include #include -#include #include #include "fileHandle.h" -#include "siParsers/table.h" #include +#include "tunerSendSi.h" +#include "tunerSendTs.h" class FrontendNotSupported : public NotSupported { public: @@ -177,14 +177,11 @@ uint64_t Tuner::ReadDemuxAndSend(int demux, const P2PVR::RawDataClientPrx & _client) const { Logger()->messagebf(LOG_DEBUG, "%s: begin", __PRETTY_FUNCTION__); - std::vector asyncs; struct pollfd ufd; memset(&ufd, 0, sizeof(pollfd)); ufd.fd = demux; ufd.events = POLLIN | POLLPRI; - uint64_t packetsSent = 0; - bool exitFlag = false; - auto client = _client->ice_collocationOptimized(false); + BackgroundClient client = BackgroundClient(new SendSi(_client)); do { // Wait for data to appear switch (poll(&ufd, 1, DemuxReadTimeout)) { @@ -208,54 +205,16 @@ Tuner::ReadDemuxAndSend(int demux, const P2PVR::RawDataClientPrx & _client) cons size_t n = nr; buf.resize(n); - if (!IsValidSection(buf)) { - continue; - } - - asyncs.push_back(client->begin_NewData(buf)); - packetsSent += 1; + client->NewData(buf); time(&lastUsedTime); - asyncs.erase(std::remove_if(asyncs.begin(), asyncs.end(), [&exitFlag, &client](const Ice::AsyncResultPtr & a) { - if (a->isCompleted()) { - exitFlag = client->end_NewData(a); - return true; - } - return false; - }), asyncs.end()); - } while (!exitFlag); - BOOST_FOREACH(const auto & a, asyncs) { - client->end_NewData(a); - } - Logger()->messagebf(LOG_DEBUG, "%s: end", __PRETTY_FUNCTION__); + } while (!client->IsFinished()); + auto packetsSent = client->PacketsSent(); + client.reset(); + Logger()->messagebf(LOG_DEBUG, "%s: end (sent %d packets)", __PRETTY_FUNCTION__, packetsSent); return packetsSent; } -bool -Tuner::IsValidSection(const P2PVR::Data & buf) -{ - auto n = buf.size(); - if (n < sizeof(SiTableHeader)) { - Logger()->messagebf(LOG_WARNING, "Received data too small to be an SI table."); - return false; - } - auto * tab = (const SiTableHeader *)(&buf.front()); - size_t l = sizeof(SiTableHeaderBase) + HILO(tab->section_length); - if (n < l) { - Logger()->messagebf(LOG_WARNING, "Received data shorter than its defined length."); - return false; - } - if (n > l) { - Logger()->messagebf(LOG_WARNING, "Received data longer than its defined length."); - return false; - } - if (!crc32(buf)) { - Logger()->messagebf(LOG_WARNING, "Received data is corrupted (crc32 failed)."); - return false; - } - return true; -} - int Tuner::StartSendingSection(int pid, const P2PVR::RawDataClientPrx & client, const Ice::Current &) { @@ -264,7 +223,7 @@ Tuner::StartSendingSection(int pid, const P2PVR::RawDataClientPrx & client, cons std::lock_guard g(lock); int demux = backgroundClients.insert(BackgroundClients::value_type(OpenDemux(), - BackgroundClient(client->ice_collocationOptimized(false), &Tuner::IsValidSection))).first->first; + BackgroundClient(new SendSi(client)))).first->first; RequestPID(pid, demux); startSenderThread(); return demux; @@ -283,7 +242,8 @@ Tuner::StartSendingTS(const P2PVR::PacketIds & pids, const P2PVR::RawDataClientP ice.con->createProxy(client->ice_getIdentity()); } std::lock_guard g(lock); - int demux = backgroundClients.insert(BackgroundClients::value_type(OpenDemux(), client->ice_collocationOptimized(false))).first->first; + int demux = backgroundClients.insert(BackgroundClients::value_type(OpenDemux(), + BackgroundClient(new SendTs(client)))).first->first; struct dmx_pes_filter_params pesFilterParams; memset(&pesFilterParams, 0, sizeof(struct dmx_pes_filter_params)); @@ -355,8 +315,6 @@ void Tuner::senderThread() { lock.lock(); - typedef boost::tuple AsyncCall; - std::vector asyncs; while (!backgroundClients.empty()) { int n = backgroundClients.rbegin()->first + 1; fd_set rfds; @@ -389,52 +347,22 @@ Tuner::senderThread() } size_t n = nr; buf.resize(n); - if (!boost::get<1>(c.second) || boost::get<1>(c.second)(buf)) { - // Send it - asyncs.push_back(AsyncCall(boost::get<0>(c.second), boost::get<0>(c.second)->begin_NewData(buf), c.first)); - } + c.second->NewData(buf); } } } break; } // Clean up finished async requests - asyncs.erase(std::remove_if(asyncs.begin(), asyncs.end(), [this](const AsyncCall & a) { - time(&lastUsedTime); - try { - if (a.get<1>()->isCompleted()) { - if (a.get<0>()->end_NewData(a.get<1>())) { - close(a.get<2>()); - std::lock_guard g(lock); - backgroundClients.erase(a.get<2>()); - } - return true; - } - return false; - } - catch (const std::exception & ex) { - Logger()->messagebf(LOG_DEBUG, "%s: Client transmit error (%s)", __PRETTY_FUNCTION__, ex.what()); - close(a.get<2>()); - std::lock_guard g(lock); - backgroundClients.erase(a.get<2>()); - return true; - } - catch (...) { - Logger()->messagebf(LOG_DEBUG, "%s: Client transmit error", __PRETTY_FUNCTION__); - close(a.get<2>()); - std::lock_guard g(lock); - backgroundClients.erase(a.get<2>()); - return true; - } - }), asyncs.end()); lock.lock(); - } - Logger()->messagebf(LOG_DEBUG, "%s: Cleaning up", __PRETTY_FUNCTION__); - BOOST_FOREACH(const auto & a, asyncs) { - try { - a.get<0>()->end_NewData(a.get<1>()); - } - catch (...) { + for (auto client = backgroundClients.begin(); client != backgroundClients.end(); ) { + if (client->second->IsFinished()) { + close(client->first); + client = backgroundClients.erase(client); + } + else { + client++; + } } } backgroundThread = NULL; @@ -448,12 +376,20 @@ Tuner::GetLastUsedTime(const Ice::Current &) return lastUsedTime; } -bool -Tuner::crc32(const P2PVR::Data & buf) +Tuner::IDataSender::IDataSender(const P2PVR::RawDataClientPrx & c) : + _packetsSent(0), + client(c) +{ +} + +Tuner::IDataSender::~IDataSender() +{ +} + +uint64_t +Tuner::IDataSender::PacketsSent() const { - boost::crc_optimal<32, 0x0, 0xFFFFFFFF, 0x0, true, false> crc; - crc.process_bytes(&buf.front(), buf.size()); - return crc.checksum() == 0; + return _packetsSent; } int Tuner::TuningTimeout; diff --git a/p2pvr/lib/tuner.h b/p2pvr/lib/tuner.h index 381c804..57bdd82 100644 --- a/p2pvr/lib/tuner.h +++ b/p2pvr/lib/tuner.h @@ -14,6 +14,21 @@ class Tuner : public P2PVR::PrivateTuner { public: + class IDataSender { + public: + IDataSender(const P2PVR::RawDataClientPrx &); + virtual ~IDataSender() = 0; + + virtual void NewData(const P2PVR::Data &) = 0; + virtual bool IsFinished() = 0; + uint64_t PacketsSent() const; + + protected: + uint64_t _packetsSent; + const P2PVR::RawDataClientPrx client; + }; + typedef boost::shared_ptr BackgroundClient; + Tuner(const boost::filesystem::path & deviceFrontend); ~Tuner(); @@ -38,12 +53,10 @@ class Tuner : public P2PVR::PrivateTuner { INITOPTIONS; private: - static bool crc32(const P2PVR::Data &); int OpenDemux() const; uint64_t SendPID(int pid, const P2PVR::RawDataClientPrx & client, const Ice::Current &) const; static void RequestPID(int pid, int fd); uint64_t ReadDemuxAndSend(int fd, const P2PVR::RawDataClientPrx & client) const; - static bool IsValidSection(const P2PVR::Data &); void startSenderThread(); void senderThread(); static void setBufferSize(int fd, unsigned long bytes); @@ -51,7 +64,6 @@ class Tuner : public P2PVR::PrivateTuner { const boost::filesystem::path deviceFrontend; const boost::filesystem::path deviceRoot; typedef boost::function PacketCheckFunction; - typedef boost::tuple BackgroundClient; typedef std::map BackgroundClients; BackgroundClients backgroundClients; std::thread * backgroundThread; diff --git a/p2pvr/lib/tunerSendSi.cpp b/p2pvr/lib/tunerSendSi.cpp new file mode 100644 index 0000000..e1f4637 --- /dev/null +++ b/p2pvr/lib/tunerSendSi.cpp @@ -0,0 +1,81 @@ +#include +#include "tunerSendSi.h" +#include +#include +#include "siParsers/table.h" + +SendSi::SendSi(const P2PVR::RawDataClientPrx & c) : + Tuner::IDataSender(c->ice_collocationOptimized(false)) +{ +} + +SendSi::~SendSi() +{ +} + +void +SendSi::NewData(const P2PVR::Data & buf) +{ + if (!IsValidSection(buf)) { + return; + } + _packetsSent += 1; + asyncs.insert(client->begin_NewData(buf)); +} + +bool +SendSi::IsFinished() +{ + try { + for (auto c = asyncs.begin(); c != asyncs.end(); ) { + if ((*c)->isCompleted()) { + if (client->end_NewData(*c)) { + return true; + } + c = asyncs.erase(c); + } + else { + c++; + } + } + return false; + } + catch (const std::exception & ex) { + Logger()->messagebf(LOG_DEBUG, "%s: Client transmit error (%s)", __PRETTY_FUNCTION__, ex.what()); + return true; + } +} + +bool +SendSi::IsValidSection(const P2PVR::Data & buf) +{ + auto n = buf.size(); + if (n < sizeof(SiTableHeader)) { + Logger()->messagebf(LOG_WARNING, "Received data too small to be an SI table."); + return false; + } + auto * tab = (const SiTableHeader *)(&buf.front()); + size_t l = sizeof(SiTableHeaderBase) + HILO(tab->section_length); + if (n < l) { + Logger()->messagebf(LOG_WARNING, "Received data shorter than its defined length."); + return false; + } + if (n > l) { + Logger()->messagebf(LOG_WARNING, "Received data longer than its defined length."); + return false; + } + if (!crc32(buf)) { + Logger()->messagebf(LOG_WARNING, "Received data is corrupted (crc32 failed)."); + return false; + } + return true; +} + +bool +SendSi::crc32(const P2PVR::Data & buf) +{ + boost::crc_optimal<32, 0x0, 0xFFFFFFFF, 0x0, true, false> crc; + crc.process_bytes(&buf.front(), buf.size()); + return crc.checksum() == 0; +} + diff --git a/p2pvr/lib/tunerSendSi.h b/p2pvr/lib/tunerSendSi.h new file mode 100644 index 0000000..df48f43 --- /dev/null +++ b/p2pvr/lib/tunerSendSi.h @@ -0,0 +1,24 @@ +#ifndef TUNER_SENDSI_H +#define TUNER_SENDSI_H + +#include "tuner.h" + +class SendSi : public Tuner::IDataSender { + public: + SendSi(const P2PVR::RawDataClientPrx &); + ~SendSi(); + + void NewData(const P2PVR::Data &); + bool IsFinished(); + + private: + static bool crc32(const P2PVR::Data &); + static bool IsValidSection(const P2PVR::Data &); + + std::set asyncs; + bool finish; +}; + +#endif + + diff --git a/p2pvr/lib/tunerSendTs.cpp b/p2pvr/lib/tunerSendTs.cpp new file mode 100644 index 0000000..d5ea1bb --- /dev/null +++ b/p2pvr/lib/tunerSendTs.cpp @@ -0,0 +1,73 @@ +#include +#include "tunerSendTs.h" +#include + +// ~64kb of TS packets +#define TARGET_BUFFER_SIZE (350 * 188) +// About the ICE message size limit +#define TARGET_BUFFER_LIMIT 512 * 1024 + +SendTs::SendTs(const P2PVR::RawDataClientPrx & c) : + Tuner::IDataSender(c->ice_collocationOptimized(false)) +{ + buffer.reserve(TARGET_BUFFER_SIZE); +} + +SendTs::~SendTs() +{ + if (async) { + client->end_NewData(async); + } + while (!buffer.empty()) { + sendBufferChunk(); + client->end_NewData(async); + } +} + +void +SendTs::NewData(const P2PVR::Data & buf) +{ + buffer.insert(buffer.end(), buf.begin(), buf.end()); + if (!async && buffer.size() >= TARGET_BUFFER_SIZE) { + sendBufferChunk(); + } +} + +void +SendTs::sendBufferChunk() +{ + if (buffer.size() > TARGET_BUFFER_LIMIT) { + auto breakPoint = buffer.begin() + TARGET_BUFFER_LIMIT; + async = client->begin_NewData(P2PVR::Data(buffer.begin(), breakPoint)); + buffer.erase(buffer.begin(), breakPoint); + } + else { + async = client->begin_NewData(buffer); + buffer.clear(); + buffer.reserve(TARGET_BUFFER_SIZE); + } + _packetsSent += 1; +} + +bool +SendTs::IsFinished() +{ + try { + if (async && async->isCompleted()) { + auto finished = client->end_NewData(async); + async = NULL; + if (finished) { + buffer.clear(); + } + return finished; + } + return false; + } + catch (const std::exception & ex) { + async = NULL; + Logger()->messagebf(LOG_DEBUG, "%s: Client transmit error (%s)", __PRETTY_FUNCTION__, ex.what()); + return true; + } +} + + diff --git a/p2pvr/lib/tunerSendTs.h b/p2pvr/lib/tunerSendTs.h new file mode 100644 index 0000000..ecf32fd --- /dev/null +++ b/p2pvr/lib/tunerSendTs.h @@ -0,0 +1,22 @@ +#ifndef TUNER_SENDTS_H +#define TUNER_SENDTS_H + +#include "tuner.h" + +class SendTs : public Tuner::IDataSender { + public: + SendTs(const P2PVR::RawDataClientPrx &); + ~SendTs(); + + void NewData(const P2PVR::Data &); + bool IsFinished(); + + private: + void sendBufferChunk(); + + Ice::AsyncResultPtr async; + P2PVR::Data buffer; +}; + +#endif + diff --git a/p2pvr/streamer/Jamfile.jam b/p2pvr/streamer/Jamfile.jam new file mode 100644 index 0000000..7f4d2b4 --- /dev/null +++ b/p2pvr/streamer/Jamfile.jam @@ -0,0 +1,10 @@ + + +lib streamer : + [ glob-tree *.cpp ] + : : + ../ice//p2pvrice + ../lib//p2pvrlib + ../util//p2pvrutil + ../ice//p2pvrice + ; diff --git a/p2pvr/streamer/streamer.cpp b/p2pvr/streamer/streamer.cpp new file mode 100644 index 0000000..2722266 --- /dev/null +++ b/p2pvr/streamer/streamer.cpp @@ -0,0 +1,52 @@ +#include +#include +#include "globalDevices.h" +#include "si.h" +#include +#include +#include + +class P2PvrStreamer : public DaemonBase { + public: + P2PvrStreamer(int argc, char ** argv) : + DaemonBase(argc, argv) + { + } + + void addServants(const Ice::ObjectAdapterPtr & adapter, const IceUtil::TimerPtr &) const + { + adapter->add(new GlobalDevices(), ic->stringToIdentity("GlobalDevices")); + adapter->add(new SI(), ic->stringToIdentity("SI")); + auto output = P2PVR::RawDataClientPrx::checkedCast(adapter->addWithUUID(new FileSink(1))); + assert(output); + auto muxer = P2PVR::RawDataClientPrx::checkedCast(adapter->addWithUUID(new Muxer(output, "/usr/bin/ffmpeg -f mpegts -i - -f dvd -codec copy -"))); + assert(muxer); + ss = ServiceStreamerPtr(new ServiceStreamer(4287, muxer, ic, adapter)); + assert(ss); + } + + void run() const + { + IceUtil::TimerPtr timer = new IceUtil::Timer(); + Logger()->messagebf(LOG_INFO, "Creating adapter (%s, %s)", Adapter, Endpoint); + auto adapter = ic->createObjectAdapterWithEndpoints(Adapter, Endpoint); + addServants(adapter, timer); + adapter->activate(); + + ss->Start(); + + ic->waitForShutdown(); + timer->destroy(); + } + + void shutdown() const + { + ss->Stop(); + DaemonBase::shutdown(); + } + private: + mutable ServiceStreamerPtr ss; +}; + +DECLARE_GENERIC_LOADER("p2pvrstreamer", DaemonLoader, P2PvrStreamer); + -- cgit v1.2.3