diff options
-rw-r--r-- | p2pvr/daemon/unittests/Jamfile.jam | 2 | ||||
-rw-r--r-- | p2pvr/daemon/unittests/mockDevices.cpp | 6 | ||||
-rw-r--r-- | p2pvr/daemon/unittests/testRecording.cpp | 5 | ||||
-rw-r--r-- | p2pvr/devices/Jamfile.jam | 1 | ||||
-rw-r--r-- | p2pvr/devices/frontend.h | 2 | ||||
-rw-r--r-- | p2pvr/devices/mockTuner.cpp | 182 | ||||
-rw-r--r-- | p2pvr/devices/mockTuner.h | 36 | ||||
-rw-r--r-- | p2pvr/devices/tuner.cpp | 130 | ||||
-rw-r--r-- | p2pvr/devices/tuner.h | 20 | ||||
-rw-r--r-- | p2pvr/devices/tunerSendSi.cpp | 46 | ||||
-rw-r--r-- | p2pvr/devices/tunerSendSi.h | 16 | ||||
-rw-r--r-- | p2pvr/devices/tunerSendTs.cpp | 8 | ||||
-rw-r--r-- | p2pvr/devices/tunerSendTs.h | 3 | ||||
-rw-r--r-- | p2pvr/dvb/Jamfile.jam | 4 |
14 files changed, 268 insertions, 193 deletions
diff --git a/p2pvr/daemon/unittests/Jamfile.jam b/p2pvr/daemon/unittests/Jamfile.jam index eb4bf2e..2ea4691 100644 --- a/p2pvr/daemon/unittests/Jamfile.jam +++ b/p2pvr/daemon/unittests/Jamfile.jam @@ -8,6 +8,7 @@ lib IceBox ; lib dbppcore : : : : <include>/usr/include/dbpp ; lib dbpp-postgresql : : : : <include>/usr/include/dbpp-postgresql ; lib dryice : : : : <include>/usr/include/icetray ; +lib icetray : : : : <include>/usr/include/icetray ; path-constant me : . ; @@ -33,6 +34,7 @@ lib testCommon : <define>ROOT=\"$(me)\" <library>../..//adhocutil <library>dryice + <library>icetray <library>Ice <library>IceUtil <library>IceBox diff --git a/p2pvr/daemon/unittests/mockDevices.cpp b/p2pvr/daemon/unittests/mockDevices.cpp index 12203b0..5ff2cb9 100644 --- a/p2pvr/daemon/unittests/mockDevices.cpp +++ b/p2pvr/daemon/unittests/mockDevices.cpp @@ -7,13 +7,13 @@ namespace P2PVR { MockDevices::MockDevices(Ice::CommunicatorPtr c) : ic(c) { - devices.push_back("/dev/dvb/dummy"); + devices.push_back("/dev/null"); } TunerPtr - MockDevices::openTuner(const boost::filesystem::path &) const + MockDevices::openTuner(const boost::filesystem::path & path) const { - return new MockTuner(ic); + return new MockTuner(path, ic); } } } diff --git a/p2pvr/daemon/unittests/testRecording.cpp b/p2pvr/daemon/unittests/testRecording.cpp index ae3a3a3..d875130 100644 --- a/p2pvr/daemon/unittests/testRecording.cpp +++ b/p2pvr/daemon/unittests/testRecording.cpp @@ -63,13 +63,12 @@ BOOST_AUTO_TEST_CASE( streamServiceToTarget ) BOOST_TEST_CHECKPOINT("Start"); ss->Start(); - sleep(2); + sleep(8); BOOST_TEST_CHECKPOINT("Stop"); ss->Stop(); - BOOST_TEST_MESSAGE("Received bytes: " << target->bytesReceived); - BOOST_REQUIRE(target->bytesReceived > 150000); + BOOST_REQUIRE_EQUAL(target->bytesReceived, 3111117); } BOOST_AUTO_TEST_SUITE_END(); diff --git a/p2pvr/devices/Jamfile.jam b/p2pvr/devices/Jamfile.jam index 0379340..92573d5 100644 --- a/p2pvr/devices/Jamfile.jam +++ b/p2pvr/devices/Jamfile.jam @@ -46,6 +46,7 @@ lib p2pvrMockTuner : <library>boost_system <library>boost_thread <library>boost_filesystem + <library>p2pvrdevices <library>../dvb//p2pvrdvb <library>../ice//p2pvrice <library>../lib//p2pvrlib diff --git a/p2pvr/devices/frontend.h b/p2pvr/devices/frontend.h index 63b000d..6d9a0fa 100644 --- a/p2pvr/devices/frontend.h +++ b/p2pvr/devices/frontend.h @@ -10,7 +10,7 @@ namespace P2PVR { namespace DVB { class TunerI; -class Frontend { +class DLL_PUBLIC Frontend { public: typedef boost::function<bool(long)> OnFrequencyFound; Frontend(TunerI *, const struct dvb_frontend_info &, IceTray::Logging::LoggerPtr); diff --git a/p2pvr/devices/mockTuner.cpp b/p2pvr/devices/mockTuner.cpp index 9abcfbf..8e772c6 100644 --- a/p2pvr/devices/mockTuner.cpp +++ b/p2pvr/devices/mockTuner.cpp @@ -31,7 +31,6 @@ ResourceFile(pmt); ResourceFile(vid); int MockTuner::eventSet = 0; -int MockTuner::senderId = 1; IceTray::Logging::LoggerPtr MockTuner::logger = LOGMANAGER()->getLogger<MockTuner>(); @@ -41,28 +40,33 @@ MockTuner::SetEventsSet(int n) eventSet = n; } -MockTuner::MockTuner(Ice::CommunicatorPtr c) : - ic(c) -{ -} - -void MockTuner::TuneTo(const DVBSI::DeliveryPtr &) -{ -} - -int MockTuner::GetStatus() -{ - return 0; -} +class MockFrontend : public DVB::Frontend { + public: + MockFrontend(DVB::TunerI * t, const struct dvb_frontend_info & fi, IceTray::Logging::LoggerPtr log) : + Frontend(t, fi, log) + { + } + void TuneTo(const DVBSI::DeliveryPtr &) const override + { + } + void FrequencyScan(const OnFrequencyFound & off) const override + { + off(0); + } + std::string Type() const override + { + return "Mocked"; + } +}; -std::string MockTuner::GetDevice() +MockTuner::MockTuner(const boost::filesystem::path & deviceFrontend, Ice::CommunicatorPtr c) : + TunerI(deviceFrontend, DVB::FrontendPtr(new MockFrontend(this, dvb_frontend_info(), logger))), + ic(c) { - return "/dev/dvb/dummy"; } Ice::ByteSeq MockTuner::Decompress(const Ice::ByteSeq & dataxz) { - logger->messagebf(LOG::DEBUG, "%s: setup", __PRETTY_FUNCTION__); lzma_stream strm = LZMA_STREAM_INIT; const uint32_t flags = LZMA_TELL_UNSUPPORTED_CHECK | LZMA_CONCATENATED; const uint64_t memory_limit = UINT64_MAX; @@ -71,7 +75,6 @@ Ice::ByteSeq MockTuner::Decompress(const Ice::ByteSeq & dataxz) strm.avail_in = dataxz.size(); uint8_t buf[BUFSIZ]; - logger->messagebf(LOG::DEBUG, "%s: decompress %d bytes", __PRETTY_FUNCTION__, dataxz.size()); Ice::ByteSeq data; data.reserve(dataxz.size() * 20); do { @@ -86,101 +89,110 @@ Ice::ByteSeq MockTuner::Decompress(const Ice::ByteSeq & dataxz) return data; } -void MockTuner::DecompressAndSendPackets(const Ice::ByteSeq & dataxz, const P2PVR::RawDataClientPrx & client) const +std::list<Ice::ByteSeq> +MockTuner::DecompressAndRead(const Ice::ByteSeq & dataxz) const { - logger->messagebf(LOG::DEBUG, "%s: deserialize", __PRETTY_FUNCTION__); + logger->messagef(LOG::DEBUG, "%s: decompress %zu bytes", __PRETTY_FUNCTION__, dataxz.size()); std::list<Ice::ByteSeq> packets; auto istrm = Ice::createInputStream(ic, Decompress(dataxz)); istrm->read(packets); - - logger->messagebf(LOG::DEBUG, "%s: send", __PRETTY_FUNCTION__); - for (const auto & packet : packets) { - client->NewData(packet); - } - - logger->messagebf(LOG::DEBUG, "%s: complete", __PRETTY_FUNCTION__); -} - -void MockTuner::ScanAndSendNetworkInformation(const P2PVR::RawDataClientPrx & client) -{ - DecompressAndSendPackets(network, client); -} - -void MockTuner::SendNetworkInformation(const P2PVR::RawDataClientPrx & client) -{ - DecompressAndSendPackets(network, client); -} - -void MockTuner::SendBouquetAssociations(const P2PVR::RawDataClientPrx &) -{ -} - -void MockTuner::SendServiceDescriptions(const P2PVR::RawDataClientPrx & client) -{ - DecompressAndSendPackets(services, client); + logger->messagef(LOG::DEBUG, "%s: read %zu packets", __PRETTY_FUNCTION__, packets.size()); + return packets; } -void MockTuner::SendProgramMap(Ice::Int, const P2PVR::RawDataClientPrx &) +AdHoc::FileUtils::FileHandle +MockTuner::OpenDemux() const { + return AdHoc::FileUtils::FileHandle("/dev/null"); } -void MockTuner::SendProgramAssociationTable(const P2PVR::RawDataClientPrx &) +int +MockTuner::iopoll(struct pollfd *fds, nfds_t nfds, int) const { + auto & packets = selectedPackets[fds[0].fd]; + if (packets.empty()) { + return -2; + } + if (nfds > 0) { + fds[0].revents = POLLIN; + return 1; + } + return 0; } -void MockTuner::SendEventInformation(const P2PVR::RawDataClientPrx & client) +ssize_t +MockTuner::ioread(int fh, void *buf, size_t count) const { - DecompressAndSendPackets(eventSet == 0 ? events1 : events2, client); + auto & packets = selectedPackets[fh]; + if (selectedPackets.empty()) { + return 0; + } + auto & packet = packets.front(); + if (count < packet.size()) { + // packet needs splitting to fit into buf + ::memcpy(buf, packet.data(), count); + packet.erase(packet.begin(), packet.begin() + count); + return count; + } + else { + // buf is big enough for the whole packet + auto bytes = packet.size(); + ::memcpy(buf, packet.data(), bytes); + packets.pop_front(); + return bytes; + } } -void MockTuner::SendLoop(const P2PVR::RawDataClientPrx & t, const Ice::ByteSeq & dataxz) const +int +MockTuner::ioselect(int nfds, fd_set *readfds, fd_set *, fd_set *, struct timeval * timeout) const { - std::list<Ice::ByteSeq> packets; - auto istrm = Ice::createInputStream(ic, Decompress(dataxz)); - istrm->read(packets); - logger->messagebf(LOG::DEBUG, "%s: loop over %d packets", __PRETTY_FUNCTION__, packets.size()); - auto p = packets.begin(); - while (true) { - { - boost::this_thread::disable_interruption whileSending; - t->NewData(*p); - } - boost::this_thread::interruption_point(); - p++; - if (p == packets.end()) { - p = packets.begin(); + int count = 0; + for (int fd = 0; fd < nfds; fd += 1) { + if (FD_ISSET(fd, readfds)) { + if (selectedPackets[fd].empty()) { + FD_CLR(fd, readfds); + } + else { + count += 1; + } } - usleep(100000); } + if (count == 0) { + sleep(timeout->tv_sec); + } + return count; } - -int MockTuner::StartSendingTS(const P2PVR::PacketIds &, const P2PVR::RawDataClientPrx & t) -{ - return senders.insert({senderId++, new boost::thread(&MockTuner::SendLoop, this, t, vid)}).first->first; -} - -int MockTuner::StartSendingSection(Ice::Int sid, const P2PVR::RawDataClientPrx & t) +void +MockTuner::RequestPID(int pid, int fh) const { - switch (sid) { + logger->messagef(LOG::DEBUG, "%s: pid %x, fh %d", __PRETTY_FUNCTION__, pid, fh); + switch (pid) { case 0: // pat - return senders.insert({senderId++, new boost::thread(&MockTuner::SendLoop, this, t, pat)}).first->first; + selectedPackets[fh] = DecompressAndRead(pat); + break; + case 0x10: + selectedPackets[fh] = DecompressAndRead(network); + break; + case 0x11: + selectedPackets[fh] = DecompressAndRead(services); + break; + case 0x12: + selectedPackets[fh] = DecompressAndRead(eventSet == 0 ? events1 : events2); + break; case 100: // sample pmt - return senders.insert({senderId++, new boost::thread(&MockTuner::SendLoop, this, t, pmt)}).first->first; + selectedPackets[fh] = DecompressAndRead(pmt); + break; } - throw std::runtime_error("I don't have a sample for that"); } -void MockTuner::StopSending(int s) +void +MockTuner::RequestTS(const PacketIds & pids, int fh) const { - logger->messagebf(LOG::DEBUG, "%s: stop %d", __PRETTY_FUNCTION__, s); - auto sitr = senders.find(s); - if (sitr != senders.end()) { - sitr->second->interrupt(); - sitr->second->join(); - senders.erase(sitr); - } + logger->messagef(LOG::DEBUG, "%s: pids %zu, fh %d", __PRETTY_FUNCTION__, pids.size(), fh); + selectedPackets[fh] = DecompressAndRead(vid); } + } } diff --git a/p2pvr/devices/mockTuner.h b/p2pvr/devices/mockTuner.h index d0db11e..74578db 100644 --- a/p2pvr/devices/mockTuner.h +++ b/p2pvr/devices/mockTuner.h @@ -6,42 +6,34 @@ #include <Ice/BuiltinSequences.h> #include <boost/thread.hpp> #include <logger.h> +#include "tuner.h" namespace P2PVR { namespace Testing { -class DLL_PUBLIC MockTuner : public Tuner { +class DLL_PUBLIC MockTuner : public DVB::TunerI { public: - MockTuner(Ice::CommunicatorPtr); + MockTuner(const boost::filesystem::path & deviceFrontend, Ice::CommunicatorPtr); - void TuneTo(const DVBSI::DeliveryPtr &) override; - int GetStatus() override; - std::string GetDevice() override; - - void ScanAndSendNetworkInformation(const RawDataClientPrx & client) override; - void SendNetworkInformation(const RawDataClientPrx & client) override; - void SendBouquetAssociations(const RawDataClientPrx & client) override; - void SendServiceDescriptions(const RawDataClientPrx & client) override; - void SendProgramMap(Ice::Int pid, const RawDataClientPrx & client) override; - void SendProgramAssociationTable(const RawDataClientPrx & client) override; - void SendEventInformation(const RawDataClientPrx & client) override; - - int StartSendingTS(const PacketIds & pids, const RawDataClientPrx & client) override; - int StartSendingSection(Ice::Int pid, const RawDataClientPrx & client) override; - void StopSending(int handle) override; + AdHoc::FileUtils::FileHandle OpenDemux() const override; + void RequestPID(int, int) const override; + void RequestTS(const PacketIds &, int fd) const override; static void SetEventsSet(int n); protected: + int iopoll(struct pollfd *fds, nfds_t nfds, int timeout) const override; + ssize_t ioread(int fd, void *buf, size_t count) const override; + int ioselect(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout) const override; + static Ice::ByteSeq Decompress(const Ice::ByteSeq &); static void LZMA_ASSERT(int ret_xz); - void DecompressAndSendPackets(const Ice::ByteSeq &, const RawDataClientPrx &) const; - void SendLoop(const RawDataClientPrx & t, const Ice::ByteSeq & dataxz) const; + std::list<Ice::ByteSeq> DecompressAndRead(const Ice::ByteSeq &) const; - static int eventSet; - std::map<int, boost::thread *> senders; - static int senderId; Ice::CommunicatorPtr ic; + // The open fh -> list of DVB packets + mutable std::map<int, std::list<Ice::ByteSeq>> selectedPackets; + static int eventSet; static IceTray::Logging::LoggerPtr logger; }; } diff --git a/p2pvr/devices/tuner.cpp b/p2pvr/devices/tuner.cpp index e7fd2eb..d3d0488 100644 --- a/p2pvr/devices/tuner.cpp +++ b/p2pvr/devices/tuner.cpp @@ -2,7 +2,6 @@ #include <fcntl.h> #include <Ice/Ice.h> #include <sys/ioctl.h> -#include <poll.h> #include <factory.h> #include <linux/dvb/frontend.h> #include <linux/dvb/dmx.h> @@ -26,11 +25,11 @@ TunerI::TunerI(const boost::filesystem::path & df, FrontendPtr pfe) : frontend = pfe; } else { - struct dvb_frontend_info fe_info; - if (ioctl(frontendFD, FE_GET_INFO, &fe_info) < 0) { - throw DeviceError(deviceFrontend.string(), strerror(errno), errno); - } - frontend = FrontendPtr(FrontendFactory::createNew(Frontend::FactoryKey(fe_info.type), this, fe_info)); + struct dvb_frontend_info fe_info; + if (ioctl(frontendFD, FE_GET_INFO, &fe_info) < 0) { + throw DeviceError(deviceFrontend.string(), strerror(errno), errno); + } + frontend = FrontendPtr(FrontendFactory::createNew(Frontend::FactoryKey(fe_info.type), this, fe_info)); } logger->messagebf(LOG::INFO, "%s: Attached to %s (%s, type %s)", __PRETTY_FUNCTION__, deviceRoot, frontend->Info().name, frontend->Type()); @@ -132,7 +131,7 @@ TunerI::SendPID(int pid, const RawDataClientPrx & client) const { logger->messagebf(LOG::DEBUG, "%s: pid = 0x%x", __PRETTY_FUNCTION__, pid); - AdHoc::FileUtils::FileHandle demux(OpenDemux()); + auto demux = OpenDemux(); RequestPID(pid, demux); return ReadDemuxAndSend(std::move(demux), client); } @@ -151,8 +150,26 @@ TunerI::RequestPID(int pid, int demux) const } } +int +TunerI::iopoll(struct pollfd *fds, nfds_t nfds, int timeout) const +{ + return ::poll(fds, nfds, timeout); +} + +ssize_t +TunerI::ioread(int fd, void *buf, size_t count) const +{ + return ::read(fd, buf, count); +} + +int +TunerI::ioselect(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout) const +{ + return select(nfds, readfds, writefds, exceptfds, timeout); +} + uint64_t -TunerI::ReadDemuxAndSend(AdHoc::FileUtils::FileHandle && demux, const RawDataClientPrx & _client) const +TunerI::ReadDemuxAndSend(AdHoc::FileUtils::FileHandle demux, const RawDataClientPrx & _client) const { logger->messagebf(LOG::DEBUG, "%s: begin", __PRETTY_FUNCTION__); struct pollfd ufd; @@ -162,30 +179,36 @@ TunerI::ReadDemuxAndSend(AdHoc::FileUtils::FileHandle && demux, const RawDataCli BackgroundClient client = BackgroundClient(new SendSi(std::move(demux), _client)); do { // Wait for data to appear - switch (poll(&ufd, 1, options->DemuxReadTimeout)) { + switch (iopoll(&ufd, 1, options->DemuxReadTimeout)) { + case -2: + //logger->messagef(LOG::DEBUG, "%s: All test data has been read... waiting...", __PRETTY_FUNCTION__); + break; case -1: - logger->messagebf(LOG::DEBUG, "%s: poll error reading demux (%d:%s)", __PRETTY_FUNCTION__, errno, strerror(errno)); + logger->messagef(LOG::ERR, "%s: poll error reading demux (%d:%s)", __PRETTY_FUNCTION__, errno, strerror(errno)); throw DeviceError("demux", strerror(errno), errno); case 0: - auto status = frontend->GetStatus(); - logger->messagebf(LOG::DEBUG, "%s: Timed out waiting for data (device status 0x%02x)", __PRETTY_FUNCTION__, status); + logger->messagef(LOG::WARNING, "%s: Timed out waiting for data (device status 0x%02x)", + __PRETTY_FUNCTION__, frontend->GetStatus()); throw DeviceError("demux", "timeout", 0); - } + default: + { + // Read it + Data buf(1 << 12); + int nr = ioread(ufd.fd, &buf.front(), buf.size()); + if (nr < 0) { + logger->messagef(LOG::ERR, "%s: error reading demux (%d:%s) status 0x%02x", + __PRETTY_FUNCTION__, errno, strerror(errno), frontend->GetStatus()); + throw DeviceError("demux", strerror(errno), errno); + } + size_t n = nr; + buf.resize(n); - // Read it - Data buf(1 << 12); - int nr = read(demux, &buf.front(), buf.size()); - if (nr < 0) { - logger->messagebf(LOG::DEBUG, "%s: error reading demux (%d:%s) status 0x%02x", - __PRETTY_FUNCTION__, errno, strerror(errno), frontend->GetStatus()); - throw DeviceError("demux", strerror(errno), errno); + logger->messagef(LOG::DEBUG, "%s: sending", __PRETTY_FUNCTION__); + client->NewData(buf); + logger->messagef(LOG::DEBUG, "%s: sent", __PRETTY_FUNCTION__); + } } - size_t n = nr; - buf.resize(n); - - client->NewData(buf); - - } while (!client->IsFinished()); + } while (!client->IsFinished() && client->HasPending()); auto packetsSent = client->PacketsSent(); client.reset(); logger->messagebf(LOG::DEBUG, "%s: end (sent %d packets)", __PRETTY_FUNCTION__, packetsSent); @@ -195,10 +218,8 @@ TunerI::ReadDemuxAndSend(AdHoc::FileUtils::FileHandle && demux, const RawDataCli int TunerI::StartSendingSection(int pid, const RawDataClientPrx & client) { - logger->message(LOG::DEBUG, __PRETTY_FUNCTION__); - Lock(lock); - BackgroundClient bgc(new SendSi(OpenDemux(), client)); + BackgroundClient bgc(new SendSiStream(OpenDemux(), client)); RequestPID(pid, bgc->fileHandle()); int demux = backgroundClients.insert({ bgc->fileHandle(), bgc }).first->first; @@ -209,7 +230,6 @@ TunerI::StartSendingSection(int pid, const RawDataClientPrx & client) int TunerI::StartSendingTS(const PacketIds & pids, const RawDataClientPrx & client) { - logger->message(LOG::DEBUG, __PRETTY_FUNCTION__); if (pids.empty()) { throw DeviceError("demux", "Packet Id list cannot be empty", 0); } @@ -218,9 +238,19 @@ TunerI::StartSendingTS(const PacketIds & pids, const RawDataClientPrx & client) BackgroundClient bgc(new SendTs(OpenDemux(), client)); int demux = bgc->fileHandle(); + RequestTS(pids, demux); + + backgroundClients.insert({ demux, bgc }); + startSenderThread(); + return demux; +} + +void +TunerI::RequestTS(const PacketIds & pids, int demux) const +{ struct dmx_pes_filter_params pesFilterParams; memset(&pesFilterParams, 0, sizeof(struct dmx_pes_filter_params)); - logger->messagebf(LOG::ERR, "%s: DMX_SET_PES_FILTER for pid %d", __PRETTY_FUNCTION__, pids[0]); + logger->messagef(LOG::ERR, "%s: DMX_SET_PES_FILTER for pid %d", __PRETTY_FUNCTION__, pids[0]); pesFilterParams.pid = pids[0]; pesFilterParams.input = DMX_IN_FRONTEND; pesFilterParams.output = DMX_OUT_TSDEMUX_TAP; @@ -228,50 +258,42 @@ TunerI::StartSendingTS(const PacketIds & pids, const RawDataClientPrx & client) pesFilterParams.flags = 0; if (ioctl(demux, DMX_SET_PES_FILTER, &pesFilterParams) < 0) { - backgroundClients.erase(demux); - logger->messagebf(LOG::ERR, "%s: DMX_SET_PES_FILTER failed (%d: %s)", __PRETTY_FUNCTION__, errno, strerror(errno)); + logger->messagef(LOG::ERR, "%s: DMX_SET_PES_FILTER failed (%d: %s)", __PRETTY_FUNCTION__, errno, strerror(errno)); throw DeviceError("demux", strerror(errno), errno); } for (unsigned int x = 1; x < pids.size(); x += 1) { __u16 p = pids[x]; - logger->messagebf(LOG::ERR, "%s: DMX_ADD_PID for pid %d", __PRETTY_FUNCTION__, p); + logger->messagef(LOG::ERR, "%s: DMX_ADD_PID for pid %d", __PRETTY_FUNCTION__, p); if (ioctl(demux, DMX_ADD_PID, &p) < 0) { - backgroundClients.erase(demux); - logger->messagebf(LOG::ERR, "%s: DMX_ADD_PID failed (%d: %s)", __PRETTY_FUNCTION__, errno, strerror(errno)); + logger->messagef(LOG::ERR, "%s: DMX_ADD_PID failed (%d: %s)", __PRETTY_FUNCTION__, errno, strerror(errno)); throw DeviceError("demux", strerror(errno), errno); } } setBufferSize(demux, options->DemuxStreamBufferSize); if (ioctl(demux, DMX_START) < 0) { - backgroundClients.erase(demux); - logger->messagebf(LOG::ERR, "%s: DMX_START failed (%d: %s)", __PRETTY_FUNCTION__, errno, strerror(errno)); + logger->messagef(LOG::ERR, "%s: DMX_START failed (%d: %s)", __PRETTY_FUNCTION__, errno, strerror(errno)); throw DeviceError("demux", strerror(errno), errno); } - - backgroundClients.insert({ demux, bgc }); - startSenderThread(); - return demux; } void TunerI::setBufferSize(int demux, unsigned long size) { if (ioctl(demux, DMX_SET_BUFFER_SIZE, size)) { - logger->messagebf(LOG::ERR, "%s: DMX_SET_BUFFER_SIZE to %d failed (%d: %s)", __PRETTY_FUNCTION__, size, errno, strerror(errno)); + logger->messagef(LOG::ERR, "%s: DMX_SET_BUFFER_SIZE to %lu failed (%d: %s)", __PRETTY_FUNCTION__, size, errno, strerror(errno)); throw DeviceError("demux", strerror(errno), errno); } - logger->messagebf(LOG::DEBUG, "%s: DMX_SET_BUFFER_SIZE to %d", __PRETTY_FUNCTION__, size); + logger->messagef(LOG::DEBUG, "%s: DMX_SET_BUFFER_SIZE to %lu", __PRETTY_FUNCTION__, size); } void TunerI::StopSending(int handle) { - logger->message(LOG::DEBUG, __PRETTY_FUNCTION__); + logger->messagef(LOG::DEBUG, "%s: handle %d", __PRETTY_FUNCTION__, handle); std::lock_guard<std::mutex> g(lock); if (backgroundClients.find(handle) != backgroundClients.end()) { - close(handle); backgroundClients.erase(handle); } } @@ -298,22 +320,22 @@ TunerI::senderThread() lock.unlock(); struct timeval tv { 2, 0 }; - switch (select(n, &rfds, NULL, NULL, &tv)) { + int s = ioselect(n, &rfds, NULL, NULL, &tv); + lock.lock(); + switch (s) { case -1: // error - logger->messagebf(LOG::DEBUG, "%s: select failed (%d:%s)", __PRETTY_FUNCTION__, errno, strerror(errno)); + logger->messagebf(LOG::WARNING, "%s: select failed (%d:%s)", __PRETTY_FUNCTION__, errno, strerror(errno)); case 0: // nothing to read, but all is well break; default: { // stuff to do - std::lock_guard<std::mutex> g(lock); for (auto c = backgroundClients.begin(); c != backgroundClients.end(); ) { if (FD_ISSET(c->first, &rfds)) { // Read it Data buf(1 << 16); - int nr = read(c->first, &buf.front(), buf.size()); + int nr = ioread(c->first, &buf.front(), buf.size()); if (nr < 0) { - logger->messagebf(LOG::DEBUG, "%s: read failed (%d:%s)", __PRETTY_FUNCTION__, errno, strerror(errno)); - close(c->first); + logger->messagef(LOG::ERR, "%s: read failed (%d:%s)", __PRETTY_FUNCTION__, errno, strerror(errno)); c = backgroundClients.erase(c); } else { @@ -331,10 +353,8 @@ TunerI::senderThread() break; } // Clean up finished async requests - lock.lock(); for (auto client = backgroundClients.begin(); client != backgroundClients.end(); ) { if (client->second->IsFinished()) { - close(client->first); client = backgroundClients.erase(client); } else { @@ -347,7 +367,7 @@ TunerI::senderThread() lock.unlock(); } -TunerI::IDataSender::IDataSender(AdHoc::FileUtils::FileHandle && h, const RawDataClientPrx & c) : +TunerI::IDataSender::IDataSender(AdHoc::FileUtils::FileHandle h, const RawDataClientPrx & c) : _packetsSent(0), fh(std::move(h)), client(c) diff --git a/p2pvr/devices/tuner.h b/p2pvr/devices/tuner.h index 8534a3c..983d3ff 100644 --- a/p2pvr/devices/tuner.h +++ b/p2pvr/devices/tuner.h @@ -4,6 +4,7 @@ #include <dvb.h> #include <boost/filesystem/path.hpp> #include <fileUtils.h> +#include <poll.h> #include "frontend.h" #include <map> #include <thread> @@ -21,21 +22,22 @@ namespace Frontends { class OFDM; } -class TunerI : public Tuner { +class DLL_PUBLIC TunerI : public Tuner { public: class IDataSender { public: - IDataSender(AdHoc::FileUtils::FileHandle &&, const RawDataClientPrx &); + IDataSender(AdHoc::FileUtils::FileHandle, const RawDataClientPrx &); virtual ~IDataSender() = 0; virtual void NewData(const Data &) = 0; virtual bool IsFinished() = 0; + virtual bool HasPending() = 0; uint64_t PacketsSent() const; int fileHandle() const; protected: uint64_t _packetsSent; - AdHoc::FileUtils::FileHandle fh; + const AdHoc::FileUtils::FileHandle fh; const RawDataClientPrx client; }; typedef boost::shared_ptr<IDataSender> BackgroundClient; @@ -60,11 +62,17 @@ class TunerI : public Tuner { int StartSendingSection(Ice::Int pid, const RawDataClientPrx & client) override; void StopSending(int handle) override; + protected: + virtual int iopoll(struct pollfd *fds, nfds_t nfds, int timeout) const; + virtual ssize_t ioread(int fd, void *buf, size_t count) const; + virtual int ioselect(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout) const; + private: - AdHoc::FileUtils::FileHandle OpenDemux() const; + virtual AdHoc::FileUtils::FileHandle OpenDemux() const; uint64_t SendPID(int pid, const RawDataClientPrx & client) const; - void RequestPID(int pid, int fd) const; - uint64_t ReadDemuxAndSend(AdHoc::FileUtils::FileHandle && fd, const RawDataClientPrx & client) const; + virtual void RequestPID(int pid, int fd) const; + virtual void RequestTS(const PacketIds &, int fd) const; + uint64_t ReadDemuxAndSend(AdHoc::FileUtils::FileHandle fd, const RawDataClientPrx & client) const; void startSenderThread(); void senderThread(); static void setBufferSize(int fd, unsigned long bytes); diff --git a/p2pvr/devices/tunerSendSi.cpp b/p2pvr/devices/tunerSendSi.cpp index bc901f7..822ec14 100644 --- a/p2pvr/devices/tunerSendSi.cpp +++ b/p2pvr/devices/tunerSendSi.cpp @@ -7,7 +7,7 @@ namespace P2PVR { namespace DVB { IceTray::Logging::LoggerPtr SendSi::logger(LOGMANAGER()->getLogger<SendSi>()); -SendSi::SendSi(AdHoc::FileUtils::FileHandle && fh, const RawDataClientPrx & c) : +SendSi::SendSi(AdHoc::FileUtils::FileHandle fh, const RawDataClientPrx & c) : TunerI::IDataSender(std::move(fh), c->ice_collocationOptimized(false)) { } @@ -16,6 +16,15 @@ SendSi::~SendSi() { } +SendSiStream::SendSiStream(AdHoc::FileUtils::FileHandle fh, const RawDataClientPrx & c) : + SendSi(std::move(fh), c) +{ +} + +SendSiStream::~SendSiStream() +{ +} + void SendSi::NewData(const Data & buf) { @@ -29,19 +38,26 @@ SendSi::NewData(const Data & buf) bool SendSi::IsFinished() { - try { - for (auto c = asyncs.begin(); c != asyncs.end(); ) { - if ((*c)->isCompleted()) { - if (client->end_NewData(*c)) { - return true; - } - c = asyncs.erase(c); - } - else { - c++; + for (auto c = asyncs.begin(); c != asyncs.end(); ) { + if ((*c)->isCompleted()) { + auto a = *c; + c = asyncs.erase(c); + if (client->end_NewData(a)) { + return true; } } - return false; + else { + c++; + } + } + return false; +} + +bool +SendSiStream::IsFinished() +{ + try { + return SendSi::IsFinished(); } catch (const std::exception & ex) { logger->messagebf(LOG::DEBUG, "%s: Client transmit error (%s)", __PRETTY_FUNCTION__, ex.what()); @@ -50,6 +66,12 @@ SendSi::IsFinished() } bool +SendSi::HasPending() +{ + return !asyncs.empty(); +} + +bool SendSi::IsValidSection(const Data & buf) { auto n = buf.size(); diff --git a/p2pvr/devices/tunerSendSi.h b/p2pvr/devices/tunerSendSi.h index e590e29..e6517c6 100644 --- a/p2pvr/devices/tunerSendSi.h +++ b/p2pvr/devices/tunerSendSi.h @@ -8,13 +8,14 @@ namespace P2PVR { namespace DVB { class SendSi : public TunerI::IDataSender { public: - SendSi(AdHoc::FileUtils::FileHandle &&, const P2PVR::RawDataClientPrx &); + SendSi(AdHoc::FileUtils::FileHandle, const P2PVR::RawDataClientPrx &); ~SendSi(); - void NewData(const P2PVR::Data &); - bool IsFinished(); + void NewData(const P2PVR::Data &) override; + bool IsFinished() override; + bool HasPending() override; - private: + protected: static bool crc32(const P2PVR::Data &); static bool IsValidSection(const P2PVR::Data &); @@ -22,6 +23,13 @@ class SendSi : public TunerI::IDataSender { bool finish; static IceTray::Logging::LoggerPtr logger; }; +class SendSiStream : public SendSi { + public: + SendSiStream(AdHoc::FileUtils::FileHandle, const P2PVR::RawDataClientPrx &); + ~SendSiStream(); + + bool IsFinished() override; +}; } } diff --git a/p2pvr/devices/tunerSendTs.cpp b/p2pvr/devices/tunerSendTs.cpp index 3f8864c..23bc3b4 100644 --- a/p2pvr/devices/tunerSendTs.cpp +++ b/p2pvr/devices/tunerSendTs.cpp @@ -10,7 +10,7 @@ namespace DVB { IceTray::Logging::LoggerPtr SendTs::logger(LOGMANAGER()->getLogger<SendTs>()); -SendTs::SendTs(AdHoc::FileUtils::FileHandle && fh, const RawDataClientPrx & c) : +SendTs::SendTs(AdHoc::FileUtils::FileHandle fh, const RawDataClientPrx & c) : TunerI::IDataSender(std::move(fh), c->ice_collocationOptimized(false)) { buffer.reserve(TARGET_BUFFER_SIZE); @@ -76,6 +76,12 @@ SendTs::IsFinished() return true; } } + +bool +SendTs::HasPending() +{ + return async; +} } } diff --git a/p2pvr/devices/tunerSendTs.h b/p2pvr/devices/tunerSendTs.h index abdd4b5..5390882 100644 --- a/p2pvr/devices/tunerSendTs.h +++ b/p2pvr/devices/tunerSendTs.h @@ -8,11 +8,12 @@ namespace P2PVR { namespace DVB { class SendTs : public TunerI::IDataSender { public: - SendTs(AdHoc::FileUtils::FileHandle &&, const P2PVR::RawDataClientPrx &); + SendTs(AdHoc::FileUtils::FileHandle, const P2PVR::RawDataClientPrx &); ~SendTs(); void NewData(const P2PVR::Data &); bool IsFinished(); + bool HasPending(); private: void sendBufferChunk(); diff --git a/p2pvr/dvb/Jamfile.jam b/p2pvr/dvb/Jamfile.jam index e9a31f4..b3176bb 100644 --- a/p2pvr/dvb/Jamfile.jam +++ b/p2pvr/dvb/Jamfile.jam @@ -1,8 +1,12 @@ +lib boost_system ; + lib p2pvrdvb : [ glob-tree *.cpp : unittests ] : <library>../ice//p2pvrice <library>..//adhocutil + <library>..//icetray + <library>boost_system <library>../..//glibmm <implicit-dependency>../ice//p2pvrice : : |