diff options
author | randomdan <randomdan@localhost> | 2013-12-03 03:40:35 +0000 |
---|---|---|
committer | randomdan <randomdan@localhost> | 2013-12-03 03:40:35 +0000 |
commit | 41c1a2777f87c25a092cd122f1d9c422d71d9ff4 (patch) | |
tree | a42440dbb9605ac76c2b9593a95bea47f114444a | |
parent | Remove tuner from adapter only when there are no more users (diff) | |
download | p2pvr-41c1a2777f87c25a092cd122f1d9c422d71d9ff4.tar.bz2 p2pvr-41c1a2777f87c25a092cd122f1d9c422d71d9ff4.tar.xz p2pvr-41c1a2777f87c25a092cd122f1d9c422d71d9ff4.zip |
Changes to add downloading program association table and program map (big mess in maintenance.cpp now)
Supporting changes in slice, general libs etc
Wait for lock after tuning, throw on fail
Don't register tuner as in use until tuning has succeeded
Tweaks in table parser to be more helpful
Count packets transmitted in a demux send
-rw-r--r-- | p2pvr/.p2config | 1 | ||||
-rw-r--r-- | p2pvr/daemon/daemon.cpp | 2 | ||||
-rw-r--r-- | p2pvr/datasources/schema.sql | 32 | ||||
-rw-r--r-- | p2pvr/ice/p2pvr.ice | 15 | ||||
-rw-r--r-- | p2pvr/lib/dvbsiHelpers.h | 1 | ||||
-rw-r--r-- | p2pvr/lib/dvbsiHelpers/programMap.cpp | 23 | ||||
-rw-r--r-- | p2pvr/lib/frontends/ofdm.cpp | 47 | ||||
-rw-r--r-- | p2pvr/lib/localDevices.cpp | 10 | ||||
-rw-r--r-- | p2pvr/lib/maintenance.cpp | 192 | ||||
-rw-r--r-- | p2pvr/lib/maintenance.h | 2 | ||||
-rw-r--r-- | p2pvr/lib/mapIterator.cpp | 10 | ||||
-rw-r--r-- | p2pvr/lib/mapIterator.h | 45 | ||||
-rw-r--r-- | p2pvr/lib/objectRowState.h | 9 | ||||
-rw-r--r-- | p2pvr/lib/siParsers/programAssociation.cpp | 29 | ||||
-rw-r--r-- | p2pvr/lib/siParsers/programAssociation.h | 23 | ||||
-rw-r--r-- | p2pvr/lib/siParsers/programMap.cpp | 46 | ||||
-rw-r--r-- | p2pvr/lib/siParsers/programMap.h | 37 | ||||
-rw-r--r-- | p2pvr/lib/siParsers/table.h | 15 | ||||
-rw-r--r-- | p2pvr/lib/tuner.cpp | 26 | ||||
-rw-r--r-- | p2pvr/lib/tuner.h | 6 |
20 files changed, 535 insertions, 36 deletions
diff --git a/p2pvr/.p2config b/p2pvr/.p2config index 8c4269f..5c820ab 100644 --- a/p2pvr/.p2config +++ b/p2pvr/.p2config @@ -2,5 +2,6 @@ library = libp2pvrdaemon.so daemon.type = p2pvrdaemon common.filelog.level = 9 common.filelog.path = /tmp/p2daemon.log +common.filelog.openmode = w common.consolelogLevel = 9 p2pvr.globaldevices.carddaemon = Devices:default -h defiant -p 10001 diff --git a/p2pvr/daemon/daemon.cpp b/p2pvr/daemon/daemon.cpp index de470d3..f3fe1ad 100644 --- a/p2pvr/daemon/daemon.cpp +++ b/p2pvr/daemon/daemon.cpp @@ -34,6 +34,8 @@ class P2PvrDaemon : public Daemon { auto maint = P2PVR::MaintenancePrx::checkedCast(adapter->createProxy(ic->stringToIdentity("Maintenance"))); maint->UpdateTransports(FE_OFDM); maint->UpdateServices(FE_OFDM); + maint->UpdateProgramAssociations(FE_OFDM); + maint->UpdateProgramMaps(FE_OFDM); maint->UpdateEvents(FE_OFDM); //ic->waitForShutdown(); diff --git a/p2pvr/datasources/schema.sql b/p2pvr/datasources/schema.sql index c628bf0..c51eb06 100644 --- a/p2pvr/datasources/schema.sql +++ b/p2pvr/datasources/schema.sql @@ -196,13 +196,27 @@ CREATE TABLE services ( eitschedule boolean, eitpresentfollowing boolean, freecamode boolean, - transportstreamid integer NOT NULL + transportstreamid integer NOT NULL, + programid integer ); ALTER TABLE public.services OWNER TO gentoo; -- +-- Name: servicestreams; Type: TABLE; Schema: public; Owner: gentoo; Tablespace: +-- + +CREATE TABLE servicestreams ( + serviceid integer NOT NULL, + id integer NOT NULL, + type smallint NOT NULL +); + + +ALTER TABLE public.servicestreams OWNER TO gentoo; + +-- -- Name: transportstreams; Type: TABLE; Schema: public; Owner: gentoo; Tablespace: -- @@ -287,6 +301,14 @@ ALTER TABLE ONLY services -- +-- Name: pk_servicestreams; Type: CONSTRAINT; Schema: public; Owner: gentoo; Tablespace: +-- + +ALTER TABLE ONLY servicestreams + ADD CONSTRAINT pk_servicestreams PRIMARY KEY (serviceid, id); + + +-- -- Name: pk_transportstreams; Type: CONSTRAINT; Schema: public; Owner: gentoo; Tablespace: -- @@ -359,6 +381,14 @@ ALTER TABLE ONLY services -- +-- Name: fk_servicestreams_service; Type: FK CONSTRAINT; Schema: public; Owner: gentoo +-- + +ALTER TABLE ONLY servicestreams + ADD CONSTRAINT fk_servicestreams_service FOREIGN KEY (serviceid) REFERENCES services(serviceid) ON UPDATE CASCADE ON DELETE CASCADE; + + +-- -- Name: fk_transportstreams_network; Type: FK CONSTRAINT; Schema: public; Owner: gentoo -- diff --git a/p2pvr/ice/p2pvr.ice b/p2pvr/ice/p2pvr.ice index 62c58ca..15013de 100644 --- a/p2pvr/ice/p2pvr.ice +++ b/p2pvr/ice/p2pvr.ice @@ -109,6 +109,17 @@ module DVBSI { ServiceList Services; }; + class Stream { + short Type; + int Id; + }; + sequence<Stream> StreamList; + + class ProgramMap { + int ServiceId; + StreamList Streams; + }; + class EitInformation { int ServiceId; int TransportStreamId; @@ -193,6 +204,8 @@ module P2PVR { idempotent void SendNetworkInformation(RawDataClient * client); idempotent void SendBouquetAssociations(RawDataClient * client); idempotent void SendServiceDescriptions(RawDataClient * client); + idempotent void SendProgramAssociationTable(RawDataClient * client); + idempotent void SendProgramMap(int pid, RawDataClient * client); idempotent void SendEventInformation(RawDataClient * client); int StartSendingTS(PacketIds pids, RawDataClient * client); @@ -224,6 +237,8 @@ module P2PVR { idempotent void UpdateAll(); idempotent void UpdateTransports(short type); idempotent void UpdateServices(short type); + idempotent void UpdateProgramAssociations(short type); + idempotent void UpdateProgramMaps(short type); idempotent void UpdateEvents(short type); }; diff --git a/p2pvr/lib/dvbsiHelpers.h b/p2pvr/lib/dvbsiHelpers.h index bf0de82..132c1b5 100644 --- a/p2pvr/lib/dvbsiHelpers.h +++ b/p2pvr/lib/dvbsiHelpers.h @@ -21,6 +21,7 @@ ColumnHelperParent(DVBSI::TerrestrialDeliveryPtr, DVBSI::TransportStreamPtr); ColumnHelperParent(DVBSI::CableDeliveryPtr, DVBSI::TransportStreamPtr); ColumnHelperParent(DVBSI::SatelliteDeliveryPtr, DVBSI::TransportStreamPtr); ColumnHelperParent(DVBSI::ServicePtr, DVBSI::TransportStreamPtr); +ColumnHelperParent(DVBSI::StreamPtr, DVBSI::ProgramMapPtr); ColumnHelper(DVBSI::EventPtr); #endif diff --git a/p2pvr/lib/dvbsiHelpers/programMap.cpp b/p2pvr/lib/dvbsiHelpers/programMap.cpp new file mode 100644 index 0000000..5b543a9 --- /dev/null +++ b/p2pvr/lib/dvbsiHelpers/programMap.cpp @@ -0,0 +1,23 @@ +#include "../dvbsiHelpers.h" +#include "../p2Helpers.h" + +template<> +void +CreateColumns<DVBSI::StreamPtr>(const ColumnCreator & cc) +{ + cc("serviceId", true); + cc("id", true); + cc("type", false); +} + +template<> +void +BindColumns(RowState & rs, const DVBSI::StreamPtr & stream, const DVBSI::ProgramMapPtr & pmp) +{ + rs.fields[0] << pmp->ServiceId; + rs.fields[1] << stream->Id; + rs.fields[2] << stream->Type; +} + + + diff --git a/p2pvr/lib/frontends/ofdm.cpp b/p2pvr/lib/frontends/ofdm.cpp index 8e2b23c..fca94ae 100644 --- a/p2pvr/lib/frontends/ofdm.cpp +++ b/p2pvr/lib/frontends/ofdm.cpp @@ -29,6 +29,7 @@ class Frontend_OFDM : public Frontend { feparams.u.ofdm.guard_interval = (fe_guard_interval_t)td->GuardInterval; feparams.u.ofdm.hierarchy_information = (fe_hierarchy_t)td->Hierarchy; SetParameters(feparams); + WaitForLock(); } dvb_frontend_parameters GetParameters() const @@ -42,6 +43,28 @@ class Frontend_OFDM : public Frontend { return feparams; } + void WaitForLock() const + { + fe_status status; + // Wait for something (500ms) + for (int x = 0; x < 50 && (status = GetStatus()) == 0; x += 1) { + usleep(10000); + } + // Was it useful? + if (!(status & (FE_HAS_SIGNAL | FE_HAS_CARRIER))) { + Logger()->messagebf(LOG_ERR, "Tuning of device %s failed (%s)", tuner->Device(), "No carrier"); + throw P2PVR::DeviceError(tuner->Device(), "No carrier", 0); + } + // Wait for lock (4000ms) + for (int x = 0; x < 400 && ((status = GetStatus()) & FE_HAS_LOCK) == 0; x += 1) { + usleep(10000); + } + if (!(status & FE_HAS_LOCK)) { + Logger()->messagebf(LOG_ERR, "Tuning of device %s failed (%s)", tuner->Device(), "No lock"); + throw P2PVR::DeviceError(tuner->Device(), "No lock", 0); + } + } + void SetParameters(const dvb_frontend_parameters & feparams) const { if (ioctl(frontendFD, FE_SET_FRONTEND, &feparams) < 0) { @@ -113,29 +136,19 @@ class Frontend_OFDM : public Frontend { Logger()->messagebf(LOG_WARNING, "Channel %d, freq (%d Hz) outside card range", channel, feparams.frequency); continue; } - Logger()->messagebf(LOG_DEBUG, "Channel %d, Frequency %d Hz", channel, feparams.frequency); - SetParameters(feparams); - fe_status status; - // Wait for something - for (int x = 0; x < 150 && (status = GetStatus()) == 0; x += 100) { - usleep(1000); - } - // Was it useful? - if (!(status & (FE_HAS_SIGNAL | FE_HAS_CARRIER))) { - continue; - } - // Wait for lock - for (int x = 0; x < 500 && ((status = GetStatus()) & FE_HAS_LOCK) == 0; x += 100) { - usleep(10000); - } - // Did we get lock? - if (status & FE_HAS_LOCK) { + try { + Logger()->messagebf(LOG_DEBUG, "Channel %d, Frequency %d Hz", channel, feparams.frequency); + SetParameters(feparams); + WaitForLock(); Logger()->messagebf(LOG_INFO, "Found multiplex at %d Hz", feparams.frequency); Logger()->messagebf(LOG_DEBUG, "frequency %d", feparams.frequency); if (onFrequencyFound(feparams.frequency)) { return; } } + catch (const P2PVR::DeviceError &) { + // Moving on... + } } } } diff --git a/p2pvr/lib/localDevices.cpp b/p2pvr/lib/localDevices.cpp index 5478301..b93adee 100644 --- a/p2pvr/lib/localDevices.cpp +++ b/p2pvr/lib/localDevices.cpp @@ -37,9 +37,10 @@ LocalDevices::GetTunerSpecific(const DVBSI::DeliveryPtr & delivery, Ice::Long un Logger()->messagebf(LOG_DEBUG, "%s: Opening a sharable tuner (frequency %d, frontend %s)", __PRETTY_FUNCTION__, delivery->Frequency, openTuner->first); - auto tuner = P2PVR::PrivateTunerPrx::checkedCast(ice.adapter->addWithUUID(new Tuner(openTuner->first))); + P2PVR::PrivateTunerPtr t = new Tuner(openTuner->first); + t->TuneTo(delivery, ice); + auto tuner = P2PVR::PrivateTunerPrx::checkedCast(ice.adapter->addWithUUID(t)); openTuner->second = OpenTunerPtr(new OpenTuner(delivery, tuner, until, false)); - tuner->TuneTo(delivery); Logger()->messagebf(LOG_DEBUG, "%s: Tuned, returning (frequency %d, frontend %s)", __PRETTY_FUNCTION__, delivery->Frequency, openTuner->first); @@ -69,9 +70,10 @@ LocalDevices::GetTunerAny(short , const DVBSI::DeliveryPtr & delivery, Ice::Long Logger()->messagebf(LOG_DEBUG, "%s: Opening a sharable tuner (frequency %d, frontend %s)", __PRETTY_FUNCTION__, delivery->Frequency, openTuner->first); - auto tuner = P2PVR::PrivateTunerPrx::checkedCast(ice.adapter->addWithUUID(new Tuner(openTuner->first))); + P2PVR::PrivateTunerPtr t = new Tuner(openTuner->first); + t->TuneTo(delivery, ice); + auto tuner = P2PVR::PrivateTunerPrx::checkedCast(ice.adapter->addWithUUID(t)); openTuner->second = OpenTunerPtr(new OpenTuner(delivery, tuner, until, false)); - tuner->TuneTo(delivery); Logger()->messagebf(LOG_DEBUG, "%s: Tuned, returning (frequency %d, frontend %s)", __PRETTY_FUNCTION__, delivery->Frequency, openTuner->first); diff --git a/p2pvr/lib/maintenance.cpp b/p2pvr/lib/maintenance.cpp index 71ddf60..6fe44b2 100644 --- a/p2pvr/lib/maintenance.cpp +++ b/p2pvr/lib/maintenance.cpp @@ -1,6 +1,8 @@ #include "maintenance.h" #include "siParsers/network.h" #include "siParsers/event.h" +#include "siParsers/programAssociation.h" +#include "siParsers/programMap.h" #include "siParsers/service.h" #include <Ice/Ice.h> #include <stdexcept> @@ -12,6 +14,7 @@ #include "p2Helpers.h" #include "dvbsiHelpers.h" #include "containerIterator.h" +#include "mapIterator.h" #include "singleIterator.h" #include "temporaryIceAdapterObject.h" @@ -78,6 +81,42 @@ class SiNetworkInformationMerger : public SiNetworkInformationParser { CommonObjects * commonObjects; }; +class SiProgramMapHandler : public SiProgramMapParser { + public: + SiProgramMapHandler(const RowProcessorCallback & cb) : + callBack(cb) {} + + void HandleTable(DVBSI::ProgramMapPtr pmp) + { + Logger()->messagebf(LOG_DEBUG, "Program map: serviceId = %d", pmp->ServiceId); + BOOST_FOREACH(const auto & s, pmp->Streams) { + Logger()->messagef(LOG_DEBUG, "type: %02x id: %d", s->Type, s->Id); + BindColumns<DVBSI::StreamPtr>(rowState, s, pmp); + rowState.process(callBack); + } + } + + private: + ObjectRowState<DVBSI::StreamPtr> rowState; + const RowProcessorCallback callBack; +}; + +class SiProgramAssociationHandler : public SiProgramAssociationParser { + public: + void HandleTable(ProgramAssociationMapPtr pam) + { + Logger()->messagebf(LOG_DEBUG, "Program association table"); + BOOST_FOREACH(const auto & pa, *pam) { + Logger()->messagebf(LOG_DEBUG, " %d -> %d", pa.first, pa.second); + } + BOOST_FOREACH(const auto & pa, *pam) { + map[pa.first] = pa.second; + } + } + + ProgramAssociationMap map; +}; + class SiServicesMerger : public SiServicesParser { public: SiServicesMerger(CommonObjects * co) : commonObjects(co) { } @@ -228,6 +267,159 @@ Maintenance::UpdateServices(short type, const Ice::Current & ice) } } +void +CreatePMTColumns(const ColumnCreator & cc) +{ + cc("serviceId", true); + cc("programId", false); +} + +void +Maintenance::UpdateProgramAssociations(short type, const Ice::Current & ice) +{ + auto ic = ice.adapter->getCommunicator(); + auto devs = P2PVR::DevicesPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("GlobalDevices"))); + auto si = P2PVR::SIPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("SI"))); + + if (!devs || !si) { + throw std::runtime_error("bad proxy(s)"); + } + + auto siparser = new SiProgramAssociationHandler(); + TemporarayIceAdapterObject<P2PVR::RawDataClient> parser(ice.adapter, siparser); + + const auto deliveries = si->GetAllDeliveries(type); + if (deliveries.empty()) { + throw std::runtime_error("no delivery methods"); + } + + BOOST_FOREACH(const auto & transport, deliveries) { + try { + Logger()->messagebf(LOG_DEBUG, "%s: Getting a tuner", __PRETTY_FUNCTION__); + auto tuner = devs->GetTunerSpecific(transport, time(NULL) + 300); + Logger()->messagebf(LOG_DEBUG, "%s: Fetching associations", __PRETTY_FUNCTION__); + tuner->SendProgramAssociationTable(parser); + Logger()->messagebf(LOG_INFO, "%s: Updated associations", __PRETTY_FUNCTION__); + devs->ReleaseTuner(tuner); + } + catch (...) { + // Tuning can fail + } + } + + SqlMergeTask mergeServices("postgres", "services"); + CreatePMTColumns(boost::bind(SqlMergeColumnsInserter, &mergeServices, _1, _2)); + // Don't change the list of services available from the network + mergeServices.doDelete = VariableType(false); + mergeServices.doInsert = VariableType(false); + Columns cols; + mergeServices.sources.insert(new MapIterator<ProgramAssociationMap>(CreatePMTColumns, &siparser->map)); + mergeServices.loadComplete(this); + mergeServices.execute(NULL); + BOOST_FOREACH(const CommonObjects::DataSources::value_type & ds, CommonObjects::datasources) { + ds.second->commit(); + } +} + +#include <rdbmsDataSource.h> +#include <column.h> +#include <selectcommand.h> +typedef boost::shared_ptr<DB::SelectCommand> SelectPtr; +class HandleAsInt : public DB::HandleField { + public: + virtual void null() { } + virtual void string(const char *, size_t ) { } + virtual void integer(int64_t v) { integerValue = v; } + virtual void floatingpoint(double) { } + virtual void timestamp(const struct tm &) { } + int64_t integerValue; +}; +class SiProgramMapMerger : public IHaveSubTasks { + public: + SiProgramMapMerger(short t, CommonObjects * co, const Ice::Current & i) : + SourceObject(__PRETTY_FUNCTION__), + IHaveSubTasks(NULL), + commonObjects(co), + type(t), + ice(i) { } + + void execute(ExecContext * ec) const + { + auto ic = ice.adapter->getCommunicator(); + auto devs = P2PVR::DevicesPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("GlobalDevices"))); + auto si = P2PVR::SIPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("SI"))); + + if (!devs || !si) { + throw std::runtime_error("bad proxy(s)"); + } + + TemporarayIceAdapterObject<P2PVR::RawDataClient> parser(ice.adapter, + new SiProgramMapHandler(boost::bind(&SiProgramMapMerger::executeChildren, this, ec))); + + const auto deliveries = si->GetAllDeliveries(type); + if (deliveries.empty()) { + throw std::runtime_error("no delivery methods"); + } + + auto db = commonObjects->dataSource<RdbmsDataSource>("postgres"); + SelectPtr sel = SelectPtr(db->getReadonly().newSelectCommand("select d.frequency, s.programid \ + from delivery_dvbt d, services s \ + where d.transportstreamid = s.transportstreamid \ + and s.programid is not null \ + order by s.transportstreamid, s.serviceid")); + int64_t curFreq = 0; + P2PVR::TunerPrx tuner; + while (sel->fetch()) { + HandleAsInt freq, pid; + (*sel)[0].apply(freq); + (*sel)[1].apply(pid); + + if (freq.integerValue != curFreq) { + if (tuner) { + devs->ReleaseTuner(tuner); + } + Logger()->messagebf(LOG_DEBUG, "%s: Getting a tuner", __PRETTY_FUNCTION__); + const auto transport = *std::find_if(deliveries.begin(), deliveries.end(), + [freq](const DVBSI::DeliveryPtr & del) { return del->Frequency == freq.integerValue; }); + tuner = devs->GetTunerSpecific(transport, time(NULL) + 10); + curFreq = freq.integerValue; + } + + Logger()->messagebf(LOG_DEBUG, "%s: Fetching associations", __PRETTY_FUNCTION__); + tuner->SendProgramMap(pid.integerValue, parser); + Logger()->messagebf(LOG_INFO, "%s: Updated associations", __PRETTY_FUNCTION__); + } + if (tuner) { + devs->ReleaseTuner(tuner); + } + } + + private: + CommonObjects * commonObjects; + const short type; + const Ice::Current & ice; + + void executeChildren(ExecContext * ec) const + { + BOOST_FOREACH(const Tasks::value_type & sq, normal) { + sq->execute(ec); + } + } +}; + +void +Maintenance::UpdateProgramMaps(short type, const Ice::Current & ice) +{ + SqlMergeTask mergeServiceStreams("postgres", "servicestreams"); + CreateColumns<DVBSI::StreamPtr>(boost::bind(SqlMergeColumnsInserter, &mergeServiceStreams, _1, _2)); + mergeServiceStreams.sources.insert(new SiProgramMapMerger(type, this, ice)); + mergeServiceStreams.loadComplete(this); + mergeServiceStreams.execute(NULL); + BOOST_FOREACH(const CommonObjects::DataSources::value_type & ds, CommonObjects::datasources) { + ds.second->commit(); + } +} + class SiEventsMerger : public IHaveSubTasks { public: SiEventsMerger(short t, const Ice::Current & i) : diff --git a/p2pvr/lib/maintenance.h b/p2pvr/lib/maintenance.h index b3773cf..acffb72 100644 --- a/p2pvr/lib/maintenance.h +++ b/p2pvr/lib/maintenance.h @@ -11,6 +11,8 @@ class Maintenance : public P2PVR::Maintenance, public virtual CommonObjects { void UpdateTransports(short type, const Ice::Current &); void UpdateTransports(short type, Ice::CommunicatorPtr, Ice::ObjectAdapterPtr); void UpdateServices(short type, const Ice::Current &); + void UpdateProgramAssociations(short type, const Ice::Current &); + void UpdateProgramMaps(short type, const Ice::Current &); void UpdateEvents(short type, const Ice::Current &); }; diff --git a/p2pvr/lib/mapIterator.cpp b/p2pvr/lib/mapIterator.cpp new file mode 100644 index 0000000..5f0fbb0 --- /dev/null +++ b/p2pvr/lib/mapIterator.cpp @@ -0,0 +1,10 @@ +#include "mapIterator.h" +#include "p2Helpers.h" + +template<> +void BindColumns<std::pair<unsigned short const, unsigned short>>(RowState & rs, std::pair<unsigned short const, unsigned short> const & p) +{ + rs.fields[0] << p.first; + rs.fields[1] << p.second; +} + diff --git a/p2pvr/lib/mapIterator.h b/p2pvr/lib/mapIterator.h new file mode 100644 index 0000000..8a06fe6 --- /dev/null +++ b/p2pvr/lib/mapIterator.h @@ -0,0 +1,45 @@ +#ifndef MAPITERATOR_H +#define MAPITERATOR_H + +#include <iHaveSubTasks.h> +#include <boost/foreach.hpp> +#include "objectRowState.h" + +template <typename T> +class MapIterator : public IHaveSubTasks { + public: + template <typename ... Parents> + MapIterator(const ColumnSpecifier & cs, const T * m, const Parents & ... p) : + SourceObject(__PRETTY_FUNCTION__), + IHaveSubTasks(NULL), + binder(boost::bind(&BindColumns<typename T::value_type, Parents...>, _1, _2, p...)), + columnSpecifier(cs), + map(m) + { + } + + void execute(ExecContext * ec) const + { + ObjectRowState<typename T::value_type> rs(columnSpecifier); + BOOST_FOREACH(const auto & i, *map) { + binder(rs, i); + rs.process(boost::bind(&MapIterator::executeChildren, this, ec)); + } + } + + private: + boost::function<void(RowState &, const typename T::value_type &)> binder; + const ColumnSpecifier columnSpecifier; + const T * map; + + void executeChildren(ExecContext * ec) const + { + BOOST_FOREACH(const Tasks::value_type & sq, normal) { + sq->execute(ec); + } + } +}; + +#endif + + diff --git a/p2pvr/lib/objectRowState.h b/p2pvr/lib/objectRowState.h index 945ea89..cf42055 100644 --- a/p2pvr/lib/objectRowState.h +++ b/p2pvr/lib/objectRowState.h @@ -6,6 +6,7 @@ #include <rowSet.h> typedef boost::function<void(const std::string &, bool)> ColumnCreator; +typedef boost::function<void(const ColumnCreator &)> ColumnSpecifier; template <typename V, typename... Parents> void BindColumns(RowState &, const V &, const Parents & ...); @@ -19,19 +20,19 @@ void CreateColumns(const ColumnCreator &); template <typename T> class ObjectRowState : public RowState { public: - ObjectRowState() : - columns(ColumnCreatorHelper()) + ObjectRowState(const ColumnSpecifier & cs = CreateColumns<T>) : + columns(ColumnCreatorHelper(cs)) { fields.resize(columns.size()); } const Columns & getColumns() const { return columns; } private: - static Columns ColumnCreatorHelper() + static Columns ColumnCreatorHelper(const ColumnSpecifier & cs) { int index = 0; Columns columns; - CreateColumns<T>([&columns, &index](const std::string & name, bool) { + cs([&columns, &index](const std::string & name, bool) { columns.insert(new Column(index++, name)); }); return columns; diff --git a/p2pvr/lib/siParsers/programAssociation.cpp b/p2pvr/lib/siParsers/programAssociation.cpp new file mode 100644 index 0000000..0add29e --- /dev/null +++ b/p2pvr/lib/siParsers/programAssociation.cpp @@ -0,0 +1,29 @@ +#include "programAssociation.h" + +struct ProgramAssociation { + uint16_t program_number; +#if __BYTE_ORDER__ == __ORDER_BIG_ENDIAN__ + uint8_t reserved : 3; + uint8_t pid_hi : 5; +#else + uint8_t pid_hi : 5; + uint8_t reserved : 3; +#endif + uint8_t pid_lo; +}; + +bool +SiProgramAssociationParser::CheckTableId(u_char tableId) const +{ + return (tableId == 0x00); +} + +void +SiProgramAssociationParser::ParseSiTable(const ProgramAssociationSection * pas, ProgramAssociationMapPtr pam) +{ + LoopOverSection<ProgramAssociation>(pas->data, HILO(pas->header.section_length) - 12, [this,pam](const ProgramAssociation * sd) { + (*pam)[ntohs(sd->program_number)] = HILO(sd->pid); + }); +} + + diff --git a/p2pvr/lib/siParsers/programAssociation.h b/p2pvr/lib/siParsers/programAssociation.h new file mode 100644 index 0000000..63c7e4b --- /dev/null +++ b/p2pvr/lib/siParsers/programAssociation.h @@ -0,0 +1,23 @@ +#ifndef PROGRAMASSOCIATION_H +#define PROGRAMASSOCIATION_H + +#include "table.h" +#include <p2pvr.h> + +struct ProgramAssociationSection { + SiTableHeader header; + u_char data[]; +} __attribute__((packed)); + +typedef std::map<uint16_t, uint16_t> ProgramAssociationMap; +typedef boost::shared_ptr<ProgramAssociationMap> ProgramAssociationMapPtr; + +class SiProgramAssociationParser : public SiTableParser<ProgramAssociationSection, ProgramAssociationMapPtr, int> { + protected: + bool CheckTableId(u_char tableId) const; + void ParseSiTable(const struct ProgramAssociationSection * pas, ProgramAssociationMapPtr); +}; + +#endif + + diff --git a/p2pvr/lib/siParsers/programMap.cpp b/p2pvr/lib/siParsers/programMap.cpp new file mode 100644 index 0000000..bb59017 --- /dev/null +++ b/p2pvr/lib/siParsers/programMap.cpp @@ -0,0 +1,46 @@ +#include "programMap.h" +#include <boost/bind.hpp> +#include <logger.h> + +struct ProgramMapStream { + uint8_t stream_type; +#if __BYTE_ORDER__ == __ORDER_BIG_ENDIAN__ + u_char reserved1 :3; + u_char elementary_PID_hi :5; +#else + u_char elementary_PID_hi :5; + u_char reserved1 :3; +#endif + u_char elementary_PID_lo; +#if __BYTE_ORDER__ == __ORDER_BIG_ENDIAN__ + u_char reserved2 :4; + u_char ES_info_length_hi :4; +#else + u_char ES_info_length_hi :4; + u_char reserved2 :4; +#endif + u_char ES_info_length_lo; + u_char data[]; +}; + + +bool +SiProgramMapParser::CheckTableId(u_char tableId) const +{ + return (tableId == 0x02); +} + +void +SiProgramMapParser::ParseSiTable(const struct ProgramMap * pm, DVBSI::ProgramMapPtr pmp) +{ + pmp->ServiceId = ntohs(pm->header.content_id); + auto pms = ParseDescriptors<ProgramMapStream>(pm->data, HILO(pm->program_info_len)); + while (reinterpret_cast<const u_char*>(pms) < &pm->header.section_length_lo + HILO(pm->header.section_length) - 4) { + DVBSI::StreamPtr s = new DVBSI::Stream(); + s->Type = pms->stream_type; + s->Id = HILO(pms->elementary_PID); + pmp->Streams.push_back(s); + // Don't care what's in here, just need to move along + pms = ParseDescriptors<ProgramMapStream>(pms->data, HILO(pms->ES_info_length)); + } +} diff --git a/p2pvr/lib/siParsers/programMap.h b/p2pvr/lib/siParsers/programMap.h new file mode 100644 index 0000000..a00faa1 --- /dev/null +++ b/p2pvr/lib/siParsers/programMap.h @@ -0,0 +1,37 @@ +#ifndef PROGRAMMAP_H +#define PROGRAMMAP_H + +#include "table.h" +#include <p2pvr.h> + +struct ProgramMap { + SiTableHeader header; +#if __BYTE_ORDER__ == __ORDER_BIG_ENDIAN__ + u_char reserved1 :3; + u_char pcr_pid_hi :5; +#else + u_char pcr_pid_hi :5; + u_char reserved1 :3; +#endif + u_char pcr_pid_lo; +#if __BYTE_ORDER__ == __ORDER_BIG_ENDIAN__ + u_char reserved2 :4; + u_char program_info_len_hi :4; +#else + u_char program_info_len_hi :4; + u_char reserved2 :4; +#endif + u_char program_info_len_lo; + u_char data[]; +} __attribute__((packed)); + +class SiProgramMapParser : public SiTableParser<ProgramMap, DVBSI::ProgramMapPtr, u_char> { + protected: + bool CheckTableId(u_char tableId) const; + void ParseSiTable(const struct ProgramMap * nit, DVBSI::ProgramMapPtr); +}; + +#endif + + + diff --git a/p2pvr/lib/siParsers/table.h b/p2pvr/lib/siParsers/table.h index be40813..55dbd17 100644 --- a/p2pvr/lib/siParsers/table.h +++ b/p2pvr/lib/siParsers/table.h @@ -49,7 +49,7 @@ struct SiTableHeaderBase { u_char reserved2 :3; u_char section_syntax_indicator :1; #endif - uint16_t section_length_lo :8; + uint8_t section_length_lo; } __attribute__((packed)); struct SiTableHeader : public SiTableHeaderBase { @@ -103,7 +103,7 @@ class SiTableParser : public SiTableParserBase { if (seen.find(sectionNumber) == seen.end()) { auto & obj = boost::get<0>(content); if (!obj) { - obj = new typename TargetType::element_type(); + obj = TargetType(new typename TargetType::element_type()); incomplete += 1; } ParseSiTable(siTable, obj); @@ -173,6 +173,17 @@ class SiTableParser : public SiTableParserBase { } } + template<typename LoopContent> + static void LoopOverSection(const u_char * data, size_t len, boost::function<void(const LoopContent *)> parser) + { + auto end = data + len; + while (data < end) { + auto loopData = reinterpret_cast<const LoopContent *>(data); + parser(loopData); + data += sizeof(LoopContent); + } + } + virtual bool CheckTableId(u_char tableId) const = 0; virtual void ParseSiTable(const TableType *, TargetType) = 0; virtual void HandleTable(TargetType table) = 0; diff --git a/p2pvr/lib/tuner.cpp b/p2pvr/lib/tuner.cpp index e508152..3cfb8a4 100644 --- a/p2pvr/lib/tuner.cpp +++ b/p2pvr/lib/tuner.cpp @@ -89,8 +89,7 @@ Tuner::ScanAndSendNetworkInformation(const P2PVR::RawDataClientPrx & client, con { frontend->FrequencyScan([this, &client, &ice](long) { try { - SendNetworkInformation(client, ice); - return true; + return (SendPID(0x10, client, ice) > 0); } catch (...) { return false; @@ -117,12 +116,24 @@ Tuner::SendServiceDescriptions(const P2PVR::RawDataClientPrx & client, const Ice } void +Tuner::SendProgramMap(Ice::Int pid, const P2PVR::RawDataClientPrx & client, const Ice::Current & ice) +{ + SendPID(pid, client, ice); +} + +void +Tuner::SendProgramAssociationTable(const P2PVR::RawDataClientPrx & client, const Ice::Current & ice) +{ + SendPID(0x00, client, ice); +} + +void Tuner::SendEventInformation(const P2PVR::RawDataClientPrx & client, const Ice::Current & ice) { SendPID(0x12, client, ice); } -void +uint64_t Tuner::SendPID(int pid, const P2PVR::RawDataClientPrx & client, const Ice::Current & ice) const { Logger()->messagebf(LOG_DEBUG, "%s: pid = 0x%x", __PRETTY_FUNCTION__, pid); @@ -138,15 +149,16 @@ Tuner::SendPID(int pid, const P2PVR::RawDataClientPrx & client, const Ice::Curre throw P2PVR::DeviceError("demux", strerror(errno), errno); } - ReadDemuxAndSend(demux, client); + return ReadDemuxAndSend(demux, client); } -void +uint64_t Tuner::ReadDemuxAndSend(int demux, const P2PVR::RawDataClientPrx & client) const { Logger()->messagebf(LOG_DEBUG, "%s: begin", __PRETTY_FUNCTION__); std::vector<Ice::AsyncResultPtr> asyncs; struct pollfd ufd; + uint64_t packetsSent = 0; bool exitFlag = false; do { // Wait for data to appear @@ -155,7 +167,7 @@ Tuner::ReadDemuxAndSend(int demux, const P2PVR::RawDataClientPrx & client) const ufd.events = POLLIN; if (poll(&ufd, 1, timeout) < 1) { Logger()->messagebf(LOG_DEBUG, "%s: Timed out waiting for data", __PRETTY_FUNCTION__); - throw P2PVR::DeviceError("demux", "Timed out. Tuned to a multiplex?", 0); + break; } // Read it @@ -187,6 +199,7 @@ Tuner::ReadDemuxAndSend(int demux, const P2PVR::RawDataClientPrx & client) const } asyncs.push_back(client->begin_NewData(buf)); + packetsSent += 1; asyncs.erase(std::remove_if(asyncs.begin(), asyncs.end(), [&exitFlag, &client](const Ice::AsyncResultPtr & a) { if (a->isCompleted()) { @@ -200,6 +213,7 @@ Tuner::ReadDemuxAndSend(int demux, const P2PVR::RawDataClientPrx & client) const client->end_NewData(a); } Logger()->messagebf(LOG_DEBUG, "%s: end", __PRETTY_FUNCTION__); + return packetsSent; } int diff --git a/p2pvr/lib/tuner.h b/p2pvr/lib/tuner.h index 8111f1e..4efde20 100644 --- a/p2pvr/lib/tuner.h +++ b/p2pvr/lib/tuner.h @@ -22,6 +22,8 @@ class Tuner : public P2PVR::PrivateTuner { void SendNetworkInformation(const P2PVR::RawDataClientPrx & client, const Ice::Current&); void SendBouquetAssociations(const P2PVR::RawDataClientPrx & client, const Ice::Current&); void SendServiceDescriptions(const P2PVR::RawDataClientPrx & client, const Ice::Current&); + void SendProgramMap(Ice::Int pid, const P2PVR::RawDataClientPrx & client, const Ice::Current&); + void SendProgramAssociationTable(const P2PVR::RawDataClientPrx & client, const Ice::Current&); void SendEventInformation(const P2PVR::RawDataClientPrx & client, const Ice::Current&); int StartSendingTS(const P2PVR::PacketIds & pids, const P2PVR::RawDataClientPrx & client, const Ice::Current &); @@ -30,8 +32,8 @@ class Tuner : public P2PVR::PrivateTuner { private: static bool crc32(const P2PVR::Data &); int OpenDemux() const; - void SendPID(int pid, const P2PVR::RawDataClientPrx & client, const Ice::Current &) const; - void ReadDemuxAndSend(int fd, const P2PVR::RawDataClientPrx & client) const; + uint64_t SendPID(int pid, const P2PVR::RawDataClientPrx & client, const Ice::Current &) const; + uint64_t ReadDemuxAndSend(int fd, const P2PVR::RawDataClientPrx & client) const; void startSenderThread(); void senderThread(); |