diff options
Diffstat (limited to 'p2pvr/lib/tuner.cpp')
-rw-r--r-- | p2pvr/lib/tuner.cpp | 355 |
1 files changed, 355 insertions, 0 deletions
diff --git a/p2pvr/lib/tuner.cpp b/p2pvr/lib/tuner.cpp new file mode 100644 index 0000000..e508152 --- /dev/null +++ b/p2pvr/lib/tuner.cpp @@ -0,0 +1,355 @@ +#include "tuner.h" +#include <fcntl.h> +#include <Ice/Ice.h> +#include <sys/ioctl.h> +#include <poll.h> +#include <logger.h> +#include <misc.h> +#include <plugable.h> +#include <linux/dvb/frontend.h> +#include <linux/dvb/dmx.h> +#include <boost/crc.hpp> +#include <boost/tuple/tuple.hpp> +#include "fileHandle.h" +#include "siParsers/table.h" + +class FrontendNotSupported : public NotSupported { + public: + FrontendNotSupported(fe_type t) : NotSupported(stringbf("Frontend not supported: %s", t)) + { } +}; + +Tuner::Tuner(const boost::filesystem::path & df) : + deviceFrontend(df), + deviceRoot(df.branch_path()), + timeout(20000), + backgroundThread(NULL) +{ + int fd = open(deviceFrontend.string().c_str(), O_RDWR); + if (fd < 0) { + throw P2PVR::DeviceError(deviceFrontend.string(), strerror(errno), errno); + } + try { + struct dvb_frontend_info fe_info; + if (ioctl(fd, FE_GET_INFO, &fe_info) < 0) { + throw P2PVR::DeviceError(deviceFrontend.string(), strerror(errno), errno); + } + frontend = FrontendPtr(FrontendLoader::createNew<FrontendNotSupported>(fe_info.type, this, fd, fe_info)); + } + catch (...) { + close(fd); + throw; + } + Logger()->messagebf(LOG_INFO, "%s: Attached to %s (%s, type %s)", __PRETTY_FUNCTION__, + deviceRoot, frontend->Info().name, frontend->Type()); +} + +Tuner::~Tuner() +{ + while (!backgroundClients.empty()) { + close(backgroundClients.begin()->first); + backgroundClients.erase(backgroundClients.begin()); + } + if (backgroundThread) { + backgroundThread->join(); + delete backgroundThread; + } +} + +void +Tuner::TuneTo(const DVBSI::DeliveryPtr & mp, const Ice::Current&) +{ + frontend->TuneTo(mp); +} + +int +Tuner::GetStatus(const Ice::Current &) +{ + return frontend->GetStatus(); +} + +std::string +Tuner::Device() const +{ + return deviceRoot.string(); +} + +int +Tuner::OpenDemux() const +{ + int demux = open((deviceRoot / "demux0").string().c_str(), O_RDWR | O_NONBLOCK); + if (demux < 0) { + throw P2PVR::DeviceError(deviceRoot.string(), strerror(errno), errno); + } + return demux; +} + +void +Tuner::ScanAndSendNetworkInformation(const P2PVR::RawDataClientPrx & client, const Ice::Current & ice) +{ + frontend->FrequencyScan([this, &client, &ice](long) { + try { + SendNetworkInformation(client, ice); + return true; + } + catch (...) { + return false; + } + }); +} + +void +Tuner::SendNetworkInformation(const P2PVR::RawDataClientPrx & client, const Ice::Current & ice) +{ + SendPID(0x10, client, ice); +} + +void +Tuner::SendBouquetAssociations(const P2PVR::RawDataClientPrx & client, const Ice::Current & ice) +{ + SendPID(0x11, client, ice); +} + +void +Tuner::SendServiceDescriptions(const P2PVR::RawDataClientPrx & client, const Ice::Current & ice) +{ + SendPID(0x11, client, ice); +} + +void +Tuner::SendEventInformation(const P2PVR::RawDataClientPrx & client, const Ice::Current & ice) +{ + SendPID(0x12, client, ice); +} + +void +Tuner::SendPID(int pid, const P2PVR::RawDataClientPrx & client, const Ice::Current & ice) const +{ + Logger()->messagebf(LOG_DEBUG, "%s: pid = 0x%x", __PRETTY_FUNCTION__, pid); + + ice.con->createProxy(client->ice_getIdentity()); + FileHandle demux(OpenDemux()); + struct dmx_sct_filter_params sctFilterParams; + memset(&sctFilterParams, 0, sizeof(dmx_sct_filter_params)); + sctFilterParams.pid = pid; + sctFilterParams.flags = DMX_IMMEDIATE_START; + + if (ioctl(demux, DMX_SET_FILTER, &sctFilterParams) < 0) { + throw P2PVR::DeviceError("demux", strerror(errno), errno); + } + + ReadDemuxAndSend(demux, client); +} + +void +Tuner::ReadDemuxAndSend(int demux, const P2PVR::RawDataClientPrx & client) const +{ + Logger()->messagebf(LOG_DEBUG, "%s: begin", __PRETTY_FUNCTION__); + std::vector<Ice::AsyncResultPtr> asyncs; + struct pollfd ufd; + bool exitFlag = false; + do { + // Wait for data to appear + memset(&ufd, 0, sizeof(pollfd)); + ufd.fd = demux; + ufd.events = POLLIN; + if (poll(&ufd, 1, timeout) < 1) { + Logger()->messagebf(LOG_DEBUG, "%s: Timed out waiting for data", __PRETTY_FUNCTION__); + throw P2PVR::DeviceError("demux", "Timed out. Tuned to a multiplex?", 0); + } + + // Read it + P2PVR::Data buf(1 << 12); + int nr = read(demux, &buf.front(), buf.size()); + if (nr < 0) { + throw P2PVR::DeviceError("demux", strerror(errno), errno); + } + size_t n = nr; + buf.resize(n); + + // Verify it + if (n < sizeof(SiTableHeader)) { + Logger()->messagebf(LOG_WARNING, "Received data too small to be an SI table."); + } + auto * tab = (const SiTableHeader *)(&buf.front()); + size_t l = sizeof(SiTableHeaderBase) + HILO(tab->section_length); + if (n < l) { + Logger()->messagebf(LOG_WARNING, "Received data shorter than its defined length."); + continue; + } + if (n > l) { + Logger()->messagebf(LOG_WARNING, "Received data longer than its defined length."); + continue; + } + if (!crc32(buf)) { + Logger()->messagebf(LOG_WARNING, "Received data is corrupted (crc32 failed)."); + continue; + } + + asyncs.push_back(client->begin_NewData(buf)); + + asyncs.erase(std::remove_if(asyncs.begin(), asyncs.end(), [&exitFlag, &client](const Ice::AsyncResultPtr & a) { + if (a->isCompleted()) { + exitFlag = client->end_NewData(a); + return true; + } + return false; + }), asyncs.end()); + } while (!exitFlag); + BOOST_FOREACH(const auto & a, asyncs) { + client->end_NewData(a); + } + Logger()->messagebf(LOG_DEBUG, "%s: end", __PRETTY_FUNCTION__); +} + +int +Tuner::StartSendingTS(const P2PVR::PacketIds & pids, const P2PVR::RawDataClientPrx & client, const Ice::Current &) +{ + Logger()->message(LOG_DEBUG, __PRETTY_FUNCTION__); + if (pids.empty()) { + throw P2PVR::DeviceError("demux", "Packet Id list cannot be empty", 0); + } + + std::lock_guard<std::mutex> g(lock); + int demux = backgroundClients.insert(BackgroundClients::value_type(OpenDemux(), client)).first->first; + + struct dmx_pes_filter_params pesFilterParams; + memset(&pesFilterParams, 0, sizeof(struct dmx_pes_filter_params)); + Logger()->messagebf(LOG_ERR, "%s: DMX_SET_PES_FILTER for pid %d", __PRETTY_FUNCTION__, pids[0]); + pesFilterParams.pid = pids[0]; + pesFilterParams.input = DMX_IN_FRONTEND; + pesFilterParams.output = DMX_OUT_TSDEMUX_TAP; + pesFilterParams.pes_type = DMX_PES_OTHER; + pesFilterParams.flags = 0; + + if (ioctl(demux, DMX_SET_PES_FILTER, &pesFilterParams) < 0) { + backgroundClients.erase(demux); + Logger()->messagebf(LOG_ERR, "%s: DMX_SET_PES_FILTER failed (%d: %s)", __PRETTY_FUNCTION__, errno, strerror(errno)); + throw P2PVR::DeviceError("demux", strerror(errno), errno); + } + + for (unsigned int x = 1; x < pids.size(); x += 1) { + __u16 p = pids[x]; + Logger()->messagebf(LOG_ERR, "%s: DMX_ADD_PID for pid %d", __PRETTY_FUNCTION__, p); + if (ioctl(demux, DMX_ADD_PID, &p) < 0) { + backgroundClients.erase(demux); + Logger()->messagebf(LOG_ERR, "%s: DMX_ADD_PID failed (%d: %s)", __PRETTY_FUNCTION__, errno, strerror(errno)); + throw P2PVR::DeviceError("demux", strerror(errno), errno); + } + } + + if (ioctl(demux, DMX_START) < 0) { + backgroundClients.erase(demux); + Logger()->messagebf(LOG_ERR, "%s: DMX_START failed (%d: %s)", __PRETTY_FUNCTION__, errno, strerror(errno)); + throw P2PVR::DeviceError("demux", strerror(errno), errno); + } + + startSenderThread(); + return demux; +} + +void +Tuner::StopSendingTS(int handle, const Ice::Current &) +{ + Logger()->message(LOG_DEBUG, __PRETTY_FUNCTION__); + std::lock_guard<std::mutex> g(lock); + if (backgroundClients.find(handle) != backgroundClients.end()) { + close(handle); + backgroundClients.erase(handle); + } +} + +void +Tuner::startSenderThread() +{ + if (!backgroundThread) { + backgroundThread = new std::thread(&Tuner::senderThread, this); + } +} + +void +Tuner::senderThread() +{ + lock.lock(); + typedef boost::tuple<P2PVR::RawDataClientPrx, Ice::AsyncResultPtr, int> AsyncCall; + std::vector<AsyncCall> asyncs; + while (!backgroundClients.empty()) { + int n = backgroundClients.rbegin()->first + 1; + fd_set rfds; + FD_ZERO(&rfds); + BOOST_FOREACH(const auto & c, backgroundClients) { + FD_SET(c.first, &rfds); + } + lock.unlock(); + + struct timeval tv { 2, 0 }; + switch (select(n, &rfds, NULL, NULL, &tv)) { + case -1: // error + case 0: // nothing to read, but all is well + break; + default: + { // stuff to do + std::lock_guard<std::mutex> g(lock); + BOOST_FOREACH(const auto & c, backgroundClients) { + if (FD_ISSET(c.first, &rfds)) { + // Read it + P2PVR::Data buf(1 << 16); + int nr = read(c.first, &buf.front(), buf.size()); + if (nr < 0) { + close(c.first); + backgroundClients.erase(c.first); + break; // backgroundClients has changed, bailout and start again + } + size_t n = nr; + buf.resize(n); + // Send it + asyncs.push_back(AsyncCall(c.second, c.second->begin_NewData(buf), c.first)); + //c.second->NewData(buf); + } + } + } + break; + } + // Clean up finished async requests + asyncs.erase(std::remove_if(asyncs.begin(), asyncs.end(), [this](const AsyncCall & a) { + try { + if (a.get<1>()->isCompleted()) { + if (!a.get<0>()->end_NewData(a.get<1>())) { + close(a.get<2>()); + std::lock_guard<std::mutex> g(lock); + backgroundClients.erase(a.get<2>()); + } + return true; + } + return false; + } + catch (...) { + close(a.get<2>()); + std::lock_guard<std::mutex> g(lock); + backgroundClients.erase(a.get<2>()); + return true; + } + }), asyncs.end()); + lock.lock(); + } + Logger()->messagebf(LOG_DEBUG, "%s: Cleaning up", __PRETTY_FUNCTION__); + BOOST_FOREACH(const auto & a, asyncs) { + try { + a.get<0>()->end_NewData(a.get<1>()); + } + catch (...) { + } + } + backgroundThread = NULL; + Logger()->messagebf(LOG_DEBUG, "%s: Unlocking", __PRETTY_FUNCTION__); + lock.unlock(); +} + +bool +Tuner::crc32(const P2PVR::Data & buf) +{ + boost::crc_optimal<32, 0x0, 0xFFFFFFFF, 0x0, true, false> crc; + crc.process_bytes(&buf.front(), buf.size()); + return crc.checksum() == 0; +} + |