summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--p2pvr/daemon/unittests/Jamfile.jam2
-rw-r--r--p2pvr/daemon/unittests/mockDevices.cpp6
-rw-r--r--p2pvr/daemon/unittests/testRecording.cpp5
-rw-r--r--p2pvr/devices/Jamfile.jam1
-rw-r--r--p2pvr/devices/frontend.h2
-rw-r--r--p2pvr/devices/mockTuner.cpp182
-rw-r--r--p2pvr/devices/mockTuner.h36
-rw-r--r--p2pvr/devices/tuner.cpp130
-rw-r--r--p2pvr/devices/tuner.h20
-rw-r--r--p2pvr/devices/tunerSendSi.cpp46
-rw-r--r--p2pvr/devices/tunerSendSi.h16
-rw-r--r--p2pvr/devices/tunerSendTs.cpp8
-rw-r--r--p2pvr/devices/tunerSendTs.h3
-rw-r--r--p2pvr/dvb/Jamfile.jam4
14 files changed, 268 insertions, 193 deletions
diff --git a/p2pvr/daemon/unittests/Jamfile.jam b/p2pvr/daemon/unittests/Jamfile.jam
index eb4bf2e..2ea4691 100644
--- a/p2pvr/daemon/unittests/Jamfile.jam
+++ b/p2pvr/daemon/unittests/Jamfile.jam
@@ -8,6 +8,7 @@ lib IceBox ;
lib dbppcore : : : : <include>/usr/include/dbpp ;
lib dbpp-postgresql : : : : <include>/usr/include/dbpp-postgresql ;
lib dryice : : : : <include>/usr/include/icetray ;
+lib icetray : : : : <include>/usr/include/icetray ;
path-constant me : . ;
@@ -33,6 +34,7 @@ lib testCommon :
<define>ROOT=\"$(me)\"
<library>../..//adhocutil
<library>dryice
+ <library>icetray
<library>Ice
<library>IceUtil
<library>IceBox
diff --git a/p2pvr/daemon/unittests/mockDevices.cpp b/p2pvr/daemon/unittests/mockDevices.cpp
index 12203b0..5ff2cb9 100644
--- a/p2pvr/daemon/unittests/mockDevices.cpp
+++ b/p2pvr/daemon/unittests/mockDevices.cpp
@@ -7,13 +7,13 @@ namespace P2PVR {
MockDevices::MockDevices(Ice::CommunicatorPtr c) :
ic(c)
{
- devices.push_back("/dev/dvb/dummy");
+ devices.push_back("/dev/null");
}
TunerPtr
- MockDevices::openTuner(const boost::filesystem::path &) const
+ MockDevices::openTuner(const boost::filesystem::path & path) const
{
- return new MockTuner(ic);
+ return new MockTuner(path, ic);
}
}
}
diff --git a/p2pvr/daemon/unittests/testRecording.cpp b/p2pvr/daemon/unittests/testRecording.cpp
index ae3a3a3..d875130 100644
--- a/p2pvr/daemon/unittests/testRecording.cpp
+++ b/p2pvr/daemon/unittests/testRecording.cpp
@@ -63,13 +63,12 @@ BOOST_AUTO_TEST_CASE( streamServiceToTarget )
BOOST_TEST_CHECKPOINT("Start");
ss->Start();
- sleep(2);
+ sleep(8);
BOOST_TEST_CHECKPOINT("Stop");
ss->Stop();
- BOOST_TEST_MESSAGE("Received bytes: " << target->bytesReceived);
- BOOST_REQUIRE(target->bytesReceived > 150000);
+ BOOST_REQUIRE_EQUAL(target->bytesReceived, 3111117);
}
BOOST_AUTO_TEST_SUITE_END();
diff --git a/p2pvr/devices/Jamfile.jam b/p2pvr/devices/Jamfile.jam
index 0379340..92573d5 100644
--- a/p2pvr/devices/Jamfile.jam
+++ b/p2pvr/devices/Jamfile.jam
@@ -46,6 +46,7 @@ lib p2pvrMockTuner :
<library>boost_system
<library>boost_thread
<library>boost_filesystem
+ <library>p2pvrdevices
<library>../dvb//p2pvrdvb
<library>../ice//p2pvrice
<library>../lib//p2pvrlib
diff --git a/p2pvr/devices/frontend.h b/p2pvr/devices/frontend.h
index 63b000d..6d9a0fa 100644
--- a/p2pvr/devices/frontend.h
+++ b/p2pvr/devices/frontend.h
@@ -10,7 +10,7 @@ namespace P2PVR {
namespace DVB {
class TunerI;
-class Frontend {
+class DLL_PUBLIC Frontend {
public:
typedef boost::function<bool(long)> OnFrequencyFound;
Frontend(TunerI *, const struct dvb_frontend_info &, IceTray::Logging::LoggerPtr);
diff --git a/p2pvr/devices/mockTuner.cpp b/p2pvr/devices/mockTuner.cpp
index 9abcfbf..8e772c6 100644
--- a/p2pvr/devices/mockTuner.cpp
+++ b/p2pvr/devices/mockTuner.cpp
@@ -31,7 +31,6 @@ ResourceFile(pmt);
ResourceFile(vid);
int MockTuner::eventSet = 0;
-int MockTuner::senderId = 1;
IceTray::Logging::LoggerPtr MockTuner::logger = LOGMANAGER()->getLogger<MockTuner>();
@@ -41,28 +40,33 @@ MockTuner::SetEventsSet(int n)
eventSet = n;
}
-MockTuner::MockTuner(Ice::CommunicatorPtr c) :
- ic(c)
-{
-}
-
-void MockTuner::TuneTo(const DVBSI::DeliveryPtr &)
-{
-}
-
-int MockTuner::GetStatus()
-{
- return 0;
-}
+class MockFrontend : public DVB::Frontend {
+ public:
+ MockFrontend(DVB::TunerI * t, const struct dvb_frontend_info & fi, IceTray::Logging::LoggerPtr log) :
+ Frontend(t, fi, log)
+ {
+ }
+ void TuneTo(const DVBSI::DeliveryPtr &) const override
+ {
+ }
+ void FrequencyScan(const OnFrequencyFound & off) const override
+ {
+ off(0);
+ }
+ std::string Type() const override
+ {
+ return "Mocked";
+ }
+};
-std::string MockTuner::GetDevice()
+MockTuner::MockTuner(const boost::filesystem::path & deviceFrontend, Ice::CommunicatorPtr c) :
+ TunerI(deviceFrontend, DVB::FrontendPtr(new MockFrontend(this, dvb_frontend_info(), logger))),
+ ic(c)
{
- return "/dev/dvb/dummy";
}
Ice::ByteSeq MockTuner::Decompress(const Ice::ByteSeq & dataxz)
{
- logger->messagebf(LOG::DEBUG, "%s: setup", __PRETTY_FUNCTION__);
lzma_stream strm = LZMA_STREAM_INIT;
const uint32_t flags = LZMA_TELL_UNSUPPORTED_CHECK | LZMA_CONCATENATED;
const uint64_t memory_limit = UINT64_MAX;
@@ -71,7 +75,6 @@ Ice::ByteSeq MockTuner::Decompress(const Ice::ByteSeq & dataxz)
strm.avail_in = dataxz.size();
uint8_t buf[BUFSIZ];
- logger->messagebf(LOG::DEBUG, "%s: decompress %d bytes", __PRETTY_FUNCTION__, dataxz.size());
Ice::ByteSeq data;
data.reserve(dataxz.size() * 20);
do {
@@ -86,101 +89,110 @@ Ice::ByteSeq MockTuner::Decompress(const Ice::ByteSeq & dataxz)
return data;
}
-void MockTuner::DecompressAndSendPackets(const Ice::ByteSeq & dataxz, const P2PVR::RawDataClientPrx & client) const
+std::list<Ice::ByteSeq>
+MockTuner::DecompressAndRead(const Ice::ByteSeq & dataxz) const
{
- logger->messagebf(LOG::DEBUG, "%s: deserialize", __PRETTY_FUNCTION__);
+ logger->messagef(LOG::DEBUG, "%s: decompress %zu bytes", __PRETTY_FUNCTION__, dataxz.size());
std::list<Ice::ByteSeq> packets;
auto istrm = Ice::createInputStream(ic, Decompress(dataxz));
istrm->read(packets);
-
- logger->messagebf(LOG::DEBUG, "%s: send", __PRETTY_FUNCTION__);
- for (const auto & packet : packets) {
- client->NewData(packet);
- }
-
- logger->messagebf(LOG::DEBUG, "%s: complete", __PRETTY_FUNCTION__);
-}
-
-void MockTuner::ScanAndSendNetworkInformation(const P2PVR::RawDataClientPrx & client)
-{
- DecompressAndSendPackets(network, client);
-}
-
-void MockTuner::SendNetworkInformation(const P2PVR::RawDataClientPrx & client)
-{
- DecompressAndSendPackets(network, client);
-}
-
-void MockTuner::SendBouquetAssociations(const P2PVR::RawDataClientPrx &)
-{
-}
-
-void MockTuner::SendServiceDescriptions(const P2PVR::RawDataClientPrx & client)
-{
- DecompressAndSendPackets(services, client);
+ logger->messagef(LOG::DEBUG, "%s: read %zu packets", __PRETTY_FUNCTION__, packets.size());
+ return packets;
}
-void MockTuner::SendProgramMap(Ice::Int, const P2PVR::RawDataClientPrx &)
+AdHoc::FileUtils::FileHandle
+MockTuner::OpenDemux() const
{
+ return AdHoc::FileUtils::FileHandle("/dev/null");
}
-void MockTuner::SendProgramAssociationTable(const P2PVR::RawDataClientPrx &)
+int
+MockTuner::iopoll(struct pollfd *fds, nfds_t nfds, int) const
{
+ auto & packets = selectedPackets[fds[0].fd];
+ if (packets.empty()) {
+ return -2;
+ }
+ if (nfds > 0) {
+ fds[0].revents = POLLIN;
+ return 1;
+ }
+ return 0;
}
-void MockTuner::SendEventInformation(const P2PVR::RawDataClientPrx & client)
+ssize_t
+MockTuner::ioread(int fh, void *buf, size_t count) const
{
- DecompressAndSendPackets(eventSet == 0 ? events1 : events2, client);
+ auto & packets = selectedPackets[fh];
+ if (selectedPackets.empty()) {
+ return 0;
+ }
+ auto & packet = packets.front();
+ if (count < packet.size()) {
+ // packet needs splitting to fit into buf
+ ::memcpy(buf, packet.data(), count);
+ packet.erase(packet.begin(), packet.begin() + count);
+ return count;
+ }
+ else {
+ // buf is big enough for the whole packet
+ auto bytes = packet.size();
+ ::memcpy(buf, packet.data(), bytes);
+ packets.pop_front();
+ return bytes;
+ }
}
-void MockTuner::SendLoop(const P2PVR::RawDataClientPrx & t, const Ice::ByteSeq & dataxz) const
+int
+MockTuner::ioselect(int nfds, fd_set *readfds, fd_set *, fd_set *, struct timeval * timeout) const
{
- std::list<Ice::ByteSeq> packets;
- auto istrm = Ice::createInputStream(ic, Decompress(dataxz));
- istrm->read(packets);
- logger->messagebf(LOG::DEBUG, "%s: loop over %d packets", __PRETTY_FUNCTION__, packets.size());
- auto p = packets.begin();
- while (true) {
- {
- boost::this_thread::disable_interruption whileSending;
- t->NewData(*p);
- }
- boost::this_thread::interruption_point();
- p++;
- if (p == packets.end()) {
- p = packets.begin();
+ int count = 0;
+ for (int fd = 0; fd < nfds; fd += 1) {
+ if (FD_ISSET(fd, readfds)) {
+ if (selectedPackets[fd].empty()) {
+ FD_CLR(fd, readfds);
+ }
+ else {
+ count += 1;
+ }
}
- usleep(100000);
}
+ if (count == 0) {
+ sleep(timeout->tv_sec);
+ }
+ return count;
}
-
-int MockTuner::StartSendingTS(const P2PVR::PacketIds &, const P2PVR::RawDataClientPrx & t)
-{
- return senders.insert({senderId++, new boost::thread(&MockTuner::SendLoop, this, t, vid)}).first->first;
-}
-
-int MockTuner::StartSendingSection(Ice::Int sid, const P2PVR::RawDataClientPrx & t)
+void
+MockTuner::RequestPID(int pid, int fh) const
{
- switch (sid) {
+ logger->messagef(LOG::DEBUG, "%s: pid %x, fh %d", __PRETTY_FUNCTION__, pid, fh);
+ switch (pid) {
case 0: // pat
- return senders.insert({senderId++, new boost::thread(&MockTuner::SendLoop, this, t, pat)}).first->first;
+ selectedPackets[fh] = DecompressAndRead(pat);
+ break;
+ case 0x10:
+ selectedPackets[fh] = DecompressAndRead(network);
+ break;
+ case 0x11:
+ selectedPackets[fh] = DecompressAndRead(services);
+ break;
+ case 0x12:
+ selectedPackets[fh] = DecompressAndRead(eventSet == 0 ? events1 : events2);
+ break;
case 100: // sample pmt
- return senders.insert({senderId++, new boost::thread(&MockTuner::SendLoop, this, t, pmt)}).first->first;
+ selectedPackets[fh] = DecompressAndRead(pmt);
+ break;
}
- throw std::runtime_error("I don't have a sample for that");
}
-void MockTuner::StopSending(int s)
+void
+MockTuner::RequestTS(const PacketIds & pids, int fh) const
{
- logger->messagebf(LOG::DEBUG, "%s: stop %d", __PRETTY_FUNCTION__, s);
- auto sitr = senders.find(s);
- if (sitr != senders.end()) {
- sitr->second->interrupt();
- sitr->second->join();
- senders.erase(sitr);
- }
+ logger->messagef(LOG::DEBUG, "%s: pids %zu, fh %d", __PRETTY_FUNCTION__, pids.size(), fh);
+ selectedPackets[fh] = DecompressAndRead(vid);
}
+
}
}
diff --git a/p2pvr/devices/mockTuner.h b/p2pvr/devices/mockTuner.h
index d0db11e..74578db 100644
--- a/p2pvr/devices/mockTuner.h
+++ b/p2pvr/devices/mockTuner.h
@@ -6,42 +6,34 @@
#include <Ice/BuiltinSequences.h>
#include <boost/thread.hpp>
#include <logger.h>
+#include "tuner.h"
namespace P2PVR {
namespace Testing {
-class DLL_PUBLIC MockTuner : public Tuner {
+class DLL_PUBLIC MockTuner : public DVB::TunerI {
public:
- MockTuner(Ice::CommunicatorPtr);
+ MockTuner(const boost::filesystem::path & deviceFrontend, Ice::CommunicatorPtr);
- void TuneTo(const DVBSI::DeliveryPtr &) override;
- int GetStatus() override;
- std::string GetDevice() override;
-
- void ScanAndSendNetworkInformation(const RawDataClientPrx & client) override;
- void SendNetworkInformation(const RawDataClientPrx & client) override;
- void SendBouquetAssociations(const RawDataClientPrx & client) override;
- void SendServiceDescriptions(const RawDataClientPrx & client) override;
- void SendProgramMap(Ice::Int pid, const RawDataClientPrx & client) override;
- void SendProgramAssociationTable(const RawDataClientPrx & client) override;
- void SendEventInformation(const RawDataClientPrx & client) override;
-
- int StartSendingTS(const PacketIds & pids, const RawDataClientPrx & client) override;
- int StartSendingSection(Ice::Int pid, const RawDataClientPrx & client) override;
- void StopSending(int handle) override;
+ AdHoc::FileUtils::FileHandle OpenDemux() const override;
+ void RequestPID(int, int) const override;
+ void RequestTS(const PacketIds &, int fd) const override;
static void SetEventsSet(int n);
protected:
+ int iopoll(struct pollfd *fds, nfds_t nfds, int timeout) const override;
+ ssize_t ioread(int fd, void *buf, size_t count) const override;
+ int ioselect(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout) const override;
+
static Ice::ByteSeq Decompress(const Ice::ByteSeq &);
static void LZMA_ASSERT(int ret_xz);
- void DecompressAndSendPackets(const Ice::ByteSeq &, const RawDataClientPrx &) const;
- void SendLoop(const RawDataClientPrx & t, const Ice::ByteSeq & dataxz) const;
+ std::list<Ice::ByteSeq> DecompressAndRead(const Ice::ByteSeq &) const;
- static int eventSet;
- std::map<int, boost::thread *> senders;
- static int senderId;
Ice::CommunicatorPtr ic;
+ // The open fh -> list of DVB packets
+ mutable std::map<int, std::list<Ice::ByteSeq>> selectedPackets;
+ static int eventSet;
static IceTray::Logging::LoggerPtr logger;
};
}
diff --git a/p2pvr/devices/tuner.cpp b/p2pvr/devices/tuner.cpp
index e7fd2eb..d3d0488 100644
--- a/p2pvr/devices/tuner.cpp
+++ b/p2pvr/devices/tuner.cpp
@@ -2,7 +2,6 @@
#include <fcntl.h>
#include <Ice/Ice.h>
#include <sys/ioctl.h>
-#include <poll.h>
#include <factory.h>
#include <linux/dvb/frontend.h>
#include <linux/dvb/dmx.h>
@@ -26,11 +25,11 @@ TunerI::TunerI(const boost::filesystem::path & df, FrontendPtr pfe) :
frontend = pfe;
}
else {
- struct dvb_frontend_info fe_info;
- if (ioctl(frontendFD, FE_GET_INFO, &fe_info) < 0) {
- throw DeviceError(deviceFrontend.string(), strerror(errno), errno);
- }
- frontend = FrontendPtr(FrontendFactory::createNew(Frontend::FactoryKey(fe_info.type), this, fe_info));
+ struct dvb_frontend_info fe_info;
+ if (ioctl(frontendFD, FE_GET_INFO, &fe_info) < 0) {
+ throw DeviceError(deviceFrontend.string(), strerror(errno), errno);
+ }
+ frontend = FrontendPtr(FrontendFactory::createNew(Frontend::FactoryKey(fe_info.type), this, fe_info));
}
logger->messagebf(LOG::INFO, "%s: Attached to %s (%s, type %s)", __PRETTY_FUNCTION__,
deviceRoot, frontend->Info().name, frontend->Type());
@@ -132,7 +131,7 @@ TunerI::SendPID(int pid, const RawDataClientPrx & client) const
{
logger->messagebf(LOG::DEBUG, "%s: pid = 0x%x", __PRETTY_FUNCTION__, pid);
- AdHoc::FileUtils::FileHandle demux(OpenDemux());
+ auto demux = OpenDemux();
RequestPID(pid, demux);
return ReadDemuxAndSend(std::move(demux), client);
}
@@ -151,8 +150,26 @@ TunerI::RequestPID(int pid, int demux) const
}
}
+int
+TunerI::iopoll(struct pollfd *fds, nfds_t nfds, int timeout) const
+{
+ return ::poll(fds, nfds, timeout);
+}
+
+ssize_t
+TunerI::ioread(int fd, void *buf, size_t count) const
+{
+ return ::read(fd, buf, count);
+}
+
+int
+TunerI::ioselect(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout) const
+{
+ return select(nfds, readfds, writefds, exceptfds, timeout);
+}
+
uint64_t
-TunerI::ReadDemuxAndSend(AdHoc::FileUtils::FileHandle && demux, const RawDataClientPrx & _client) const
+TunerI::ReadDemuxAndSend(AdHoc::FileUtils::FileHandle demux, const RawDataClientPrx & _client) const
{
logger->messagebf(LOG::DEBUG, "%s: begin", __PRETTY_FUNCTION__);
struct pollfd ufd;
@@ -162,30 +179,36 @@ TunerI::ReadDemuxAndSend(AdHoc::FileUtils::FileHandle && demux, const RawDataCli
BackgroundClient client = BackgroundClient(new SendSi(std::move(demux), _client));
do {
// Wait for data to appear
- switch (poll(&ufd, 1, options->DemuxReadTimeout)) {
+ switch (iopoll(&ufd, 1, options->DemuxReadTimeout)) {
+ case -2:
+ //logger->messagef(LOG::DEBUG, "%s: All test data has been read... waiting...", __PRETTY_FUNCTION__);
+ break;
case -1:
- logger->messagebf(LOG::DEBUG, "%s: poll error reading demux (%d:%s)", __PRETTY_FUNCTION__, errno, strerror(errno));
+ logger->messagef(LOG::ERR, "%s: poll error reading demux (%d:%s)", __PRETTY_FUNCTION__, errno, strerror(errno));
throw DeviceError("demux", strerror(errno), errno);
case 0:
- auto status = frontend->GetStatus();
- logger->messagebf(LOG::DEBUG, "%s: Timed out waiting for data (device status 0x%02x)", __PRETTY_FUNCTION__, status);
+ logger->messagef(LOG::WARNING, "%s: Timed out waiting for data (device status 0x%02x)",
+ __PRETTY_FUNCTION__, frontend->GetStatus());
throw DeviceError("demux", "timeout", 0);
- }
+ default:
+ {
+ // Read it
+ Data buf(1 << 12);
+ int nr = ioread(ufd.fd, &buf.front(), buf.size());
+ if (nr < 0) {
+ logger->messagef(LOG::ERR, "%s: error reading demux (%d:%s) status 0x%02x",
+ __PRETTY_FUNCTION__, errno, strerror(errno), frontend->GetStatus());
+ throw DeviceError("demux", strerror(errno), errno);
+ }
+ size_t n = nr;
+ buf.resize(n);
- // Read it
- Data buf(1 << 12);
- int nr = read(demux, &buf.front(), buf.size());
- if (nr < 0) {
- logger->messagebf(LOG::DEBUG, "%s: error reading demux (%d:%s) status 0x%02x",
- __PRETTY_FUNCTION__, errno, strerror(errno), frontend->GetStatus());
- throw DeviceError("demux", strerror(errno), errno);
+ logger->messagef(LOG::DEBUG, "%s: sending", __PRETTY_FUNCTION__);
+ client->NewData(buf);
+ logger->messagef(LOG::DEBUG, "%s: sent", __PRETTY_FUNCTION__);
+ }
}
- size_t n = nr;
- buf.resize(n);
-
- client->NewData(buf);
-
- } while (!client->IsFinished());
+ } while (!client->IsFinished() && client->HasPending());
auto packetsSent = client->PacketsSent();
client.reset();
logger->messagebf(LOG::DEBUG, "%s: end (sent %d packets)", __PRETTY_FUNCTION__, packetsSent);
@@ -195,10 +218,8 @@ TunerI::ReadDemuxAndSend(AdHoc::FileUtils::FileHandle && demux, const RawDataCli
int
TunerI::StartSendingSection(int pid, const RawDataClientPrx & client)
{
- logger->message(LOG::DEBUG, __PRETTY_FUNCTION__);
-
Lock(lock);
- BackgroundClient bgc(new SendSi(OpenDemux(), client));
+ BackgroundClient bgc(new SendSiStream(OpenDemux(), client));
RequestPID(pid, bgc->fileHandle());
int demux = backgroundClients.insert({
bgc->fileHandle(), bgc }).first->first;
@@ -209,7 +230,6 @@ TunerI::StartSendingSection(int pid, const RawDataClientPrx & client)
int
TunerI::StartSendingTS(const PacketIds & pids, const RawDataClientPrx & client)
{
- logger->message(LOG::DEBUG, __PRETTY_FUNCTION__);
if (pids.empty()) {
throw DeviceError("demux", "Packet Id list cannot be empty", 0);
}
@@ -218,9 +238,19 @@ TunerI::StartSendingTS(const PacketIds & pids, const RawDataClientPrx & client)
BackgroundClient bgc(new SendTs(OpenDemux(), client));
int demux = bgc->fileHandle();
+ RequestTS(pids, demux);
+
+ backgroundClients.insert({ demux, bgc });
+ startSenderThread();
+ return demux;
+}
+
+void
+TunerI::RequestTS(const PacketIds & pids, int demux) const
+{
struct dmx_pes_filter_params pesFilterParams;
memset(&pesFilterParams, 0, sizeof(struct dmx_pes_filter_params));
- logger->messagebf(LOG::ERR, "%s: DMX_SET_PES_FILTER for pid %d", __PRETTY_FUNCTION__, pids[0]);
+ logger->messagef(LOG::ERR, "%s: DMX_SET_PES_FILTER for pid %d", __PRETTY_FUNCTION__, pids[0]);
pesFilterParams.pid = pids[0];
pesFilterParams.input = DMX_IN_FRONTEND;
pesFilterParams.output = DMX_OUT_TSDEMUX_TAP;
@@ -228,50 +258,42 @@ TunerI::StartSendingTS(const PacketIds & pids, const RawDataClientPrx & client)
pesFilterParams.flags = 0;
if (ioctl(demux, DMX_SET_PES_FILTER, &pesFilterParams) < 0) {
- backgroundClients.erase(demux);
- logger->messagebf(LOG::ERR, "%s: DMX_SET_PES_FILTER failed (%d: %s)", __PRETTY_FUNCTION__, errno, strerror(errno));
+ logger->messagef(LOG::ERR, "%s: DMX_SET_PES_FILTER failed (%d: %s)", __PRETTY_FUNCTION__, errno, strerror(errno));
throw DeviceError("demux", strerror(errno), errno);
}
for (unsigned int x = 1; x < pids.size(); x += 1) {
__u16 p = pids[x];
- logger->messagebf(LOG::ERR, "%s: DMX_ADD_PID for pid %d", __PRETTY_FUNCTION__, p);
+ logger->messagef(LOG::ERR, "%s: DMX_ADD_PID for pid %d", __PRETTY_FUNCTION__, p);
if (ioctl(demux, DMX_ADD_PID, &p) < 0) {
- backgroundClients.erase(demux);
- logger->messagebf(LOG::ERR, "%s: DMX_ADD_PID failed (%d: %s)", __PRETTY_FUNCTION__, errno, strerror(errno));
+ logger->messagef(LOG::ERR, "%s: DMX_ADD_PID failed (%d: %s)", __PRETTY_FUNCTION__, errno, strerror(errno));
throw DeviceError("demux", strerror(errno), errno);
}
}
setBufferSize(demux, options->DemuxStreamBufferSize);
if (ioctl(demux, DMX_START) < 0) {
- backgroundClients.erase(demux);
- logger->messagebf(LOG::ERR, "%s: DMX_START failed (%d: %s)", __PRETTY_FUNCTION__, errno, strerror(errno));
+ logger->messagef(LOG::ERR, "%s: DMX_START failed (%d: %s)", __PRETTY_FUNCTION__, errno, strerror(errno));
throw DeviceError("demux", strerror(errno), errno);
}
-
- backgroundClients.insert({ demux, bgc });
- startSenderThread();
- return demux;
}
void
TunerI::setBufferSize(int demux, unsigned long size)
{
if (ioctl(demux, DMX_SET_BUFFER_SIZE, size)) {
- logger->messagebf(LOG::ERR, "%s: DMX_SET_BUFFER_SIZE to %d failed (%d: %s)", __PRETTY_FUNCTION__, size, errno, strerror(errno));
+ logger->messagef(LOG::ERR, "%s: DMX_SET_BUFFER_SIZE to %lu failed (%d: %s)", __PRETTY_FUNCTION__, size, errno, strerror(errno));
throw DeviceError("demux", strerror(errno), errno);
}
- logger->messagebf(LOG::DEBUG, "%s: DMX_SET_BUFFER_SIZE to %d", __PRETTY_FUNCTION__, size);
+ logger->messagef(LOG::DEBUG, "%s: DMX_SET_BUFFER_SIZE to %lu", __PRETTY_FUNCTION__, size);
}
void
TunerI::StopSending(int handle)
{
- logger->message(LOG::DEBUG, __PRETTY_FUNCTION__);
+ logger->messagef(LOG::DEBUG, "%s: handle %d", __PRETTY_FUNCTION__, handle);
std::lock_guard<std::mutex> g(lock);
if (backgroundClients.find(handle) != backgroundClients.end()) {
- close(handle);
backgroundClients.erase(handle);
}
}
@@ -298,22 +320,22 @@ TunerI::senderThread()
lock.unlock();
struct timeval tv { 2, 0 };
- switch (select(n, &rfds, NULL, NULL, &tv)) {
+ int s = ioselect(n, &rfds, NULL, NULL, &tv);
+ lock.lock();
+ switch (s) {
case -1: // error
- logger->messagebf(LOG::DEBUG, "%s: select failed (%d:%s)", __PRETTY_FUNCTION__, errno, strerror(errno));
+ logger->messagebf(LOG::WARNING, "%s: select failed (%d:%s)", __PRETTY_FUNCTION__, errno, strerror(errno));
case 0: // nothing to read, but all is well
break;
default:
{ // stuff to do
- std::lock_guard<std::mutex> g(lock);
for (auto c = backgroundClients.begin(); c != backgroundClients.end(); ) {
if (FD_ISSET(c->first, &rfds)) {
// Read it
Data buf(1 << 16);
- int nr = read(c->first, &buf.front(), buf.size());
+ int nr = ioread(c->first, &buf.front(), buf.size());
if (nr < 0) {
- logger->messagebf(LOG::DEBUG, "%s: read failed (%d:%s)", __PRETTY_FUNCTION__, errno, strerror(errno));
- close(c->first);
+ logger->messagef(LOG::ERR, "%s: read failed (%d:%s)", __PRETTY_FUNCTION__, errno, strerror(errno));
c = backgroundClients.erase(c);
}
else {
@@ -331,10 +353,8 @@ TunerI::senderThread()
break;
}
// Clean up finished async requests
- lock.lock();
for (auto client = backgroundClients.begin(); client != backgroundClients.end(); ) {
if (client->second->IsFinished()) {
- close(client->first);
client = backgroundClients.erase(client);
}
else {
@@ -347,7 +367,7 @@ TunerI::senderThread()
lock.unlock();
}
-TunerI::IDataSender::IDataSender(AdHoc::FileUtils::FileHandle && h, const RawDataClientPrx & c) :
+TunerI::IDataSender::IDataSender(AdHoc::FileUtils::FileHandle h, const RawDataClientPrx & c) :
_packetsSent(0),
fh(std::move(h)),
client(c)
diff --git a/p2pvr/devices/tuner.h b/p2pvr/devices/tuner.h
index 8534a3c..983d3ff 100644
--- a/p2pvr/devices/tuner.h
+++ b/p2pvr/devices/tuner.h
@@ -4,6 +4,7 @@
#include <dvb.h>
#include <boost/filesystem/path.hpp>
#include <fileUtils.h>
+#include <poll.h>
#include "frontend.h"
#include <map>
#include <thread>
@@ -21,21 +22,22 @@ namespace Frontends {
class OFDM;
}
-class TunerI : public Tuner {
+class DLL_PUBLIC TunerI : public Tuner {
public:
class IDataSender {
public:
- IDataSender(AdHoc::FileUtils::FileHandle &&, const RawDataClientPrx &);
+ IDataSender(AdHoc::FileUtils::FileHandle, const RawDataClientPrx &);
virtual ~IDataSender() = 0;
virtual void NewData(const Data &) = 0;
virtual bool IsFinished() = 0;
+ virtual bool HasPending() = 0;
uint64_t PacketsSent() const;
int fileHandle() const;
protected:
uint64_t _packetsSent;
- AdHoc::FileUtils::FileHandle fh;
+ const AdHoc::FileUtils::FileHandle fh;
const RawDataClientPrx client;
};
typedef boost::shared_ptr<IDataSender> BackgroundClient;
@@ -60,11 +62,17 @@ class TunerI : public Tuner {
int StartSendingSection(Ice::Int pid, const RawDataClientPrx & client) override;
void StopSending(int handle) override;
+ protected:
+ virtual int iopoll(struct pollfd *fds, nfds_t nfds, int timeout) const;
+ virtual ssize_t ioread(int fd, void *buf, size_t count) const;
+ virtual int ioselect(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout) const;
+
private:
- AdHoc::FileUtils::FileHandle OpenDemux() const;
+ virtual AdHoc::FileUtils::FileHandle OpenDemux() const;
uint64_t SendPID(int pid, const RawDataClientPrx & client) const;
- void RequestPID(int pid, int fd) const;
- uint64_t ReadDemuxAndSend(AdHoc::FileUtils::FileHandle && fd, const RawDataClientPrx & client) const;
+ virtual void RequestPID(int pid, int fd) const;
+ virtual void RequestTS(const PacketIds &, int fd) const;
+ uint64_t ReadDemuxAndSend(AdHoc::FileUtils::FileHandle fd, const RawDataClientPrx & client) const;
void startSenderThread();
void senderThread();
static void setBufferSize(int fd, unsigned long bytes);
diff --git a/p2pvr/devices/tunerSendSi.cpp b/p2pvr/devices/tunerSendSi.cpp
index bc901f7..822ec14 100644
--- a/p2pvr/devices/tunerSendSi.cpp
+++ b/p2pvr/devices/tunerSendSi.cpp
@@ -7,7 +7,7 @@ namespace P2PVR {
namespace DVB {
IceTray::Logging::LoggerPtr SendSi::logger(LOGMANAGER()->getLogger<SendSi>());
-SendSi::SendSi(AdHoc::FileUtils::FileHandle && fh, const RawDataClientPrx & c) :
+SendSi::SendSi(AdHoc::FileUtils::FileHandle fh, const RawDataClientPrx & c) :
TunerI::IDataSender(std::move(fh), c->ice_collocationOptimized(false))
{
}
@@ -16,6 +16,15 @@ SendSi::~SendSi()
{
}
+SendSiStream::SendSiStream(AdHoc::FileUtils::FileHandle fh, const RawDataClientPrx & c) :
+ SendSi(std::move(fh), c)
+{
+}
+
+SendSiStream::~SendSiStream()
+{
+}
+
void
SendSi::NewData(const Data & buf)
{
@@ -29,19 +38,26 @@ SendSi::NewData(const Data & buf)
bool
SendSi::IsFinished()
{
- try {
- for (auto c = asyncs.begin(); c != asyncs.end(); ) {
- if ((*c)->isCompleted()) {
- if (client->end_NewData(*c)) {
- return true;
- }
- c = asyncs.erase(c);
- }
- else {
- c++;
+ for (auto c = asyncs.begin(); c != asyncs.end(); ) {
+ if ((*c)->isCompleted()) {
+ auto a = *c;
+ c = asyncs.erase(c);
+ if (client->end_NewData(a)) {
+ return true;
}
}
- return false;
+ else {
+ c++;
+ }
+ }
+ return false;
+}
+
+bool
+SendSiStream::IsFinished()
+{
+ try {
+ return SendSi::IsFinished();
}
catch (const std::exception & ex) {
logger->messagebf(LOG::DEBUG, "%s: Client transmit error (%s)", __PRETTY_FUNCTION__, ex.what());
@@ -50,6 +66,12 @@ SendSi::IsFinished()
}
bool
+SendSi::HasPending()
+{
+ return !asyncs.empty();
+}
+
+bool
SendSi::IsValidSection(const Data & buf)
{
auto n = buf.size();
diff --git a/p2pvr/devices/tunerSendSi.h b/p2pvr/devices/tunerSendSi.h
index e590e29..e6517c6 100644
--- a/p2pvr/devices/tunerSendSi.h
+++ b/p2pvr/devices/tunerSendSi.h
@@ -8,13 +8,14 @@ namespace P2PVR {
namespace DVB {
class SendSi : public TunerI::IDataSender {
public:
- SendSi(AdHoc::FileUtils::FileHandle &&, const P2PVR::RawDataClientPrx &);
+ SendSi(AdHoc::FileUtils::FileHandle, const P2PVR::RawDataClientPrx &);
~SendSi();
- void NewData(const P2PVR::Data &);
- bool IsFinished();
+ void NewData(const P2PVR::Data &) override;
+ bool IsFinished() override;
+ bool HasPending() override;
- private:
+ protected:
static bool crc32(const P2PVR::Data &);
static bool IsValidSection(const P2PVR::Data &);
@@ -22,6 +23,13 @@ class SendSi : public TunerI::IDataSender {
bool finish;
static IceTray::Logging::LoggerPtr logger;
};
+class SendSiStream : public SendSi {
+ public:
+ SendSiStream(AdHoc::FileUtils::FileHandle, const P2PVR::RawDataClientPrx &);
+ ~SendSiStream();
+
+ bool IsFinished() override;
+};
}
}
diff --git a/p2pvr/devices/tunerSendTs.cpp b/p2pvr/devices/tunerSendTs.cpp
index 3f8864c..23bc3b4 100644
--- a/p2pvr/devices/tunerSendTs.cpp
+++ b/p2pvr/devices/tunerSendTs.cpp
@@ -10,7 +10,7 @@ namespace DVB {
IceTray::Logging::LoggerPtr SendTs::logger(LOGMANAGER()->getLogger<SendTs>());
-SendTs::SendTs(AdHoc::FileUtils::FileHandle && fh, const RawDataClientPrx & c) :
+SendTs::SendTs(AdHoc::FileUtils::FileHandle fh, const RawDataClientPrx & c) :
TunerI::IDataSender(std::move(fh), c->ice_collocationOptimized(false))
{
buffer.reserve(TARGET_BUFFER_SIZE);
@@ -76,6 +76,12 @@ SendTs::IsFinished()
return true;
}
}
+
+bool
+SendTs::HasPending()
+{
+ return async;
+}
}
}
diff --git a/p2pvr/devices/tunerSendTs.h b/p2pvr/devices/tunerSendTs.h
index abdd4b5..5390882 100644
--- a/p2pvr/devices/tunerSendTs.h
+++ b/p2pvr/devices/tunerSendTs.h
@@ -8,11 +8,12 @@ namespace P2PVR {
namespace DVB {
class SendTs : public TunerI::IDataSender {
public:
- SendTs(AdHoc::FileUtils::FileHandle &&, const P2PVR::RawDataClientPrx &);
+ SendTs(AdHoc::FileUtils::FileHandle, const P2PVR::RawDataClientPrx &);
~SendTs();
void NewData(const P2PVR::Data &);
bool IsFinished();
+ bool HasPending();
private:
void sendBufferChunk();
diff --git a/p2pvr/dvb/Jamfile.jam b/p2pvr/dvb/Jamfile.jam
index e9a31f4..b3176bb 100644
--- a/p2pvr/dvb/Jamfile.jam
+++ b/p2pvr/dvb/Jamfile.jam
@@ -1,8 +1,12 @@
+lib boost_system ;
+
lib p2pvrdvb :
[ glob-tree *.cpp : unittests ]
:
<library>../ice//p2pvrice
<library>..//adhocutil
+ <library>..//icetray
+ <library>boost_system
<library>../..//glibmm
<implicit-dependency>../ice//p2pvrice
: :