From 006f7b7ecf948e53464063fa5405c4da817e0c27 Mon Sep 17 00:00:00 2001 From: randomdan Date: Thu, 5 Dec 2013 02:09:09 +0000 Subject: Support background sending of table sections just like streams --- p2pvr/ice/p2pvr.ice | 3 ++- p2pvr/lib/tuner.cpp | 76 +++++++++++++++++++++++++++++++++++++---------------- p2pvr/lib/tuner.h | 11 ++++++-- 3 files changed, 64 insertions(+), 26 deletions(-) diff --git a/p2pvr/ice/p2pvr.ice b/p2pvr/ice/p2pvr.ice index 9dac2a4..7d84955 100644 --- a/p2pvr/ice/p2pvr.ice +++ b/p2pvr/ice/p2pvr.ice @@ -210,7 +210,8 @@ module P2PVR { idempotent void SendEventInformation(RawDataClient * client); int StartSendingTS(PacketIds pids, RawDataClient * client); - idempotent void StopSendingTS(int handle); + int StartSendingSection(int pid, RawDataClient * client); + idempotent void StopSending(int handle); }; interface PrivateTuner extends Tuner { idempotent void TuneTo(DVBSI::Delivery d); diff --git a/p2pvr/lib/tuner.cpp b/p2pvr/lib/tuner.cpp index b20cc75..25042ba 100644 --- a/p2pvr/lib/tuner.cpp +++ b/p2pvr/lib/tuner.cpp @@ -145,6 +145,13 @@ Tuner::SendPID(int pid, const P2PVR::RawDataClientPrx & client, const Ice::Curre ice.con->createProxy(client->ice_getIdentity()); FileHandle demux(OpenDemux()); + RequestPID(pid, demux); + return ReadDemuxAndSend(demux, client); +} + +void +Tuner::RequestPID(int pid, int demux) +{ struct dmx_sct_filter_params sctFilterParams; memset(&sctFilterParams, 0, sizeof(dmx_sct_filter_params)); sctFilterParams.pid = pid; @@ -153,8 +160,6 @@ Tuner::SendPID(int pid, const P2PVR::RawDataClientPrx & client, const Ice::Curre if (ioctl(demux, DMX_SET_FILTER, &sctFilterParams) < 0) { throw P2PVR::DeviceError("demux", strerror(errno), errno); } - - return ReadDemuxAndSend(demux, client); } uint64_t @@ -184,22 +189,7 @@ Tuner::ReadDemuxAndSend(int demux, const P2PVR::RawDataClientPrx & client) const size_t n = nr; buf.resize(n); - // Verify it - if (n < sizeof(SiTableHeader)) { - Logger()->messagebf(LOG_WARNING, "Received data too small to be an SI table."); - } - auto * tab = (const SiTableHeader *)(&buf.front()); - size_t l = sizeof(SiTableHeaderBase) + HILO(tab->section_length); - if (n < l) { - Logger()->messagebf(LOG_WARNING, "Received data shorter than its defined length."); - continue; - } - if (n > l) { - Logger()->messagebf(LOG_WARNING, "Received data longer than its defined length."); - continue; - } - if (!crc32(buf)) { - Logger()->messagebf(LOG_WARNING, "Received data is corrupted (crc32 failed)."); + if (!IsValidSection(buf)) { continue; } @@ -222,6 +212,45 @@ Tuner::ReadDemuxAndSend(int demux, const P2PVR::RawDataClientPrx & client) const return packetsSent; } +bool +Tuner::IsValidSection(const P2PVR::Data & buf) +{ + auto n = buf.size(); + if (n < sizeof(SiTableHeader)) { + Logger()->messagebf(LOG_WARNING, "Received data too small to be an SI table."); + return false; + } + auto * tab = (const SiTableHeader *)(&buf.front()); + size_t l = sizeof(SiTableHeaderBase) + HILO(tab->section_length); + if (n < l) { + Logger()->messagebf(LOG_WARNING, "Received data shorter than its defined length."); + return false; + } + if (n > l) { + Logger()->messagebf(LOG_WARNING, "Received data longer than its defined length."); + return false; + } + if (!crc32(buf)) { + Logger()->messagebf(LOG_WARNING, "Received data is corrupted (crc32 failed)."); + return false; + } + return true; +} + +int +Tuner::StartSendingSection(int pid, const P2PVR::RawDataClientPrx & client, const Ice::Current &) +{ + time(&lastUsedTime); + Logger()->message(LOG_DEBUG, __PRETTY_FUNCTION__); + + std::lock_guard g(lock); + int demux = backgroundClients.insert(BackgroundClients::value_type(OpenDemux(), + BackgroundClient(client, &Tuner::IsValidSection))).first->first; + RequestPID(pid, demux); + startSenderThread(); + return demux; +} + int Tuner::StartSendingTS(const P2PVR::PacketIds & pids, const P2PVR::RawDataClientPrx & client, const Ice::Current &) { @@ -270,7 +299,7 @@ Tuner::StartSendingTS(const P2PVR::PacketIds & pids, const P2PVR::RawDataClientP } void -Tuner::StopSendingTS(int handle, const Ice::Current &) +Tuner::StopSending(int handle, const Ice::Current &) { time(&lastUsedTime); Logger()->message(LOG_DEBUG, __PRETTY_FUNCTION__); @@ -324,9 +353,10 @@ Tuner::senderThread() } size_t n = nr; buf.resize(n); - // Send it - asyncs.push_back(AsyncCall(c.second, c.second->begin_NewData(buf), c.first)); - //c.second->NewData(buf); + if (!boost::get<1>(c.second) || boost::get<1>(c.second)(buf)) { + // Send it + asyncs.push_back(AsyncCall(boost::get<0>(c.second), boost::get<0>(c.second)->begin_NewData(buf), c.first)); + } } } } @@ -337,7 +367,7 @@ Tuner::senderThread() time(&lastUsedTime); try { if (a.get<1>()->isCompleted()) { - if (!a.get<0>()->end_NewData(a.get<1>())) { + if (a.get<0>()->end_NewData(a.get<1>())) { close(a.get<2>()); std::lock_guard g(lock); backgroundClients.erase(a.get<2>()); diff --git a/p2pvr/lib/tuner.h b/p2pvr/lib/tuner.h index 08701eb..e6cfa80 100644 --- a/p2pvr/lib/tuner.h +++ b/p2pvr/lib/tuner.h @@ -8,6 +8,8 @@ #include #include #include +#include +#include class Tuner : public P2PVR::PrivateTuner { public: @@ -27,7 +29,8 @@ class Tuner : public P2PVR::PrivateTuner { void SendEventInformation(const P2PVR::RawDataClientPrx & client, const Ice::Current&); int StartSendingTS(const P2PVR::PacketIds & pids, const P2PVR::RawDataClientPrx & client, const Ice::Current &); - void StopSendingTS(int handle, const Ice::Current &); + int StartSendingSection(Ice::Int pid, const P2PVR::RawDataClientPrx & client, const Ice::Current &); + void StopSending(int handle, const Ice::Current &); Ice::Long GetLastUsedTime(const Ice::Current&); @@ -35,14 +38,18 @@ class Tuner : public P2PVR::PrivateTuner { static bool crc32(const P2PVR::Data &); int OpenDemux() const; uint64_t SendPID(int pid, const P2PVR::RawDataClientPrx & client, const Ice::Current &) const; + static void RequestPID(int pid, int fd); uint64_t ReadDemuxAndSend(int fd, const P2PVR::RawDataClientPrx & client) const; + static bool IsValidSection(const P2PVR::Data &); void startSenderThread(); void senderThread(); const boost::filesystem::path deviceFrontend; const boost::filesystem::path deviceRoot; const int timeout; - typedef std::map BackgroundClients; + typedef boost::function PacketCheckFunction; + typedef boost::tuple BackgroundClient; + typedef std::map BackgroundClients; BackgroundClients backgroundClients; std::thread * backgroundThread; std::mutex lock; -- cgit v1.2.3