diff options
author | randomdan <randomdan@localhost> | 2013-12-05 02:09:09 +0000 |
---|---|---|
committer | randomdan <randomdan@localhost> | 2013-12-05 02:09:09 +0000 |
commit | 006f7b7ecf948e53464063fa5405c4da817e0c27 (patch) | |
tree | ead970ef0d4c1377e5a606f727f8ad4bfdaf78f7 /p2pvr/lib/tuner.cpp | |
parent | Tidy maintenance code into separate files (diff) | |
download | p2pvr-006f7b7ecf948e53464063fa5405c4da817e0c27.tar.bz2 p2pvr-006f7b7ecf948e53464063fa5405c4da817e0c27.tar.xz p2pvr-006f7b7ecf948e53464063fa5405c4da817e0c27.zip |
Support background sending of table sections just like streams
Diffstat (limited to 'p2pvr/lib/tuner.cpp')
-rw-r--r-- | p2pvr/lib/tuner.cpp | 76 |
1 files changed, 53 insertions, 23 deletions
diff --git a/p2pvr/lib/tuner.cpp b/p2pvr/lib/tuner.cpp index b20cc75..25042ba 100644 --- a/p2pvr/lib/tuner.cpp +++ b/p2pvr/lib/tuner.cpp @@ -145,6 +145,13 @@ Tuner::SendPID(int pid, const P2PVR::RawDataClientPrx & client, const Ice::Curre ice.con->createProxy(client->ice_getIdentity()); FileHandle demux(OpenDemux()); + RequestPID(pid, demux); + return ReadDemuxAndSend(demux, client); +} + +void +Tuner::RequestPID(int pid, int demux) +{ struct dmx_sct_filter_params sctFilterParams; memset(&sctFilterParams, 0, sizeof(dmx_sct_filter_params)); sctFilterParams.pid = pid; @@ -153,8 +160,6 @@ Tuner::SendPID(int pid, const P2PVR::RawDataClientPrx & client, const Ice::Curre if (ioctl(demux, DMX_SET_FILTER, &sctFilterParams) < 0) { throw P2PVR::DeviceError("demux", strerror(errno), errno); } - - return ReadDemuxAndSend(demux, client); } uint64_t @@ -184,22 +189,7 @@ Tuner::ReadDemuxAndSend(int demux, const P2PVR::RawDataClientPrx & client) const size_t n = nr; buf.resize(n); - // Verify it - if (n < sizeof(SiTableHeader)) { - Logger()->messagebf(LOG_WARNING, "Received data too small to be an SI table."); - } - auto * tab = (const SiTableHeader *)(&buf.front()); - size_t l = sizeof(SiTableHeaderBase) + HILO(tab->section_length); - if (n < l) { - Logger()->messagebf(LOG_WARNING, "Received data shorter than its defined length."); - continue; - } - if (n > l) { - Logger()->messagebf(LOG_WARNING, "Received data longer than its defined length."); - continue; - } - if (!crc32(buf)) { - Logger()->messagebf(LOG_WARNING, "Received data is corrupted (crc32 failed)."); + if (!IsValidSection(buf)) { continue; } @@ -222,6 +212,45 @@ Tuner::ReadDemuxAndSend(int demux, const P2PVR::RawDataClientPrx & client) const return packetsSent; } +bool +Tuner::IsValidSection(const P2PVR::Data & buf) +{ + auto n = buf.size(); + if (n < sizeof(SiTableHeader)) { + Logger()->messagebf(LOG_WARNING, "Received data too small to be an SI table."); + return false; + } + auto * tab = (const SiTableHeader *)(&buf.front()); + size_t l = sizeof(SiTableHeaderBase) + HILO(tab->section_length); + if (n < l) { + Logger()->messagebf(LOG_WARNING, "Received data shorter than its defined length."); + return false; + } + if (n > l) { + Logger()->messagebf(LOG_WARNING, "Received data longer than its defined length."); + return false; + } + if (!crc32(buf)) { + Logger()->messagebf(LOG_WARNING, "Received data is corrupted (crc32 failed)."); + return false; + } + return true; +} + +int +Tuner::StartSendingSection(int pid, const P2PVR::RawDataClientPrx & client, const Ice::Current &) +{ + time(&lastUsedTime); + Logger()->message(LOG_DEBUG, __PRETTY_FUNCTION__); + + std::lock_guard<std::mutex> g(lock); + int demux = backgroundClients.insert(BackgroundClients::value_type(OpenDemux(), + BackgroundClient(client, &Tuner::IsValidSection))).first->first; + RequestPID(pid, demux); + startSenderThread(); + return demux; +} + int Tuner::StartSendingTS(const P2PVR::PacketIds & pids, const P2PVR::RawDataClientPrx & client, const Ice::Current &) { @@ -270,7 +299,7 @@ Tuner::StartSendingTS(const P2PVR::PacketIds & pids, const P2PVR::RawDataClientP } void -Tuner::StopSendingTS(int handle, const Ice::Current &) +Tuner::StopSending(int handle, const Ice::Current &) { time(&lastUsedTime); Logger()->message(LOG_DEBUG, __PRETTY_FUNCTION__); @@ -324,9 +353,10 @@ Tuner::senderThread() } size_t n = nr; buf.resize(n); - // Send it - asyncs.push_back(AsyncCall(c.second, c.second->begin_NewData(buf), c.first)); - //c.second->NewData(buf); + if (!boost::get<1>(c.second) || boost::get<1>(c.second)(buf)) { + // Send it + asyncs.push_back(AsyncCall(boost::get<0>(c.second), boost::get<0>(c.second)->begin_NewData(buf), c.first)); + } } } } @@ -337,7 +367,7 @@ Tuner::senderThread() time(&lastUsedTime); try { if (a.get<1>()->isCompleted()) { - if (!a.get<0>()->end_NewData(a.get<1>())) { + if (a.get<0>()->end_NewData(a.get<1>())) { close(a.get<2>()); std::lock_guard<std::mutex> g(lock); backgroundClients.erase(a.get<2>()); |