summaryrefslogtreecommitdiff
path: root/p2pvr/lib/tuner.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'p2pvr/lib/tuner.cpp')
-rw-r--r--p2pvr/lib/tuner.cpp355
1 files changed, 355 insertions, 0 deletions
diff --git a/p2pvr/lib/tuner.cpp b/p2pvr/lib/tuner.cpp
new file mode 100644
index 0000000..e508152
--- /dev/null
+++ b/p2pvr/lib/tuner.cpp
@@ -0,0 +1,355 @@
+#include "tuner.h"
+#include <fcntl.h>
+#include <Ice/Ice.h>
+#include <sys/ioctl.h>
+#include <poll.h>
+#include <logger.h>
+#include <misc.h>
+#include <plugable.h>
+#include <linux/dvb/frontend.h>
+#include <linux/dvb/dmx.h>
+#include <boost/crc.hpp>
+#include <boost/tuple/tuple.hpp>
+#include "fileHandle.h"
+#include "siParsers/table.h"
+
+class FrontendNotSupported : public NotSupported {
+ public:
+ FrontendNotSupported(fe_type t) : NotSupported(stringbf("Frontend not supported: %s", t))
+ { }
+};
+
+Tuner::Tuner(const boost::filesystem::path & df) :
+ deviceFrontend(df),
+ deviceRoot(df.branch_path()),
+ timeout(20000),
+ backgroundThread(NULL)
+{
+ int fd = open(deviceFrontend.string().c_str(), O_RDWR);
+ if (fd < 0) {
+ throw P2PVR::DeviceError(deviceFrontend.string(), strerror(errno), errno);
+ }
+ try {
+ struct dvb_frontend_info fe_info;
+ if (ioctl(fd, FE_GET_INFO, &fe_info) < 0) {
+ throw P2PVR::DeviceError(deviceFrontend.string(), strerror(errno), errno);
+ }
+ frontend = FrontendPtr(FrontendLoader::createNew<FrontendNotSupported>(fe_info.type, this, fd, fe_info));
+ }
+ catch (...) {
+ close(fd);
+ throw;
+ }
+ Logger()->messagebf(LOG_INFO, "%s: Attached to %s (%s, type %s)", __PRETTY_FUNCTION__,
+ deviceRoot, frontend->Info().name, frontend->Type());
+}
+
+Tuner::~Tuner()
+{
+ while (!backgroundClients.empty()) {
+ close(backgroundClients.begin()->first);
+ backgroundClients.erase(backgroundClients.begin());
+ }
+ if (backgroundThread) {
+ backgroundThread->join();
+ delete backgroundThread;
+ }
+}
+
+void
+Tuner::TuneTo(const DVBSI::DeliveryPtr & mp, const Ice::Current&)
+{
+ frontend->TuneTo(mp);
+}
+
+int
+Tuner::GetStatus(const Ice::Current &)
+{
+ return frontend->GetStatus();
+}
+
+std::string
+Tuner::Device() const
+{
+ return deviceRoot.string();
+}
+
+int
+Tuner::OpenDemux() const
+{
+ int demux = open((deviceRoot / "demux0").string().c_str(), O_RDWR | O_NONBLOCK);
+ if (demux < 0) {
+ throw P2PVR::DeviceError(deviceRoot.string(), strerror(errno), errno);
+ }
+ return demux;
+}
+
+void
+Tuner::ScanAndSendNetworkInformation(const P2PVR::RawDataClientPrx & client, const Ice::Current & ice)
+{
+ frontend->FrequencyScan([this, &client, &ice](long) {
+ try {
+ SendNetworkInformation(client, ice);
+ return true;
+ }
+ catch (...) {
+ return false;
+ }
+ });
+}
+
+void
+Tuner::SendNetworkInformation(const P2PVR::RawDataClientPrx & client, const Ice::Current & ice)
+{
+ SendPID(0x10, client, ice);
+}
+
+void
+Tuner::SendBouquetAssociations(const P2PVR::RawDataClientPrx & client, const Ice::Current & ice)
+{
+ SendPID(0x11, client, ice);
+}
+
+void
+Tuner::SendServiceDescriptions(const P2PVR::RawDataClientPrx & client, const Ice::Current & ice)
+{
+ SendPID(0x11, client, ice);
+}
+
+void
+Tuner::SendEventInformation(const P2PVR::RawDataClientPrx & client, const Ice::Current & ice)
+{
+ SendPID(0x12, client, ice);
+}
+
+void
+Tuner::SendPID(int pid, const P2PVR::RawDataClientPrx & client, const Ice::Current & ice) const
+{
+ Logger()->messagebf(LOG_DEBUG, "%s: pid = 0x%x", __PRETTY_FUNCTION__, pid);
+
+ ice.con->createProxy(client->ice_getIdentity());
+ FileHandle demux(OpenDemux());
+ struct dmx_sct_filter_params sctFilterParams;
+ memset(&sctFilterParams, 0, sizeof(dmx_sct_filter_params));
+ sctFilterParams.pid = pid;
+ sctFilterParams.flags = DMX_IMMEDIATE_START;
+
+ if (ioctl(demux, DMX_SET_FILTER, &sctFilterParams) < 0) {
+ throw P2PVR::DeviceError("demux", strerror(errno), errno);
+ }
+
+ ReadDemuxAndSend(demux, client);
+}
+
+void
+Tuner::ReadDemuxAndSend(int demux, const P2PVR::RawDataClientPrx & client) const
+{
+ Logger()->messagebf(LOG_DEBUG, "%s: begin", __PRETTY_FUNCTION__);
+ std::vector<Ice::AsyncResultPtr> asyncs;
+ struct pollfd ufd;
+ bool exitFlag = false;
+ do {
+ // Wait for data to appear
+ memset(&ufd, 0, sizeof(pollfd));
+ ufd.fd = demux;
+ ufd.events = POLLIN;
+ if (poll(&ufd, 1, timeout) < 1) {
+ Logger()->messagebf(LOG_DEBUG, "%s: Timed out waiting for data", __PRETTY_FUNCTION__);
+ throw P2PVR::DeviceError("demux", "Timed out. Tuned to a multiplex?", 0);
+ }
+
+ // Read it
+ P2PVR::Data buf(1 << 12);
+ int nr = read(demux, &buf.front(), buf.size());
+ if (nr < 0) {
+ throw P2PVR::DeviceError("demux", strerror(errno), errno);
+ }
+ size_t n = nr;
+ buf.resize(n);
+
+ // Verify it
+ if (n < sizeof(SiTableHeader)) {
+ Logger()->messagebf(LOG_WARNING, "Received data too small to be an SI table.");
+ }
+ auto * tab = (const SiTableHeader *)(&buf.front());
+ size_t l = sizeof(SiTableHeaderBase) + HILO(tab->section_length);
+ if (n < l) {
+ Logger()->messagebf(LOG_WARNING, "Received data shorter than its defined length.");
+ continue;
+ }
+ if (n > l) {
+ Logger()->messagebf(LOG_WARNING, "Received data longer than its defined length.");
+ continue;
+ }
+ if (!crc32(buf)) {
+ Logger()->messagebf(LOG_WARNING, "Received data is corrupted (crc32 failed).");
+ continue;
+ }
+
+ asyncs.push_back(client->begin_NewData(buf));
+
+ asyncs.erase(std::remove_if(asyncs.begin(), asyncs.end(), [&exitFlag, &client](const Ice::AsyncResultPtr & a) {
+ if (a->isCompleted()) {
+ exitFlag = client->end_NewData(a);
+ return true;
+ }
+ return false;
+ }), asyncs.end());
+ } while (!exitFlag);
+ BOOST_FOREACH(const auto & a, asyncs) {
+ client->end_NewData(a);
+ }
+ Logger()->messagebf(LOG_DEBUG, "%s: end", __PRETTY_FUNCTION__);
+}
+
+int
+Tuner::StartSendingTS(const P2PVR::PacketIds & pids, const P2PVR::RawDataClientPrx & client, const Ice::Current &)
+{
+ Logger()->message(LOG_DEBUG, __PRETTY_FUNCTION__);
+ if (pids.empty()) {
+ throw P2PVR::DeviceError("demux", "Packet Id list cannot be empty", 0);
+ }
+
+ std::lock_guard<std::mutex> g(lock);
+ int demux = backgroundClients.insert(BackgroundClients::value_type(OpenDemux(), client)).first->first;
+
+ struct dmx_pes_filter_params pesFilterParams;
+ memset(&pesFilterParams, 0, sizeof(struct dmx_pes_filter_params));
+ Logger()->messagebf(LOG_ERR, "%s: DMX_SET_PES_FILTER for pid %d", __PRETTY_FUNCTION__, pids[0]);
+ pesFilterParams.pid = pids[0];
+ pesFilterParams.input = DMX_IN_FRONTEND;
+ pesFilterParams.output = DMX_OUT_TSDEMUX_TAP;
+ pesFilterParams.pes_type = DMX_PES_OTHER;
+ pesFilterParams.flags = 0;
+
+ if (ioctl(demux, DMX_SET_PES_FILTER, &pesFilterParams) < 0) {
+ backgroundClients.erase(demux);
+ Logger()->messagebf(LOG_ERR, "%s: DMX_SET_PES_FILTER failed (%d: %s)", __PRETTY_FUNCTION__, errno, strerror(errno));
+ throw P2PVR::DeviceError("demux", strerror(errno), errno);
+ }
+
+ for (unsigned int x = 1; x < pids.size(); x += 1) {
+ __u16 p = pids[x];
+ Logger()->messagebf(LOG_ERR, "%s: DMX_ADD_PID for pid %d", __PRETTY_FUNCTION__, p);
+ if (ioctl(demux, DMX_ADD_PID, &p) < 0) {
+ backgroundClients.erase(demux);
+ Logger()->messagebf(LOG_ERR, "%s: DMX_ADD_PID failed (%d: %s)", __PRETTY_FUNCTION__, errno, strerror(errno));
+ throw P2PVR::DeviceError("demux", strerror(errno), errno);
+ }
+ }
+
+ if (ioctl(demux, DMX_START) < 0) {
+ backgroundClients.erase(demux);
+ Logger()->messagebf(LOG_ERR, "%s: DMX_START failed (%d: %s)", __PRETTY_FUNCTION__, errno, strerror(errno));
+ throw P2PVR::DeviceError("demux", strerror(errno), errno);
+ }
+
+ startSenderThread();
+ return demux;
+}
+
+void
+Tuner::StopSendingTS(int handle, const Ice::Current &)
+{
+ Logger()->message(LOG_DEBUG, __PRETTY_FUNCTION__);
+ std::lock_guard<std::mutex> g(lock);
+ if (backgroundClients.find(handle) != backgroundClients.end()) {
+ close(handle);
+ backgroundClients.erase(handle);
+ }
+}
+
+void
+Tuner::startSenderThread()
+{
+ if (!backgroundThread) {
+ backgroundThread = new std::thread(&Tuner::senderThread, this);
+ }
+}
+
+void
+Tuner::senderThread()
+{
+ lock.lock();
+ typedef boost::tuple<P2PVR::RawDataClientPrx, Ice::AsyncResultPtr, int> AsyncCall;
+ std::vector<AsyncCall> asyncs;
+ while (!backgroundClients.empty()) {
+ int n = backgroundClients.rbegin()->first + 1;
+ fd_set rfds;
+ FD_ZERO(&rfds);
+ BOOST_FOREACH(const auto & c, backgroundClients) {
+ FD_SET(c.first, &rfds);
+ }
+ lock.unlock();
+
+ struct timeval tv { 2, 0 };
+ switch (select(n, &rfds, NULL, NULL, &tv)) {
+ case -1: // error
+ case 0: // nothing to read, but all is well
+ break;
+ default:
+ { // stuff to do
+ std::lock_guard<std::mutex> g(lock);
+ BOOST_FOREACH(const auto & c, backgroundClients) {
+ if (FD_ISSET(c.first, &rfds)) {
+ // Read it
+ P2PVR::Data buf(1 << 16);
+ int nr = read(c.first, &buf.front(), buf.size());
+ if (nr < 0) {
+ close(c.first);
+ backgroundClients.erase(c.first);
+ break; // backgroundClients has changed, bailout and start again
+ }
+ size_t n = nr;
+ buf.resize(n);
+ // Send it
+ asyncs.push_back(AsyncCall(c.second, c.second->begin_NewData(buf), c.first));
+ //c.second->NewData(buf);
+ }
+ }
+ }
+ break;
+ }
+ // Clean up finished async requests
+ asyncs.erase(std::remove_if(asyncs.begin(), asyncs.end(), [this](const AsyncCall & a) {
+ try {
+ if (a.get<1>()->isCompleted()) {
+ if (!a.get<0>()->end_NewData(a.get<1>())) {
+ close(a.get<2>());
+ std::lock_guard<std::mutex> g(lock);
+ backgroundClients.erase(a.get<2>());
+ }
+ return true;
+ }
+ return false;
+ }
+ catch (...) {
+ close(a.get<2>());
+ std::lock_guard<std::mutex> g(lock);
+ backgroundClients.erase(a.get<2>());
+ return true;
+ }
+ }), asyncs.end());
+ lock.lock();
+ }
+ Logger()->messagebf(LOG_DEBUG, "%s: Cleaning up", __PRETTY_FUNCTION__);
+ BOOST_FOREACH(const auto & a, asyncs) {
+ try {
+ a.get<0>()->end_NewData(a.get<1>());
+ }
+ catch (...) {
+ }
+ }
+ backgroundThread = NULL;
+ Logger()->messagebf(LOG_DEBUG, "%s: Unlocking", __PRETTY_FUNCTION__);
+ lock.unlock();
+}
+
+bool
+Tuner::crc32(const P2PVR::Data & buf)
+{
+ boost::crc_optimal<32, 0x0, 0xFFFFFFFF, 0x0, true, false> crc;
+ crc.process_bytes(&buf.front(), buf.size());
+ return crc.checksum() == 0;
+}
+