From a093bce049df33837892aef5a11ce573a53259b8 Mon Sep 17 00:00:00 2001 From: randomdan Date: Sun, 8 Dec 2013 00:39:03 +0000 Subject: Add some SI functions for getting service stuff Add a templated SI parser to bounce table objects to a function Allow SI parsers to say they want the next instance of an object too Add ServiceStreamer to write a/v streams of a TS to a raw data client --- p2pvr/daemon/daemon.cpp | 2 - p2pvr/ice/p2pvr.ice | 3 + p2pvr/lib/Jamfile.jam | 1 + p2pvr/lib/bindSiParserHandler.h | 25 +++++++++ p2pvr/lib/fileSink.cpp | 21 +++++++ p2pvr/lib/fileSink.h | 20 +++++++ p2pvr/lib/maintenance/network.cpp | 3 +- p2pvr/lib/maintenance/programAssociations.cpp | 3 +- p2pvr/lib/maintenance/programMap.cpp | 3 +- p2pvr/lib/maintenance/services.cpp | 3 +- p2pvr/lib/serviceStreamer.cpp | 79 +++++++++++++++++++++++++++ p2pvr/lib/serviceStreamer.h | 42 ++++++++++++++ p2pvr/lib/si.cpp | 68 +++++++++++++++++++---- p2pvr/lib/si.h | 13 ++++- p2pvr/lib/siParsers/event.cpp | 3 +- p2pvr/lib/siParsers/event.h | 2 +- p2pvr/lib/siParsers/table.h | 15 ++++- 17 files changed, 282 insertions(+), 24 deletions(-) create mode 100644 p2pvr/lib/bindSiParserHandler.h create mode 100644 p2pvr/lib/fileSink.cpp create mode 100644 p2pvr/lib/fileSink.h create mode 100644 p2pvr/lib/serviceStreamer.cpp create mode 100644 p2pvr/lib/serviceStreamer.h diff --git a/p2pvr/daemon/daemon.cpp b/p2pvr/daemon/daemon.cpp index 91b8daa..1bd328c 100644 --- a/p2pvr/daemon/daemon.cpp +++ b/p2pvr/daemon/daemon.cpp @@ -35,8 +35,6 @@ class P2PvrDaemon : public Daemon { auto maint = P2PVR::MaintenancePrx::checkedCast(adapter->createProxy(ic->stringToIdentity("Maintenance"))); maint->UpdateNetwork(FE_OFDM); maint->UpdateServices(FE_OFDM); - maint->UpdateProgramAssociations(FE_OFDM); - maint->UpdateProgramMaps(FE_OFDM); maint->UpdateEvents(FE_OFDM); //ic->waitForShutdown(); diff --git a/p2pvr/ice/p2pvr.ice b/p2pvr/ice/p2pvr.ice index b5903d3..8835511 100644 --- a/p2pvr/ice/p2pvr.ice +++ b/p2pvr/ice/p2pvr.ice @@ -265,8 +265,11 @@ module P2PVR { interface SI { // Get any delivery suitable for SI reading idempotent Deliveries GetAllDeliveries(short type); + idempotent DVBSI::Delivery GetDeliveryForService(int id); + idempotent DVBSI::Delivery GetDeliveryForTransport(int id); // Get a list of services idempotent DVBSI::ServiceList GetServices(); + idempotent DVBSI::Service GetService(int id); }; }; diff --git a/p2pvr/lib/Jamfile.jam b/p2pvr/lib/Jamfile.jam index b05e572..9814478 100644 --- a/p2pvr/lib/Jamfile.jam +++ b/p2pvr/lib/Jamfile.jam @@ -14,5 +14,6 @@ lib p2pvrlib : : : boost_filesystem ../ice//p2pvrice + boost_system . ; diff --git a/p2pvr/lib/bindSiParserHandler.h b/p2pvr/lib/bindSiParserHandler.h new file mode 100644 index 0000000..a4813c2 --- /dev/null +++ b/p2pvr/lib/bindSiParserHandler.h @@ -0,0 +1,25 @@ +#ifndef BINDSIPARSERHANDLER_H +#define BINDSIPARSERHANDLER_H + +#include + +template +class BindSiParserHandler : public Base { + public: + typedef boost::function Callback; + BindSiParserHandler(const Callback & cb) : + callBack(cb) + { + } + + bool HandleTable(SIObject siObject) + { + return callBack(siObject); + } + + private: + const Callback callBack; +}; + +#endif + diff --git a/p2pvr/lib/fileSink.cpp b/p2pvr/lib/fileSink.cpp new file mode 100644 index 0000000..2fd4a58 --- /dev/null +++ b/p2pvr/lib/fileSink.cpp @@ -0,0 +1,21 @@ +#include "fileSink.h" + +FileSink::FileSink(const boost::filesystem::path & path) : + file(fopen(path.string().c_str(), "w")) +{ +} + +FileSink::~FileSink() +{ + if (file) { + fclose(file); + } +} + +bool +FileSink::NewData(const P2PVR::Data & data, const Ice::Current &) +{ + fwrite(&data.front(), data.size(), 1, file); + return false; +} + diff --git a/p2pvr/lib/fileSink.h b/p2pvr/lib/fileSink.h new file mode 100644 index 0000000..a89016c --- /dev/null +++ b/p2pvr/lib/fileSink.h @@ -0,0 +1,20 @@ +#ifndef FILESINK_H +#define FILESINK_H + +#include +#include +#include + +class FileSink : public P2PVR::RawDataClient { + public: + FileSink(const boost::filesystem::path & path); + ~FileSink(); + + bool NewData(const P2PVR::Data & data, const Ice::Current &); + + private: + FILE * const file; +}; + +#endif + diff --git a/p2pvr/lib/maintenance/network.cpp b/p2pvr/lib/maintenance/network.cpp index 1cd224a..f49e3f2 100644 --- a/p2pvr/lib/maintenance/network.cpp +++ b/p2pvr/lib/maintenance/network.cpp @@ -12,7 +12,7 @@ class SiNetworkInformationMerger : public SiNetworkInformationParser { public: SiNetworkInformationMerger(CommonObjects * co) : commonObjects(co) { } - void HandleTable(DVBSI::NetworkPtr n) + bool HandleTable(DVBSI::NetworkPtr n) { Logger()->messagebf(LOG_DEBUG, "Network Id: %d Name: %s", n->NetworkId, *n->Name); BOOST_FOREACH(const auto & ts, n->TransportStreams) { @@ -55,6 +55,7 @@ class SiNetworkInformationMerger : public SiNetworkInformationParser { } mergeServices.loadComplete(commonObjects); mergeServices.execute(NULL); + return false; } private: CommonObjects * commonObjects; diff --git a/p2pvr/lib/maintenance/programAssociations.cpp b/p2pvr/lib/maintenance/programAssociations.cpp index 4d32a86..47e48b3 100644 --- a/p2pvr/lib/maintenance/programAssociations.cpp +++ b/p2pvr/lib/maintenance/programAssociations.cpp @@ -9,7 +9,7 @@ class SiProgramAssociationHandler : public SiProgramAssociationParser { public: - void HandleTable(ProgramAssociationMapPtr pam) + bool HandleTable(ProgramAssociationMapPtr pam) { Logger()->messagebf(LOG_DEBUG, "Program association table"); BOOST_FOREACH(const auto & pa, *pam) { @@ -18,6 +18,7 @@ class SiProgramAssociationHandler : public SiProgramAssociationParser { BOOST_FOREACH(const auto & pa, *pam) { map[pa.first] = pa.second; } + return false; } ProgramAssociationMap map; diff --git a/p2pvr/lib/maintenance/programMap.cpp b/p2pvr/lib/maintenance/programMap.cpp index ecbd126..bd4c2ee 100644 --- a/p2pvr/lib/maintenance/programMap.cpp +++ b/p2pvr/lib/maintenance/programMap.cpp @@ -17,7 +17,7 @@ class SiProgramMapHandler : public SiProgramMapParser { SiProgramMapHandler(const RowProcessorCallback & cb) : callBack(cb) {} - void HandleTable(DVBSI::ProgramMapPtr pmp) + bool HandleTable(DVBSI::ProgramMapPtr pmp) { Logger()->messagebf(LOG_DEBUG, "Program map: serviceId = %d", pmp->ServiceId); BOOST_FOREACH(const auto & s, pmp->Streams) { @@ -27,6 +27,7 @@ class SiProgramMapHandler : public SiProgramMapParser { BindColumns(rowState, s); rowState.process(callBack); } + return false; } private: diff --git a/p2pvr/lib/maintenance/services.cpp b/p2pvr/lib/maintenance/services.cpp index bd4d9a2..08b8c9d 100644 --- a/p2pvr/lib/maintenance/services.cpp +++ b/p2pvr/lib/maintenance/services.cpp @@ -12,7 +12,7 @@ class SiServicesMerger : public SiServicesParser { public: SiServicesMerger(CommonObjects * co) : commonObjects(co) { } - void HandleTable(DVBSI::TransportStreamPtr ts) + bool HandleTable(DVBSI::TransportStreamPtr ts) { Logger()->messagebf(LOG_DEBUG, "Transport Stream Id: %d Original Network Id: %s", ts->TransportStreamId, ts->OriginalNetworkId); BOOST_FOREACH(const auto & s, ts->Services) { @@ -30,6 +30,7 @@ class SiServicesMerger : public SiServicesParser { mergeServices.sources.insert(new ContainerIterator(&ts->Services)); mergeServices.loadComplete(commonObjects); mergeServices.execute(NULL); + return false; } private: diff --git a/p2pvr/lib/serviceStreamer.cpp b/p2pvr/lib/serviceStreamer.cpp new file mode 100644 index 0000000..ac034ab --- /dev/null +++ b/p2pvr/lib/serviceStreamer.cpp @@ -0,0 +1,79 @@ +#include "serviceStreamer.h" +#include +#include "fileSink.h" +#include "bindSiParserHandler.h" + +ServiceStreamer::ServiceStreamer(int sid, const Ice::CommunicatorPtr & ic, const Ice::ObjectAdapterPtr & a) : + adapter(a), + devs(P2PVR::DevicesPrx::checkedCast(adapter->createProxy(ic->stringToIdentity("GlobalDevices")))), + si(P2PVR::SIPrx::checkedCast(adapter->createProxy(ic->stringToIdentity("SI")))), + target(adapter, new FileSink("/tmp/out.ts")), + patParser(adapter, new BindSiParserHandler(boost::bind(&ServiceStreamer::HandlePAT, this, _1))), + pmtParser(adapter, new BindSiParserHandler(boost::bind(&ServiceStreamer::HandlePMT, this, _1))), + serviceId(sid), + patHandle(0), pmtStream(0), pmtHandle(0), serviceHandle(0) +{ +} + +ServiceStreamer::~ServiceStreamer() +{ +} + +bool +ServiceStreamer::HandlePAT(ProgramAssociationMapPtr pam) +{ + const auto p = pam->find(serviceId); + if (p != pam->end() && p->second != pmtStream) { + pmtStream = p->second; + Logger()->messagef(LOG_DEBUG, "%s: Got ProgramAssociationMap, pmtStream now = %d", __PRETTY_FUNCTION__, pmtStream); + stopHandle(pmtHandle); + pmtHandle = tuner->StartSendingSection(pmtStream, pmtParser); + } + return true; +} + +bool +ServiceStreamer::HandlePMT(DVBSI::ProgramMapPtr pmp) +{ + Streams strms; + BOOST_FOREACH(const auto & s, pmp->Streams) { + if (s->Type >= 2 && s->Type <= 5) { + strms.insert(s->Id); + } + } + if (strms != streams) { + streams = strms; + Logger()->messagebf(LOG_DEBUG, "%s: Got ProgramMap, switching to %d streams", __PRETTY_FUNCTION__, streams.size()); + stopHandle(serviceHandle); + serviceHandle = tuner->StartSendingTS(P2PVR::PacketIds(streams.begin(), streams.end()), target); + } + return true; +} + +void +ServiceStreamer::Start() +{ + const auto transport = si->GetDeliveryForService(serviceId); + tuner = devs->GetTunerSpecific(transport, time(NULL) + 300); + patHandle = tuner->StartSendingSection(0, patParser); +} + +void +ServiceStreamer::Stop() +{ + stopHandle(serviceHandle); + stopHandle(patHandle); + stopHandle(pmtHandle); + devs->ReleaseTuner(tuner); + tuner = NULL; +} + +void +ServiceStreamer::stopHandle(int & handle) +{ + if (handle) { + tuner->StopSending(handle); + handle = 0; + } +} + diff --git a/p2pvr/lib/serviceStreamer.h b/p2pvr/lib/serviceStreamer.h new file mode 100644 index 0000000..3ea0c5c --- /dev/null +++ b/p2pvr/lib/serviceStreamer.h @@ -0,0 +1,42 @@ +#ifndef SERVICESTREAMER_H +#define SERVICESTREAMER_H + +#include +#include +#include "siParsers/programAssociation.h" +#include "siParsers/programMap.h" +#include "temporaryIceAdapterObject.h" +#include +#include + +class ServiceStreamer { + public: + ServiceStreamer(int sid, const Ice::CommunicatorPtr & ic, const Ice::ObjectAdapterPtr & a); + ~ServiceStreamer(); + + bool HandlePAT(ProgramAssociationMapPtr pam); + bool HandlePMT(DVBSI::ProgramMapPtr pmp); + void Start(); + void Stop(); + private: + void stopHandle(int & handle); + + const Ice::ObjectAdapterPtr & adapter; + P2PVR::DevicesPrx devs; + P2PVR::SIPrx si; + P2PVR::TunerPrx tuner; + TemporarayIceAdapterObject target; + TemporarayIceAdapterObject patParser; + TemporarayIceAdapterObject pmtParser; + + int serviceId; + int patHandle; + int pmtStream; + int pmtHandle; + typedef std::set Streams; + Streams streams; + int serviceHandle; +}; + +#endif + diff --git a/p2pvr/lib/si.cpp b/p2pvr/lib/si.cpp index 032220e..98b2316 100644 --- a/p2pvr/lib/si.cpp +++ b/p2pvr/lib/si.cpp @@ -3,15 +3,29 @@ #include "sqlContainerCreator.h" #include #include -#include #include +#include -typedef boost::shared_ptr SelectPtr; +SI::SelectPtr +SI::Select(const std::string & sql) const +{ + auto db = dataSource("postgres"); + return SelectPtr(db->getReadonly().newSelectCommand(sql)); +} +SI::SelectPtr +SI::Select(const std::string & sql, const std::list & vs) const +{ + SelectPtr sel(Select(sql)); + unsigned int offset = 0; + BOOST_FOREACH(const auto & v, vs) { + boost::apply_visitor(SqlVariableBinder(sel.get(), offset++), v); + } + return sel; +} P2PVR::Deliveries SI::GetAllDeliveries(short type, const Ice::Current &) { - auto db = dataSource("postgres"); Logger()->messagebf(LOG_DEBUG, "%s(type %d)", __PRETTY_FUNCTION__, type); P2PVR::Deliveries rtn; SelectPtr sel; @@ -19,22 +33,19 @@ SI::GetAllDeliveries(short type, const Ice::Current &) case FE_OFDM: { SqlContainerCreator cc(rtn); - sel = SelectPtr(db->getReadonly().newSelectCommand("SELECT * FROM delivery_dvbt")); - cc.populate(sel); + cc.populate(Select("SELECT * FROM delivery_dvbt")); break; } case FE_QAM: { SqlContainerCreator cc(rtn); - sel = SelectPtr(db->getReadonly().newSelectCommand("SELECT * FROM delivery_dvbc")); - cc.populate(sel); + cc.populate(Select("SELECT * FROM delivery_dvbc")); break; } case FE_QPSK: { SqlContainerCreator cc(rtn); - sel = SelectPtr(db->getReadonly().newSelectCommand("SELECT * FROM delivery_dvbs")); - cc.populate(sel); + cc.populate(Select("SELECT * FROM delivery_dvbs")); break; } } @@ -42,12 +53,47 @@ SI::GetAllDeliveries(short type, const Ice::Current &) return rtn; } +DVBSI::DeliveryPtr +SI::GetDeliveryForTransport(int id, const Ice::Current&) +{ + P2PVR::Deliveries rtn; + SqlContainerCreator cct(rtn); + cct.populate(Select("SELECT * FROM delivery_dvbt WHERE transportStreamId = ?", {id})); + SqlContainerCreator ccc(rtn); + ccc.populate(Select("SELECT * FROM delivery_dvbc WHERE transportStreamId = ?", {id})); + SqlContainerCreator ccs(rtn); + ccs.populate(Select("SELECT * FROM delivery_dvbs WHERE transportStreamId = ?", {id})); + return rtn.front(); +} + +DVBSI::DeliveryPtr +SI::GetDeliveryForService(int id, const Ice::Current&) +{ + P2PVR::Deliveries rtn; + SqlContainerCreator cct(rtn); + cct.populate(Select("SELECT d.* FROM services s, delivery_dvbt d WHERE serviceid = ? AND s.transportstreamid = d.transportstreamid", {id})); + SqlContainerCreator ccc(rtn); + ccc.populate(Select("SELECT d.* FROM services s, delivery_dvbc d WHERE serviceid = ? AND s.transportstreamid = d.transportstreamid", {id})); + SqlContainerCreator ccs(rtn); + ccs.populate(Select("SELECT d.* FROM services s, delivery_dvbs d WHERE serviceid = ? AND s.transportstreamid = d.transportstreamid", {id})); + return rtn.front(); +} + DVBSI::ServiceList SI::GetServices(const Ice::Current&) { - auto db = dataSource("postgres"); DVBSI::ServiceList rtn; SqlContainerCreator cc(rtn); - cc.populate(SelectPtr(db->getReadonly().newSelectCommand("SELECT * FROM services ORDER BY serviceId"))); + cc.populate(Select("SELECT * FROM services ORDER BY serviceId")); return rtn; } + +DVBSI::ServicePtr +SI::GetService(int id, const Ice::Current&) +{ + DVBSI::ServiceList rtn; + SqlContainerCreator cc(rtn); + cc.populate(Select("SELECT * FROM services WHERE serviceId = ?", {id})); + return rtn.front(); +} + diff --git a/p2pvr/lib/si.h b/p2pvr/lib/si.h index 5530125..508c0fe 100644 --- a/p2pvr/lib/si.h +++ b/p2pvr/lib/si.h @@ -3,11 +3,20 @@ #include #include +#include class SI : public P2PVR::SI, public virtual CommonObjects { public: - P2PVR::Deliveries GetAllDeliveries(short type, const Ice::Current&); - DVBSI::ServiceList GetServices(const Ice::Current&); + typedef boost::shared_ptr SelectPtr; + + P2PVR::Deliveries GetAllDeliveries(short type, const Ice::Current &); + DVBSI::DeliveryPtr GetDeliveryForService(int id, const Ice::Current &); + DVBSI::DeliveryPtr GetDeliveryForTransport(int id, const Ice::Current &); + DVBSI::ServiceList GetServices(const Ice::Current &); + DVBSI::ServicePtr GetService(int id, const Ice::Current &); + protected: + SelectPtr Select(const std::string &) const; + SelectPtr Select(const std::string &, const std::list &) const; }; #endif diff --git a/p2pvr/lib/siParsers/event.cpp b/p2pvr/lib/siParsers/event.cpp index 6e5446d..dbb9f90 100644 --- a/p2pvr/lib/siParsers/event.cpp +++ b/p2pvr/lib/siParsers/event.cpp @@ -205,9 +205,10 @@ SiEpgParser::CheckTableId(u_char tableId) const return ((tableId >= 0x50 && tableId <= 0x5f) || (tableId >= 0x60 && tableId <= 0x6f)); } -void +bool SiEpgParser::HandleTable(DVBSI::EitInformationPtr) { + return false; } Common::DateTime & diff --git a/p2pvr/lib/siParsers/event.h b/p2pvr/lib/siParsers/event.h index 0528d98..943f5aa 100644 --- a/p2pvr/lib/siParsers/event.h +++ b/p2pvr/lib/siParsers/event.h @@ -20,7 +20,7 @@ class SiEpgParser : public SiTableParserheader.tableid >= 0x60 ? 0x60 : 0x50); } uint8_t LastTableId(const EventInformation * ei) { return ei->LastTableId; } void ParseSiTable(const EventInformation * eit, DVBSI::EitInformationPtr); - void HandleTable(DVBSI::EitInformationPtr); + bool HandleTable(DVBSI::EitInformationPtr); virtual void HandleTable(DVBSI::EventPtr) = 0; private: diff --git a/p2pvr/lib/siParsers/table.h b/p2pvr/lib/siParsers/table.h index effbe6e..4152b5e 100644 --- a/p2pvr/lib/siParsers/table.h +++ b/p2pvr/lib/siParsers/table.h @@ -121,14 +121,23 @@ class SiTableParser : public SiTableParserBase { } } if (complete) { - HandleTable(obj); - obj = TargetType(); + if (HandleTable(obj)) { + targetTableSections.clear(); + } + else { + obj = TargetType(); + } incomplete -= 1; } } } data += HILO(siTable->header.section_length) + 4; } + return IsFinished(); + } + + virtual bool IsFinished() const + { return ((incomplete == 0) && (startTime < (time(NULL) - 10))); } @@ -186,7 +195,7 @@ class SiTableParser : public SiTableParserBase { virtual bool CheckTableId(u_char tableId) const = 0; virtual void ParseSiTable(const TableType *, TargetType) = 0; - virtual void HandleTable(TargetType table) = 0; + virtual bool HandleTable(TargetType table) = 0; private: mutable Contents contents; -- cgit v1.2.3