summaryrefslogtreecommitdiff
path: root/p2pvr/lib/tuner.cpp
diff options
context:
space:
mode:
authorrandomdan <randomdan@localhost>2014-01-22 21:17:43 +0000
committerrandomdan <randomdan@localhost>2014-01-22 21:17:43 +0000
commit5c9738977aa14bd6e76922aa724628eb22b71a7d (patch)
tree19ca8dc374b6b1b4a10f49d5df427afcd095e883 /p2pvr/lib/tuner.cpp
parentFix up pipe handling in the muxer and write stderr of muxer command into the log (diff)
downloadp2pvr-5c9738977aa14bd6e76922aa724628eb22b71a7d.tar.bz2
p2pvr-5c9738977aa14bd6e76922aa724628eb22b71a7d.tar.xz
p2pvr-5c9738977aa14bd6e76922aa724628eb22b71a7d.zip
Refactor tuner to use specific implementations of new interface for send different (TS and SI) data to clients
Refactor muxer to not block and read output data whilst writing input data Add a (service) streamer for testing
Diffstat (limited to 'p2pvr/lib/tuner.cpp')
-rw-r--r--p2pvr/lib/tuner.cpp130
1 files changed, 33 insertions, 97 deletions
diff --git a/p2pvr/lib/tuner.cpp b/p2pvr/lib/tuner.cpp
index bb19a3c..8ebf1a2 100644
--- a/p2pvr/lib/tuner.cpp
+++ b/p2pvr/lib/tuner.cpp
@@ -9,11 +9,11 @@
#include <plugable.h>
#include <linux/dvb/frontend.h>
#include <linux/dvb/dmx.h>
-#include <boost/crc.hpp>
#include <boost/tuple/tuple.hpp>
#include "fileHandle.h"
-#include "siParsers/table.h"
#include <cxxabi.h>
+#include "tunerSendSi.h"
+#include "tunerSendTs.h"
class FrontendNotSupported : public NotSupported {
public:
@@ -177,14 +177,11 @@ uint64_t
Tuner::ReadDemuxAndSend(int demux, const P2PVR::RawDataClientPrx & _client) const
{
Logger()->messagebf(LOG_DEBUG, "%s: begin", __PRETTY_FUNCTION__);
- std::vector<Ice::AsyncResultPtr> asyncs;
struct pollfd ufd;
memset(&ufd, 0, sizeof(pollfd));
ufd.fd = demux;
ufd.events = POLLIN | POLLPRI;
- uint64_t packetsSent = 0;
- bool exitFlag = false;
- auto client = _client->ice_collocationOptimized(false);
+ BackgroundClient client = BackgroundClient(new SendSi(_client));
do {
// Wait for data to appear
switch (poll(&ufd, 1, DemuxReadTimeout)) {
@@ -208,54 +205,16 @@ Tuner::ReadDemuxAndSend(int demux, const P2PVR::RawDataClientPrx & _client) cons
size_t n = nr;
buf.resize(n);
- if (!IsValidSection(buf)) {
- continue;
- }
-
- asyncs.push_back(client->begin_NewData(buf));
- packetsSent += 1;
+ client->NewData(buf);
time(&lastUsedTime);
- asyncs.erase(std::remove_if(asyncs.begin(), asyncs.end(), [&exitFlag, &client](const Ice::AsyncResultPtr & a) {
- if (a->isCompleted()) {
- exitFlag = client->end_NewData(a);
- return true;
- }
- return false;
- }), asyncs.end());
- } while (!exitFlag);
- BOOST_FOREACH(const auto & a, asyncs) {
- client->end_NewData(a);
- }
- Logger()->messagebf(LOG_DEBUG, "%s: end", __PRETTY_FUNCTION__);
+ } while (!client->IsFinished());
+ auto packetsSent = client->PacketsSent();
+ client.reset();
+ Logger()->messagebf(LOG_DEBUG, "%s: end (sent %d packets)", __PRETTY_FUNCTION__, packetsSent);
return packetsSent;
}
-bool
-Tuner::IsValidSection(const P2PVR::Data & buf)
-{
- auto n = buf.size();
- if (n < sizeof(SiTableHeader)) {
- Logger()->messagebf(LOG_WARNING, "Received data too small to be an SI table.");
- return false;
- }
- auto * tab = (const SiTableHeader *)(&buf.front());
- size_t l = sizeof(SiTableHeaderBase) + HILO(tab->section_length);
- if (n < l) {
- Logger()->messagebf(LOG_WARNING, "Received data shorter than its defined length.");
- return false;
- }
- if (n > l) {
- Logger()->messagebf(LOG_WARNING, "Received data longer than its defined length.");
- return false;
- }
- if (!crc32(buf)) {
- Logger()->messagebf(LOG_WARNING, "Received data is corrupted (crc32 failed).");
- return false;
- }
- return true;
-}
-
int
Tuner::StartSendingSection(int pid, const P2PVR::RawDataClientPrx & client, const Ice::Current &)
{
@@ -264,7 +223,7 @@ Tuner::StartSendingSection(int pid, const P2PVR::RawDataClientPrx & client, cons
std::lock_guard<std::mutex> g(lock);
int demux = backgroundClients.insert(BackgroundClients::value_type(OpenDemux(),
- BackgroundClient(client->ice_collocationOptimized(false), &Tuner::IsValidSection))).first->first;
+ BackgroundClient(new SendSi(client)))).first->first;
RequestPID(pid, demux);
startSenderThread();
return demux;
@@ -283,7 +242,8 @@ Tuner::StartSendingTS(const P2PVR::PacketIds & pids, const P2PVR::RawDataClientP
ice.con->createProxy(client->ice_getIdentity());
}
std::lock_guard<std::mutex> g(lock);
- int demux = backgroundClients.insert(BackgroundClients::value_type(OpenDemux(), client->ice_collocationOptimized(false))).first->first;
+ int demux = backgroundClients.insert(BackgroundClients::value_type(OpenDemux(),
+ BackgroundClient(new SendTs(client)))).first->first;
struct dmx_pes_filter_params pesFilterParams;
memset(&pesFilterParams, 0, sizeof(struct dmx_pes_filter_params));
@@ -355,8 +315,6 @@ void
Tuner::senderThread()
{
lock.lock();
- typedef boost::tuple<P2PVR::RawDataClientPrx, Ice::AsyncResultPtr, int> AsyncCall;
- std::vector<AsyncCall> asyncs;
while (!backgroundClients.empty()) {
int n = backgroundClients.rbegin()->first + 1;
fd_set rfds;
@@ -389,52 +347,22 @@ Tuner::senderThread()
}
size_t n = nr;
buf.resize(n);
- if (!boost::get<1>(c.second) || boost::get<1>(c.second)(buf)) {
- // Send it
- asyncs.push_back(AsyncCall(boost::get<0>(c.second), boost::get<0>(c.second)->begin_NewData(buf), c.first));
- }
+ c.second->NewData(buf);
}
}
}
break;
}
// Clean up finished async requests
- asyncs.erase(std::remove_if(asyncs.begin(), asyncs.end(), [this](const AsyncCall & a) {
- time(&lastUsedTime);
- try {
- if (a.get<1>()->isCompleted()) {
- if (a.get<0>()->end_NewData(a.get<1>())) {
- close(a.get<2>());
- std::lock_guard<std::mutex> g(lock);
- backgroundClients.erase(a.get<2>());
- }
- return true;
- }
- return false;
- }
- catch (const std::exception & ex) {
- Logger()->messagebf(LOG_DEBUG, "%s: Client transmit error (%s)", __PRETTY_FUNCTION__, ex.what());
- close(a.get<2>());
- std::lock_guard<std::mutex> g(lock);
- backgroundClients.erase(a.get<2>());
- return true;
- }
- catch (...) {
- Logger()->messagebf(LOG_DEBUG, "%s: Client transmit error", __PRETTY_FUNCTION__);
- close(a.get<2>());
- std::lock_guard<std::mutex> g(lock);
- backgroundClients.erase(a.get<2>());
- return true;
- }
- }), asyncs.end());
lock.lock();
- }
- Logger()->messagebf(LOG_DEBUG, "%s: Cleaning up", __PRETTY_FUNCTION__);
- BOOST_FOREACH(const auto & a, asyncs) {
- try {
- a.get<0>()->end_NewData(a.get<1>());
- }
- catch (...) {
+ for (auto client = backgroundClients.begin(); client != backgroundClients.end(); ) {
+ if (client->second->IsFinished()) {
+ close(client->first);
+ client = backgroundClients.erase(client);
+ }
+ else {
+ client++;
+ }
}
}
backgroundThread = NULL;
@@ -448,12 +376,20 @@ Tuner::GetLastUsedTime(const Ice::Current &)
return lastUsedTime;
}
-bool
-Tuner::crc32(const P2PVR::Data & buf)
+Tuner::IDataSender::IDataSender(const P2PVR::RawDataClientPrx & c) :
+ _packetsSent(0),
+ client(c)
+{
+}
+
+Tuner::IDataSender::~IDataSender()
+{
+}
+
+uint64_t
+Tuner::IDataSender::PacketsSent() const
{
- boost::crc_optimal<32, 0x0, 0xFFFFFFFF, 0x0, true, false> crc;
- crc.process_bytes(&buf.front(), buf.size());
- return crc.checksum() == 0;
+ return _packetsSent;
}
int Tuner::TuningTimeout;