diff options
Diffstat (limited to 'p2pvr/lib/tuner.cpp')
-rw-r--r-- | p2pvr/lib/tuner.cpp | 130 |
1 files changed, 33 insertions, 97 deletions
diff --git a/p2pvr/lib/tuner.cpp b/p2pvr/lib/tuner.cpp index bb19a3c..8ebf1a2 100644 --- a/p2pvr/lib/tuner.cpp +++ b/p2pvr/lib/tuner.cpp @@ -9,11 +9,11 @@ #include <plugable.h> #include <linux/dvb/frontend.h> #include <linux/dvb/dmx.h> -#include <boost/crc.hpp> #include <boost/tuple/tuple.hpp> #include "fileHandle.h" -#include "siParsers/table.h" #include <cxxabi.h> +#include "tunerSendSi.h" +#include "tunerSendTs.h" class FrontendNotSupported : public NotSupported { public: @@ -177,14 +177,11 @@ uint64_t Tuner::ReadDemuxAndSend(int demux, const P2PVR::RawDataClientPrx & _client) const { Logger()->messagebf(LOG_DEBUG, "%s: begin", __PRETTY_FUNCTION__); - std::vector<Ice::AsyncResultPtr> asyncs; struct pollfd ufd; memset(&ufd, 0, sizeof(pollfd)); ufd.fd = demux; ufd.events = POLLIN | POLLPRI; - uint64_t packetsSent = 0; - bool exitFlag = false; - auto client = _client->ice_collocationOptimized(false); + BackgroundClient client = BackgroundClient(new SendSi(_client)); do { // Wait for data to appear switch (poll(&ufd, 1, DemuxReadTimeout)) { @@ -208,54 +205,16 @@ Tuner::ReadDemuxAndSend(int demux, const P2PVR::RawDataClientPrx & _client) cons size_t n = nr; buf.resize(n); - if (!IsValidSection(buf)) { - continue; - } - - asyncs.push_back(client->begin_NewData(buf)); - packetsSent += 1; + client->NewData(buf); time(&lastUsedTime); - asyncs.erase(std::remove_if(asyncs.begin(), asyncs.end(), [&exitFlag, &client](const Ice::AsyncResultPtr & a) { - if (a->isCompleted()) { - exitFlag = client->end_NewData(a); - return true; - } - return false; - }), asyncs.end()); - } while (!exitFlag); - BOOST_FOREACH(const auto & a, asyncs) { - client->end_NewData(a); - } - Logger()->messagebf(LOG_DEBUG, "%s: end", __PRETTY_FUNCTION__); + } while (!client->IsFinished()); + auto packetsSent = client->PacketsSent(); + client.reset(); + Logger()->messagebf(LOG_DEBUG, "%s: end (sent %d packets)", __PRETTY_FUNCTION__, packetsSent); return packetsSent; } -bool -Tuner::IsValidSection(const P2PVR::Data & buf) -{ - auto n = buf.size(); - if (n < sizeof(SiTableHeader)) { - Logger()->messagebf(LOG_WARNING, "Received data too small to be an SI table."); - return false; - } - auto * tab = (const SiTableHeader *)(&buf.front()); - size_t l = sizeof(SiTableHeaderBase) + HILO(tab->section_length); - if (n < l) { - Logger()->messagebf(LOG_WARNING, "Received data shorter than its defined length."); - return false; - } - if (n > l) { - Logger()->messagebf(LOG_WARNING, "Received data longer than its defined length."); - return false; - } - if (!crc32(buf)) { - Logger()->messagebf(LOG_WARNING, "Received data is corrupted (crc32 failed)."); - return false; - } - return true; -} - int Tuner::StartSendingSection(int pid, const P2PVR::RawDataClientPrx & client, const Ice::Current &) { @@ -264,7 +223,7 @@ Tuner::StartSendingSection(int pid, const P2PVR::RawDataClientPrx & client, cons std::lock_guard<std::mutex> g(lock); int demux = backgroundClients.insert(BackgroundClients::value_type(OpenDemux(), - BackgroundClient(client->ice_collocationOptimized(false), &Tuner::IsValidSection))).first->first; + BackgroundClient(new SendSi(client)))).first->first; RequestPID(pid, demux); startSenderThread(); return demux; @@ -283,7 +242,8 @@ Tuner::StartSendingTS(const P2PVR::PacketIds & pids, const P2PVR::RawDataClientP ice.con->createProxy(client->ice_getIdentity()); } std::lock_guard<std::mutex> g(lock); - int demux = backgroundClients.insert(BackgroundClients::value_type(OpenDemux(), client->ice_collocationOptimized(false))).first->first; + int demux = backgroundClients.insert(BackgroundClients::value_type(OpenDemux(), + BackgroundClient(new SendTs(client)))).first->first; struct dmx_pes_filter_params pesFilterParams; memset(&pesFilterParams, 0, sizeof(struct dmx_pes_filter_params)); @@ -355,8 +315,6 @@ void Tuner::senderThread() { lock.lock(); - typedef boost::tuple<P2PVR::RawDataClientPrx, Ice::AsyncResultPtr, int> AsyncCall; - std::vector<AsyncCall> asyncs; while (!backgroundClients.empty()) { int n = backgroundClients.rbegin()->first + 1; fd_set rfds; @@ -389,52 +347,22 @@ Tuner::senderThread() } size_t n = nr; buf.resize(n); - if (!boost::get<1>(c.second) || boost::get<1>(c.second)(buf)) { - // Send it - asyncs.push_back(AsyncCall(boost::get<0>(c.second), boost::get<0>(c.second)->begin_NewData(buf), c.first)); - } + c.second->NewData(buf); } } } break; } // Clean up finished async requests - asyncs.erase(std::remove_if(asyncs.begin(), asyncs.end(), [this](const AsyncCall & a) { - time(&lastUsedTime); - try { - if (a.get<1>()->isCompleted()) { - if (a.get<0>()->end_NewData(a.get<1>())) { - close(a.get<2>()); - std::lock_guard<std::mutex> g(lock); - backgroundClients.erase(a.get<2>()); - } - return true; - } - return false; - } - catch (const std::exception & ex) { - Logger()->messagebf(LOG_DEBUG, "%s: Client transmit error (%s)", __PRETTY_FUNCTION__, ex.what()); - close(a.get<2>()); - std::lock_guard<std::mutex> g(lock); - backgroundClients.erase(a.get<2>()); - return true; - } - catch (...) { - Logger()->messagebf(LOG_DEBUG, "%s: Client transmit error", __PRETTY_FUNCTION__); - close(a.get<2>()); - std::lock_guard<std::mutex> g(lock); - backgroundClients.erase(a.get<2>()); - return true; - } - }), asyncs.end()); lock.lock(); - } - Logger()->messagebf(LOG_DEBUG, "%s: Cleaning up", __PRETTY_FUNCTION__); - BOOST_FOREACH(const auto & a, asyncs) { - try { - a.get<0>()->end_NewData(a.get<1>()); - } - catch (...) { + for (auto client = backgroundClients.begin(); client != backgroundClients.end(); ) { + if (client->second->IsFinished()) { + close(client->first); + client = backgroundClients.erase(client); + } + else { + client++; + } } } backgroundThread = NULL; @@ -448,12 +376,20 @@ Tuner::GetLastUsedTime(const Ice::Current &) return lastUsedTime; } -bool -Tuner::crc32(const P2PVR::Data & buf) +Tuner::IDataSender::IDataSender(const P2PVR::RawDataClientPrx & c) : + _packetsSent(0), + client(c) +{ +} + +Tuner::IDataSender::~IDataSender() +{ +} + +uint64_t +Tuner::IDataSender::PacketsSent() const { - boost::crc_optimal<32, 0x0, 0xFFFFFFFF, 0x0, true, false> crc; - crc.process_bytes(&buf.front(), buf.size()); - return crc.checksum() == 0; + return _packetsSent; } int Tuner::TuningTimeout; |