diff options
author | randomdan <randomdan@localhost> | 2014-01-22 21:17:43 +0000 |
---|---|---|
committer | randomdan <randomdan@localhost> | 2014-01-22 21:17:43 +0000 |
commit | 5c9738977aa14bd6e76922aa724628eb22b71a7d (patch) | |
tree | 19ca8dc374b6b1b4a10f49d5df427afcd095e883 /p2pvr/lib/tuner.cpp | |
parent | Fix up pipe handling in the muxer and write stderr of muxer command into the log (diff) | |
download | p2pvr-5c9738977aa14bd6e76922aa724628eb22b71a7d.tar.bz2 p2pvr-5c9738977aa14bd6e76922aa724628eb22b71a7d.tar.xz p2pvr-5c9738977aa14bd6e76922aa724628eb22b71a7d.zip |
Refactor tuner to use specific implementations of new interface for send different (TS and SI) data to clients
Refactor muxer to not block and read output data whilst writing input data
Add a (service) streamer for testing
Diffstat (limited to 'p2pvr/lib/tuner.cpp')
-rw-r--r-- | p2pvr/lib/tuner.cpp | 130 |
1 files changed, 33 insertions, 97 deletions
diff --git a/p2pvr/lib/tuner.cpp b/p2pvr/lib/tuner.cpp index bb19a3c..8ebf1a2 100644 --- a/p2pvr/lib/tuner.cpp +++ b/p2pvr/lib/tuner.cpp @@ -9,11 +9,11 @@ #include <plugable.h> #include <linux/dvb/frontend.h> #include <linux/dvb/dmx.h> -#include <boost/crc.hpp> #include <boost/tuple/tuple.hpp> #include "fileHandle.h" -#include "siParsers/table.h" #include <cxxabi.h> +#include "tunerSendSi.h" +#include "tunerSendTs.h" class FrontendNotSupported : public NotSupported { public: @@ -177,14 +177,11 @@ uint64_t Tuner::ReadDemuxAndSend(int demux, const P2PVR::RawDataClientPrx & _client) const { Logger()->messagebf(LOG_DEBUG, "%s: begin", __PRETTY_FUNCTION__); - std::vector<Ice::AsyncResultPtr> asyncs; struct pollfd ufd; memset(&ufd, 0, sizeof(pollfd)); ufd.fd = demux; ufd.events = POLLIN | POLLPRI; - uint64_t packetsSent = 0; - bool exitFlag = false; - auto client = _client->ice_collocationOptimized(false); + BackgroundClient client = BackgroundClient(new SendSi(_client)); do { // Wait for data to appear switch (poll(&ufd, 1, DemuxReadTimeout)) { @@ -208,54 +205,16 @@ Tuner::ReadDemuxAndSend(int demux, const P2PVR::RawDataClientPrx & _client) cons size_t n = nr; buf.resize(n); - if (!IsValidSection(buf)) { - continue; - } - - asyncs.push_back(client->begin_NewData(buf)); - packetsSent += 1; + client->NewData(buf); time(&lastUsedTime); - asyncs.erase(std::remove_if(asyncs.begin(), asyncs.end(), [&exitFlag, &client](const Ice::AsyncResultPtr & a) { - if (a->isCompleted()) { - exitFlag = client->end_NewData(a); - return true; - } - return false; - }), asyncs.end()); - } while (!exitFlag); - BOOST_FOREACH(const auto & a, asyncs) { - client->end_NewData(a); - } - Logger()->messagebf(LOG_DEBUG, "%s: end", __PRETTY_FUNCTION__); + } while (!client->IsFinished()); + auto packetsSent = client->PacketsSent(); + client.reset(); + Logger()->messagebf(LOG_DEBUG, "%s: end (sent %d packets)", __PRETTY_FUNCTION__, packetsSent); return packetsSent; } -bool -Tuner::IsValidSection(const P2PVR::Data & buf) -{ - auto n = buf.size(); - if (n < sizeof(SiTableHeader)) { - Logger()->messagebf(LOG_WARNING, "Received data too small to be an SI table."); - return false; - } - auto * tab = (const SiTableHeader *)(&buf.front()); - size_t l = sizeof(SiTableHeaderBase) + HILO(tab->section_length); - if (n < l) { - Logger()->messagebf(LOG_WARNING, "Received data shorter than its defined length."); - return false; - } - if (n > l) { - Logger()->messagebf(LOG_WARNING, "Received data longer than its defined length."); - return false; - } - if (!crc32(buf)) { - Logger()->messagebf(LOG_WARNING, "Received data is corrupted (crc32 failed)."); - return false; - } - return true; -} - int Tuner::StartSendingSection(int pid, const P2PVR::RawDataClientPrx & client, const Ice::Current &) { @@ -264,7 +223,7 @@ Tuner::StartSendingSection(int pid, const P2PVR::RawDataClientPrx & client, cons std::lock_guard<std::mutex> g(lock); int demux = backgroundClients.insert(BackgroundClients::value_type(OpenDemux(), - BackgroundClient(client->ice_collocationOptimized(false), &Tuner::IsValidSection))).first->first; + BackgroundClient(new SendSi(client)))).first->first; RequestPID(pid, demux); startSenderThread(); return demux; @@ -283,7 +242,8 @@ Tuner::StartSendingTS(const P2PVR::PacketIds & pids, const P2PVR::RawDataClientP ice.con->createProxy(client->ice_getIdentity()); } std::lock_guard<std::mutex> g(lock); - int demux = backgroundClients.insert(BackgroundClients::value_type(OpenDemux(), client->ice_collocationOptimized(false))).first->first; + int demux = backgroundClients.insert(BackgroundClients::value_type(OpenDemux(), + BackgroundClient(new SendTs(client)))).first->first; struct dmx_pes_filter_params pesFilterParams; memset(&pesFilterParams, 0, sizeof(struct dmx_pes_filter_params)); @@ -355,8 +315,6 @@ void Tuner::senderThread() { lock.lock(); - typedef boost::tuple<P2PVR::RawDataClientPrx, Ice::AsyncResultPtr, int> AsyncCall; - std::vector<AsyncCall> asyncs; while (!backgroundClients.empty()) { int n = backgroundClients.rbegin()->first + 1; fd_set rfds; @@ -389,52 +347,22 @@ Tuner::senderThread() } size_t n = nr; buf.resize(n); - if (!boost::get<1>(c.second) || boost::get<1>(c.second)(buf)) { - // Send it - asyncs.push_back(AsyncCall(boost::get<0>(c.second), boost::get<0>(c.second)->begin_NewData(buf), c.first)); - } + c.second->NewData(buf); } } } break; } // Clean up finished async requests - asyncs.erase(std::remove_if(asyncs.begin(), asyncs.end(), [this](const AsyncCall & a) { - time(&lastUsedTime); - try { - if (a.get<1>()->isCompleted()) { - if (a.get<0>()->end_NewData(a.get<1>())) { - close(a.get<2>()); - std::lock_guard<std::mutex> g(lock); - backgroundClients.erase(a.get<2>()); - } - return true; - } - return false; - } - catch (const std::exception & ex) { - Logger()->messagebf(LOG_DEBUG, "%s: Client transmit error (%s)", __PRETTY_FUNCTION__, ex.what()); - close(a.get<2>()); - std::lock_guard<std::mutex> g(lock); - backgroundClients.erase(a.get<2>()); - return true; - } - catch (...) { - Logger()->messagebf(LOG_DEBUG, "%s: Client transmit error", __PRETTY_FUNCTION__); - close(a.get<2>()); - std::lock_guard<std::mutex> g(lock); - backgroundClients.erase(a.get<2>()); - return true; - } - }), asyncs.end()); lock.lock(); - } - Logger()->messagebf(LOG_DEBUG, "%s: Cleaning up", __PRETTY_FUNCTION__); - BOOST_FOREACH(const auto & a, asyncs) { - try { - a.get<0>()->end_NewData(a.get<1>()); - } - catch (...) { + for (auto client = backgroundClients.begin(); client != backgroundClients.end(); ) { + if (client->second->IsFinished()) { + close(client->first); + client = backgroundClients.erase(client); + } + else { + client++; + } } } backgroundThread = NULL; @@ -448,12 +376,20 @@ Tuner::GetLastUsedTime(const Ice::Current &) return lastUsedTime; } -bool -Tuner::crc32(const P2PVR::Data & buf) +Tuner::IDataSender::IDataSender(const P2PVR::RawDataClientPrx & c) : + _packetsSent(0), + client(c) +{ +} + +Tuner::IDataSender::~IDataSender() +{ +} + +uint64_t +Tuner::IDataSender::PacketsSent() const { - boost::crc_optimal<32, 0x0, 0xFFFFFFFF, 0x0, true, false> crc; - crc.process_bytes(&buf.front(), buf.size()); - return crc.checksum() == 0; + return _packetsSent; } int Tuner::TuningTimeout; |