summaryrefslogtreecommitdiff
path: root/p2pvr/lib/tuner.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'p2pvr/lib/tuner.cpp')
-rw-r--r--p2pvr/lib/tuner.cpp130
1 files changed, 33 insertions, 97 deletions
diff --git a/p2pvr/lib/tuner.cpp b/p2pvr/lib/tuner.cpp
index bb19a3c..8ebf1a2 100644
--- a/p2pvr/lib/tuner.cpp
+++ b/p2pvr/lib/tuner.cpp
@@ -9,11 +9,11 @@
#include <plugable.h>
#include <linux/dvb/frontend.h>
#include <linux/dvb/dmx.h>
-#include <boost/crc.hpp>
#include <boost/tuple/tuple.hpp>
#include "fileHandle.h"
-#include "siParsers/table.h"
#include <cxxabi.h>
+#include "tunerSendSi.h"
+#include "tunerSendTs.h"
class FrontendNotSupported : public NotSupported {
public:
@@ -177,14 +177,11 @@ uint64_t
Tuner::ReadDemuxAndSend(int demux, const P2PVR::RawDataClientPrx & _client) const
{
Logger()->messagebf(LOG_DEBUG, "%s: begin", __PRETTY_FUNCTION__);
- std::vector<Ice::AsyncResultPtr> asyncs;
struct pollfd ufd;
memset(&ufd, 0, sizeof(pollfd));
ufd.fd = demux;
ufd.events = POLLIN | POLLPRI;
- uint64_t packetsSent = 0;
- bool exitFlag = false;
- auto client = _client->ice_collocationOptimized(false);
+ BackgroundClient client = BackgroundClient(new SendSi(_client));
do {
// Wait for data to appear
switch (poll(&ufd, 1, DemuxReadTimeout)) {
@@ -208,54 +205,16 @@ Tuner::ReadDemuxAndSend(int demux, const P2PVR::RawDataClientPrx & _client) cons
size_t n = nr;
buf.resize(n);
- if (!IsValidSection(buf)) {
- continue;
- }
-
- asyncs.push_back(client->begin_NewData(buf));
- packetsSent += 1;
+ client->NewData(buf);
time(&lastUsedTime);
- asyncs.erase(std::remove_if(asyncs.begin(), asyncs.end(), [&exitFlag, &client](const Ice::AsyncResultPtr & a) {
- if (a->isCompleted()) {
- exitFlag = client->end_NewData(a);
- return true;
- }
- return false;
- }), asyncs.end());
- } while (!exitFlag);
- BOOST_FOREACH(const auto & a, asyncs) {
- client->end_NewData(a);
- }
- Logger()->messagebf(LOG_DEBUG, "%s: end", __PRETTY_FUNCTION__);
+ } while (!client->IsFinished());
+ auto packetsSent = client->PacketsSent();
+ client.reset();
+ Logger()->messagebf(LOG_DEBUG, "%s: end (sent %d packets)", __PRETTY_FUNCTION__, packetsSent);
return packetsSent;
}
-bool
-Tuner::IsValidSection(const P2PVR::Data & buf)
-{
- auto n = buf.size();
- if (n < sizeof(SiTableHeader)) {
- Logger()->messagebf(LOG_WARNING, "Received data too small to be an SI table.");
- return false;
- }
- auto * tab = (const SiTableHeader *)(&buf.front());
- size_t l = sizeof(SiTableHeaderBase) + HILO(tab->section_length);
- if (n < l) {
- Logger()->messagebf(LOG_WARNING, "Received data shorter than its defined length.");
- return false;
- }
- if (n > l) {
- Logger()->messagebf(LOG_WARNING, "Received data longer than its defined length.");
- return false;
- }
- if (!crc32(buf)) {
- Logger()->messagebf(LOG_WARNING, "Received data is corrupted (crc32 failed).");
- return false;
- }
- return true;
-}
-
int
Tuner::StartSendingSection(int pid, const P2PVR::RawDataClientPrx & client, const Ice::Current &)
{
@@ -264,7 +223,7 @@ Tuner::StartSendingSection(int pid, const P2PVR::RawDataClientPrx & client, cons
std::lock_guard<std::mutex> g(lock);
int demux = backgroundClients.insert(BackgroundClients::value_type(OpenDemux(),
- BackgroundClient(client->ice_collocationOptimized(false), &Tuner::IsValidSection))).first->first;
+ BackgroundClient(new SendSi(client)))).first->first;
RequestPID(pid, demux);
startSenderThread();
return demux;
@@ -283,7 +242,8 @@ Tuner::StartSendingTS(const P2PVR::PacketIds & pids, const P2PVR::RawDataClientP
ice.con->createProxy(client->ice_getIdentity());
}
std::lock_guard<std::mutex> g(lock);
- int demux = backgroundClients.insert(BackgroundClients::value_type(OpenDemux(), client->ice_collocationOptimized(false))).first->first;
+ int demux = backgroundClients.insert(BackgroundClients::value_type(OpenDemux(),
+ BackgroundClient(new SendTs(client)))).first->first;
struct dmx_pes_filter_params pesFilterParams;
memset(&pesFilterParams, 0, sizeof(struct dmx_pes_filter_params));
@@ -355,8 +315,6 @@ void
Tuner::senderThread()
{
lock.lock();
- typedef boost::tuple<P2PVR::RawDataClientPrx, Ice::AsyncResultPtr, int> AsyncCall;
- std::vector<AsyncCall> asyncs;
while (!backgroundClients.empty()) {
int n = backgroundClients.rbegin()->first + 1;
fd_set rfds;
@@ -389,52 +347,22 @@ Tuner::senderThread()
}
size_t n = nr;
buf.resize(n);
- if (!boost::get<1>(c.second) || boost::get<1>(c.second)(buf)) {
- // Send it
- asyncs.push_back(AsyncCall(boost::get<0>(c.second), boost::get<0>(c.second)->begin_NewData(buf), c.first));
- }
+ c.second->NewData(buf);
}
}
}
break;
}
// Clean up finished async requests
- asyncs.erase(std::remove_if(asyncs.begin(), asyncs.end(), [this](const AsyncCall & a) {
- time(&lastUsedTime);
- try {
- if (a.get<1>()->isCompleted()) {
- if (a.get<0>()->end_NewData(a.get<1>())) {
- close(a.get<2>());
- std::lock_guard<std::mutex> g(lock);
- backgroundClients.erase(a.get<2>());
- }
- return true;
- }
- return false;
- }
- catch (const std::exception & ex) {
- Logger()->messagebf(LOG_DEBUG, "%s: Client transmit error (%s)", __PRETTY_FUNCTION__, ex.what());
- close(a.get<2>());
- std::lock_guard<std::mutex> g(lock);
- backgroundClients.erase(a.get<2>());
- return true;
- }
- catch (...) {
- Logger()->messagebf(LOG_DEBUG, "%s: Client transmit error", __PRETTY_FUNCTION__);
- close(a.get<2>());
- std::lock_guard<std::mutex> g(lock);
- backgroundClients.erase(a.get<2>());
- return true;
- }
- }), asyncs.end());
lock.lock();
- }
- Logger()->messagebf(LOG_DEBUG, "%s: Cleaning up", __PRETTY_FUNCTION__);
- BOOST_FOREACH(const auto & a, asyncs) {
- try {
- a.get<0>()->end_NewData(a.get<1>());
- }
- catch (...) {
+ for (auto client = backgroundClients.begin(); client != backgroundClients.end(); ) {
+ if (client->second->IsFinished()) {
+ close(client->first);
+ client = backgroundClients.erase(client);
+ }
+ else {
+ client++;
+ }
}
}
backgroundThread = NULL;
@@ -448,12 +376,20 @@ Tuner::GetLastUsedTime(const Ice::Current &)
return lastUsedTime;
}
-bool
-Tuner::crc32(const P2PVR::Data & buf)
+Tuner::IDataSender::IDataSender(const P2PVR::RawDataClientPrx & c) :
+ _packetsSent(0),
+ client(c)
+{
+}
+
+Tuner::IDataSender::~IDataSender()
+{
+}
+
+uint64_t
+Tuner::IDataSender::PacketsSent() const
{
- boost::crc_optimal<32, 0x0, 0xFFFFFFFF, 0x0, true, false> crc;
- crc.process_bytes(&buf.front(), buf.size());
- return crc.checksum() == 0;
+ return _packetsSent;
}
int Tuner::TuningTimeout;