diff options
author | randomdan <randomdan@localhost> | 2013-12-03 15:21:25 +0000 |
---|---|---|
committer | randomdan <randomdan@localhost> | 2013-12-03 15:21:25 +0000 |
commit | c48bac72d9b9fc8e177b161a67dbbb9f91e21fc2 (patch) | |
tree | e3c3e5bed636fa75aec2e853ae30689158df2d65 | |
parent | Monitor tuner usage and close them if they go idle for a period of time (diff) | |
download | p2pvr-c48bac72d9b9fc8e177b161a67dbbb9f91e21fc2.tar.bz2 p2pvr-c48bac72d9b9fc8e177b161a67dbbb9f91e21fc2.tar.xz p2pvr-c48bac72d9b9fc8e177b161a67dbbb9f91e21fc2.zip |
Tidy maintenance code into separate files
Change UpdateTransports to be UpdateNetwork
-rw-r--r-- | p2pvr/daemon/daemon.cpp | 2 | ||||
-rw-r--r-- | p2pvr/ice/commonHelpers.h | 1 | ||||
-rw-r--r-- | p2pvr/ice/p2pvr.ice | 2 | ||||
-rw-r--r-- | p2pvr/lib/maintenance.cpp | 457 | ||||
-rw-r--r-- | p2pvr/lib/maintenance.h | 7 | ||||
-rw-r--r-- | p2pvr/lib/maintenance/events.cpp | 88 | ||||
-rw-r--r-- | p2pvr/lib/maintenance/network.cpp | 113 | ||||
-rw-r--r-- | p2pvr/lib/maintenance/programAssociations.cpp | 80 | ||||
-rw-r--r-- | p2pvr/lib/maintenance/programMap.cpp | 133 | ||||
-rw-r--r-- | p2pvr/lib/maintenance/services.cpp | 68 | ||||
-rw-r--r-- | p2pvr/lib/temporaryIceAdapterObject.h | 2 |
11 files changed, 497 insertions, 456 deletions
diff --git a/p2pvr/daemon/daemon.cpp b/p2pvr/daemon/daemon.cpp index 4f55685..91b8daa 100644 --- a/p2pvr/daemon/daemon.cpp +++ b/p2pvr/daemon/daemon.cpp @@ -33,7 +33,7 @@ class P2PvrDaemon : public Daemon { adapter->activate(); auto maint = P2PVR::MaintenancePrx::checkedCast(adapter->createProxy(ic->stringToIdentity("Maintenance"))); - maint->UpdateTransports(FE_OFDM); + maint->UpdateNetwork(FE_OFDM); maint->UpdateServices(FE_OFDM); maint->UpdateProgramAssociations(FE_OFDM); maint->UpdateProgramMaps(FE_OFDM); diff --git a/p2pvr/ice/commonHelpers.h b/p2pvr/ice/commonHelpers.h index b54523e..58caa3d 100644 --- a/p2pvr/ice/commonHelpers.h +++ b/p2pvr/ice/commonHelpers.h @@ -3,6 +3,7 @@ #include <p2pvr.h> #include <ostream> +#include <iomanip> namespace Common { template<typename C, typename T> diff --git a/p2pvr/ice/p2pvr.ice b/p2pvr/ice/p2pvr.ice index bae39f8..9dac2a4 100644 --- a/p2pvr/ice/p2pvr.ice +++ b/p2pvr/ice/p2pvr.ice @@ -236,7 +236,7 @@ module P2PVR { interface Maintenance { idempotent void UpdateAll(); - idempotent void UpdateTransports(short type); + idempotent void UpdateNetwork(short type); idempotent void UpdateServices(short type); idempotent void UpdateProgramAssociations(short type); idempotent void UpdateProgramMaps(short type); diff --git a/p2pvr/lib/maintenance.cpp b/p2pvr/lib/maintenance.cpp index 6fe44b2..540b323 100644 --- a/p2pvr/lib/maintenance.cpp +++ b/p2pvr/lib/maintenance.cpp @@ -1,27 +1,10 @@ #include "maintenance.h" -#include "siParsers/network.h" -#include "siParsers/event.h" -#include "siParsers/programAssociation.h" -#include "siParsers/programMap.h" -#include "siParsers/service.h" #include <Ice/Ice.h> -#include <stdexcept> -#include <iomanip> -#include <logger.h> -#include <sqlMergeTask.h> -#include <commonObjects.h> -#include <commonHelpers.h> -#include "p2Helpers.h" -#include "dvbsiHelpers.h" -#include "containerIterator.h" -#include "mapIterator.h" -#include "singleIterator.h" -#include "temporaryIceAdapterObject.h" - #include <linux/dvb/frontend.h> +#include <sqlMergeTask.h> void -SqlMergeColumnsInserter(SqlMergeTask * merge, const std::string & name, bool key) +Maintenance::SqlMergeColumnsInserter(SqlMergeTask * merge, const std::string & name, bool key) { merge->cols.insert(new SqlMergeTask::TargetColumn(name, key)); if (key) { @@ -29,140 +12,6 @@ SqlMergeColumnsInserter(SqlMergeTask * merge, const std::string & name, bool key } } -class SiNetworkInformationMerger : public SiNetworkInformationParser { - public: - SiNetworkInformationMerger(CommonObjects * co) : commonObjects(co) { } - - void HandleTable(DVBSI::NetworkPtr n) - { - Logger()->messagebf(LOG_DEBUG, "Network Id: %d Name: %s", n->NetworkId, *n->Name); - BOOST_FOREACH(const auto & ts, n->TransportStreams) { - Logger()->messagebf(LOG_DEBUG, "\tTransport Stream Id: %d Original Network Id: %d", ts->TransportStreamId, ts->OriginalNetworkId); - BOOST_FOREACH(const auto & s, ts->Services) { - Logger()->messagebf(LOG_DEBUG, "\t\tService Id: %d Service Type: %d", s.ServiceId, s.ServiceType); - } - if (ts->Terrestrial) { - Logger()->messagebf(LOG_DEBUG, "\t\tDVB-T: Frequency: %d", ts->Terrestrial->Frequency); - } - } - - SqlMergeTask mergeNetwork("postgres", "networks"); - CreateColumns<DVBSI::NetworkPtr>(boost::bind(SqlMergeColumnsInserter, &mergeNetwork, _1, _2)); - std::vector<DVBSI::NetworkPtr> networks = { n }; - mergeNetwork.sources.insert(new ContainerIterator<std::vector<DVBSI::NetworkPtr>>(&networks)); - mergeNetwork.loadComplete(commonObjects); - mergeNetwork.execute(NULL); - - SqlMergeTask mergeTransports("postgres", "transportstreams"); - CreateColumns<DVBSI::NetworkTransportStreamPtr>(boost::bind(SqlMergeColumnsInserter, &mergeTransports, _1, _2)); - mergeTransports.sources.insert(new ContainerIterator<DVBSI::NetworkTransportStreams>(&n->TransportStreams, n)); - mergeTransports.loadComplete(commonObjects); - mergeTransports.execute(NULL); - - SqlMergeTask mergeDvbt("postgres", "delivery_dvbt"); - CreateColumns<DVBSI::TerrestrialDeliveryPtr>(boost::bind(SqlMergeColumnsInserter, &mergeDvbt, _1, _2)); - BOOST_FOREACH(const auto & s, n->TransportStreams) { - if (s->Terrestrial) { - mergeDvbt.sources.insert(new SingleIterator<DVBSI::TerrestrialDeliveryPtr>(&s->Terrestrial, s)); - } - } - mergeDvbt.loadComplete(commonObjects); - mergeDvbt.execute(NULL); - - SqlMergeTask mergeServices("postgres", "services"); - CreateColumns<DVBSI::NetworkService>(boost::bind(SqlMergeColumnsInserter, &mergeServices, _1, _2)); - BOOST_FOREACH(const auto & s, n->TransportStreams) { - mergeServices.sources.insert(new ContainerIterator<DVBSI::NetworkServiceList>(&s->Services, s)); - } - mergeServices.loadComplete(commonObjects); - mergeServices.execute(NULL); - } - private: - CommonObjects * commonObjects; -}; - -class SiProgramMapHandler : public SiProgramMapParser { - public: - SiProgramMapHandler(const RowProcessorCallback & cb) : - callBack(cb) {} - - void HandleTable(DVBSI::ProgramMapPtr pmp) - { - Logger()->messagebf(LOG_DEBUG, "Program map: serviceId = %d", pmp->ServiceId); - BOOST_FOREACH(const auto & s, pmp->Streams) { - Logger()->messagef(LOG_DEBUG, "type: %02x id: %d", s->Type, s->Id); - BindColumns<DVBSI::StreamPtr>(rowState, s, pmp); - rowState.process(callBack); - } - } - - private: - ObjectRowState<DVBSI::StreamPtr> rowState; - const RowProcessorCallback callBack; -}; - -class SiProgramAssociationHandler : public SiProgramAssociationParser { - public: - void HandleTable(ProgramAssociationMapPtr pam) - { - Logger()->messagebf(LOG_DEBUG, "Program association table"); - BOOST_FOREACH(const auto & pa, *pam) { - Logger()->messagebf(LOG_DEBUG, " %d -> %d", pa.first, pa.second); - } - BOOST_FOREACH(const auto & pa, *pam) { - map[pa.first] = pa.second; - } - } - - ProgramAssociationMap map; -}; - -class SiServicesMerger : public SiServicesParser { - public: - SiServicesMerger(CommonObjects * co) : commonObjects(co) { } - - void HandleTable(DVBSI::TransportStreamPtr ts) - { - Logger()->messagebf(LOG_DEBUG, "Transport Stream Id: %d Original Network Id: %s", ts->TransportStreamId, ts->OriginalNetworkId); - BOOST_FOREACH(const auto & s, ts->Services) { - Logger()->messagebf(LOG_DEBUG, "\tService Id: %d Name: %s Type: %d, Provider: %s, DefaultAuthority: %s, RunningStatus %d FreeCaMode %d", - s->ServiceId, (s->Name ? *s->Name : "?"), (s->Type ? *s->Type : -1), - (s->ProviderName ? *s->ProviderName : "?"), (s->DefaultAuthority ? *s->DefaultAuthority : "?"), - s->RunningStatus, s->FreeCaMode); - } - - SqlMergeTask mergeServices("postgres", "services"); - CreateColumns<DVBSI::ServicePtr>(boost::bind(SqlMergeColumnsInserter, &mergeServices, _1, _2)); - // Don't change the list of services available from the network - mergeServices.doDelete = VariableType(false); - mergeServices.doInsert = VariableType(false); - mergeServices.sources.insert(new ContainerIterator<DVBSI::ServiceList>(&ts->Services, ts)); - mergeServices.loadComplete(commonObjects); - mergeServices.execute(NULL); - } - - private: - CommonObjects * commonObjects; -}; - -class SiEventsHandler : public SiEpgParser { - public: - SiEventsHandler(const RowProcessorCallback & cb) : - callBack(cb) {} - - void HandleTable(DVBSI::EventPtr e) - { - Logger()->messagebf(LOG_DEBUG, "Service Id: %d Program Id: %d Title: %s Time: %s - %s", - e->ServiceId, e->EventId, e->Title, e->StartTime, e->StopTime); - BindColumns<DVBSI::EventPtr>(rowState, e); - rowState.process(callBack); - } - - private: - ObjectRowState<DVBSI::EventPtr> rowState; - const RowProcessorCallback callBack; -}; - void Maintenance::UpdateAll(const Ice::Current & ice) { @@ -175,306 +24,10 @@ Maintenance::UpdateAll(const Ice::Current & ice) void Maintenance::UpdateAll(short type, const Ice::Current & ice) { - UpdateTransports(type, ice); + UpdateNetwork(type, ice); UpdateServices(type, ice); + UpdateProgramAssociations(type, ice); + UpdateProgramMaps(type, ice); UpdateEvents(type, ice); } -void -Maintenance::UpdateTransports(short type, const Ice::Current & ice) -{ - auto ic = ice.adapter->getCommunicator(); - UpdateTransports(type, ic, ice.adapter); -} - -void -Maintenance::UpdateTransports(short type, Ice::CommunicatorPtr ic, Ice::ObjectAdapterPtr adapter) -{ - auto devs = P2PVR::DevicesPrx::checkedCast(adapter->createProxy(ic->stringToIdentity("GlobalDevices"))); - auto si = P2PVR::SIPrx::checkedCast(adapter->createProxy(ic->stringToIdentity("SI"))); - auto siparser = new SiNetworkInformationMerger(this); - TemporarayIceAdapterObject<P2PVR::RawDataClient> parser(adapter, siparser); - - if (!devs) { - throw std::runtime_error("bad proxy(s)"); - } - const auto transports = si->GetAllDeliveries(FE_OFDM); - // Attempt to just download fresh data - BOOST_FOREACH(const auto & transport, transports) { - P2PVR::TunerPrx tuner; - try { - tuner = devs->GetTunerAny(type, transport, time(NULL) + 300); - } - catch (...) { - Logger()->messagebf(LOG_WARNING, "%s: Failed to get a suitable tuner", __PRETTY_FUNCTION__); - continue; - } - if (!tuner) { - continue; - } - try { - tuner->SendNetworkInformation(parser); - devs->ReleaseTuner(tuner); - BOOST_FOREACH(const CommonObjects::DataSources::value_type & ds, CommonObjects::datasources) { - ds.second->commit(); - } - return; - } - catch (const std::exception & ex) { - Logger()->messagebf(LOG_WARNING, "%s: Failed to fetch network information: %s", __PRETTY_FUNCTION__, ex.what()); - devs->ReleaseTuner(tuner); - throw; - } - catch (...) { - Logger()->messagebf(LOG_WARNING, "%s: Failed to fetch network information", __PRETTY_FUNCTION__); - devs->ReleaseTuner(tuner); - throw; - } - } - // If we can't do that, do a complete scan - auto tuner = devs->GetPrivateTuner(type, time(NULL) + 300); - tuner->ScanAndSendNetworkInformation(parser); - devs->ReleaseTuner(tuner); -} - -void -Maintenance::UpdateServices(short type, const Ice::Current & ice) -{ - auto ic = ice.adapter->getCommunicator(); - auto devs = P2PVR::DevicesPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("GlobalDevices"))); - auto si = P2PVR::SIPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("SI"))); - - if (!devs || !si) { - throw std::runtime_error("bad proxy(s)"); - } - - auto siparser = new SiServicesMerger(this); - TemporarayIceAdapterObject<P2PVR::RawDataClient> parser(ice.adapter, siparser); - - const auto deliveries = si->GetAllDeliveries(type); - if (deliveries.empty()) { - throw std::runtime_error("no delivery methods"); - } - - Logger()->messagebf(LOG_DEBUG, "%s: Getting a tuner", __PRETTY_FUNCTION__); - auto tuner = devs->GetTunerAny(type, deliveries.front(), time(NULL) + 90); - Logger()->messagebf(LOG_DEBUG, "%s: Fetching service list", __PRETTY_FUNCTION__); - tuner->SendServiceDescriptions(parser); - Logger()->messagebf(LOG_INFO, "%s: Updated service list", __PRETTY_FUNCTION__); - devs->ReleaseTuner(tuner); - BOOST_FOREACH(const CommonObjects::DataSources::value_type & ds, CommonObjects::datasources) { - ds.second->commit(); - } -} - -void -CreatePMTColumns(const ColumnCreator & cc) -{ - cc("serviceId", true); - cc("programId", false); -} - -void -Maintenance::UpdateProgramAssociations(short type, const Ice::Current & ice) -{ - auto ic = ice.adapter->getCommunicator(); - auto devs = P2PVR::DevicesPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("GlobalDevices"))); - auto si = P2PVR::SIPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("SI"))); - - if (!devs || !si) { - throw std::runtime_error("bad proxy(s)"); - } - - auto siparser = new SiProgramAssociationHandler(); - TemporarayIceAdapterObject<P2PVR::RawDataClient> parser(ice.adapter, siparser); - - const auto deliveries = si->GetAllDeliveries(type); - if (deliveries.empty()) { - throw std::runtime_error("no delivery methods"); - } - - BOOST_FOREACH(const auto & transport, deliveries) { - try { - Logger()->messagebf(LOG_DEBUG, "%s: Getting a tuner", __PRETTY_FUNCTION__); - auto tuner = devs->GetTunerSpecific(transport, time(NULL) + 300); - Logger()->messagebf(LOG_DEBUG, "%s: Fetching associations", __PRETTY_FUNCTION__); - tuner->SendProgramAssociationTable(parser); - Logger()->messagebf(LOG_INFO, "%s: Updated associations", __PRETTY_FUNCTION__); - devs->ReleaseTuner(tuner); - } - catch (...) { - // Tuning can fail - } - } - - SqlMergeTask mergeServices("postgres", "services"); - CreatePMTColumns(boost::bind(SqlMergeColumnsInserter, &mergeServices, _1, _2)); - // Don't change the list of services available from the network - mergeServices.doDelete = VariableType(false); - mergeServices.doInsert = VariableType(false); - Columns cols; - mergeServices.sources.insert(new MapIterator<ProgramAssociationMap>(CreatePMTColumns, &siparser->map)); - mergeServices.loadComplete(this); - mergeServices.execute(NULL); - BOOST_FOREACH(const CommonObjects::DataSources::value_type & ds, CommonObjects::datasources) { - ds.second->commit(); - } -} - -#include <rdbmsDataSource.h> -#include <column.h> -#include <selectcommand.h> -typedef boost::shared_ptr<DB::SelectCommand> SelectPtr; -class HandleAsInt : public DB::HandleField { - public: - virtual void null() { } - virtual void string(const char *, size_t ) { } - virtual void integer(int64_t v) { integerValue = v; } - virtual void floatingpoint(double) { } - virtual void timestamp(const struct tm &) { } - int64_t integerValue; -}; -class SiProgramMapMerger : public IHaveSubTasks { - public: - SiProgramMapMerger(short t, CommonObjects * co, const Ice::Current & i) : - SourceObject(__PRETTY_FUNCTION__), - IHaveSubTasks(NULL), - commonObjects(co), - type(t), - ice(i) { } - - void execute(ExecContext * ec) const - { - auto ic = ice.adapter->getCommunicator(); - auto devs = P2PVR::DevicesPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("GlobalDevices"))); - auto si = P2PVR::SIPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("SI"))); - - if (!devs || !si) { - throw std::runtime_error("bad proxy(s)"); - } - - TemporarayIceAdapterObject<P2PVR::RawDataClient> parser(ice.adapter, - new SiProgramMapHandler(boost::bind(&SiProgramMapMerger::executeChildren, this, ec))); - - const auto deliveries = si->GetAllDeliveries(type); - if (deliveries.empty()) { - throw std::runtime_error("no delivery methods"); - } - - auto db = commonObjects->dataSource<RdbmsDataSource>("postgres"); - SelectPtr sel = SelectPtr(db->getReadonly().newSelectCommand("select d.frequency, s.programid \ - from delivery_dvbt d, services s \ - where d.transportstreamid = s.transportstreamid \ - and s.programid is not null \ - order by s.transportstreamid, s.serviceid")); - int64_t curFreq = 0; - P2PVR::TunerPrx tuner; - while (sel->fetch()) { - HandleAsInt freq, pid; - (*sel)[0].apply(freq); - (*sel)[1].apply(pid); - - if (freq.integerValue != curFreq) { - if (tuner) { - devs->ReleaseTuner(tuner); - } - Logger()->messagebf(LOG_DEBUG, "%s: Getting a tuner", __PRETTY_FUNCTION__); - const auto transport = *std::find_if(deliveries.begin(), deliveries.end(), - [freq](const DVBSI::DeliveryPtr & del) { return del->Frequency == freq.integerValue; }); - tuner = devs->GetTunerSpecific(transport, time(NULL) + 10); - curFreq = freq.integerValue; - } - - Logger()->messagebf(LOG_DEBUG, "%s: Fetching associations", __PRETTY_FUNCTION__); - tuner->SendProgramMap(pid.integerValue, parser); - Logger()->messagebf(LOG_INFO, "%s: Updated associations", __PRETTY_FUNCTION__); - } - if (tuner) { - devs->ReleaseTuner(tuner); - } - } - - private: - CommonObjects * commonObjects; - const short type; - const Ice::Current & ice; - - void executeChildren(ExecContext * ec) const - { - BOOST_FOREACH(const Tasks::value_type & sq, normal) { - sq->execute(ec); - } - } -}; - -void -Maintenance::UpdateProgramMaps(short type, const Ice::Current & ice) -{ - SqlMergeTask mergeServiceStreams("postgres", "servicestreams"); - CreateColumns<DVBSI::StreamPtr>(boost::bind(SqlMergeColumnsInserter, &mergeServiceStreams, _1, _2)); - mergeServiceStreams.sources.insert(new SiProgramMapMerger(type, this, ice)); - mergeServiceStreams.loadComplete(this); - mergeServiceStreams.execute(NULL); - BOOST_FOREACH(const CommonObjects::DataSources::value_type & ds, CommonObjects::datasources) { - ds.second->commit(); - } -} - -class SiEventsMerger : public IHaveSubTasks { - public: - SiEventsMerger(short t, const Ice::Current & i) : - SourceObject(__PRETTY_FUNCTION__), - IHaveSubTasks(NULL), - type(t), - ice(i) { } - - void execute(ExecContext * ec) const - { - auto ic = ice.adapter->getCommunicator(); - auto devs = P2PVR::DevicesPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("GlobalDevices"))); - auto si = P2PVR::SIPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("SI"))); - - if (!devs || !si) { - throw std::runtime_error("bad proxy(s)"); - } - - TemporarayIceAdapterObject<P2PVR::RawDataClient> parser(ice.adapter, - new SiEventsHandler(boost::bind(&SiEventsMerger::executeChildren, this, ec))); - - const auto deliveries = si->GetAllDeliveries(type); - if (deliveries.empty()) { - throw std::runtime_error("no delivery methods"); - } - - Logger()->messagebf(LOG_DEBUG, "%s: Getting a tuner", __PRETTY_FUNCTION__); - auto tuner = devs->GetTunerAny(type, deliveries.front(), time(NULL) + 600); - Logger()->messagebf(LOG_DEBUG, "%s: Fetching events", __PRETTY_FUNCTION__); - tuner->SendEventInformation(parser); - devs->ReleaseTuner(tuner); - } - - private: - const short type; - const Ice::Current & ice; - - void executeChildren(ExecContext * ec) const - { - BOOST_FOREACH(const Tasks::value_type & sq, normal) { - sq->execute(ec); - } - } -}; - -void -Maintenance::UpdateEvents(short type, const Ice::Current & ice) -{ - SqlMergeTask mergeEvents("postgres", "events"); - CreateColumns<DVBSI::EventPtr>(boost::bind(SqlMergeColumnsInserter, &mergeEvents, _1, _2)); - mergeEvents.sources.insert(new SiEventsMerger(type, ice)); - mergeEvents.loadComplete(this); - mergeEvents.execute(NULL); - BOOST_FOREACH(const CommonObjects::DataSources::value_type & ds, CommonObjects::datasources) { - ds.second->commit(); - } - Logger()->messagebf(LOG_INFO, "%s: Updated events", __PRETTY_FUNCTION__); -} diff --git a/p2pvr/lib/maintenance.h b/p2pvr/lib/maintenance.h index acffb72..b1965d1 100644 --- a/p2pvr/lib/maintenance.h +++ b/p2pvr/lib/maintenance.h @@ -4,16 +4,19 @@ #include <p2pvr.h> #include <commonObjects.h> +class SqlMergeTask; + class Maintenance : public P2PVR::Maintenance, public virtual CommonObjects { public: void UpdateAll(const Ice::Current &); void UpdateAll(short type, const Ice::Current &); - void UpdateTransports(short type, const Ice::Current &); - void UpdateTransports(short type, Ice::CommunicatorPtr, Ice::ObjectAdapterPtr); + void UpdateNetwork(short type, const Ice::Current &); void UpdateServices(short type, const Ice::Current &); void UpdateProgramAssociations(short type, const Ice::Current &); void UpdateProgramMaps(short type, const Ice::Current &); void UpdateEvents(short type, const Ice::Current &); + + static void SqlMergeColumnsInserter(SqlMergeTask * merge, const std::string & name, bool key); }; #endif diff --git a/p2pvr/lib/maintenance/events.cpp b/p2pvr/lib/maintenance/events.cpp new file mode 100644 index 0000000..7edac72 --- /dev/null +++ b/p2pvr/lib/maintenance/events.cpp @@ -0,0 +1,88 @@ +#include "../maintenance.h" +#include "../siParsers/event.h" +#include <sqlMergeTask.h> +#include <Ice/Communicator.h> +#include "../p2Helpers.h" +#include "../dvbsiHelpers.h" +#include "../containerIterator.h" +#include "../singleIterator.h" +#include "../temporaryIceAdapterObject.h" +#include <commonHelpers.h> + +class SiEventsHandler : public SiEpgParser { + public: + SiEventsHandler(const RowProcessorCallback & cb) : + callBack(cb) {} + + void HandleTable(DVBSI::EventPtr e) + { + Logger()->messagebf(LOG_DEBUG, "Service Id: %d Program Id: %d Title: %s Time: %s - %s", + e->ServiceId, e->EventId, e->Title, e->StartTime, e->StopTime); + BindColumns<DVBSI::EventPtr>(rowState, e); + rowState.process(callBack); + } + + private: + ObjectRowState<DVBSI::EventPtr> rowState; + const RowProcessorCallback callBack; +}; + +class SiEventsMerger : public IHaveSubTasks { + public: + SiEventsMerger(short t, const Ice::Current & i) : + SourceObject(__PRETTY_FUNCTION__), + IHaveSubTasks(NULL), + type(t), + ice(i) { } + + void execute(ExecContext * ec) const + { + auto ic = ice.adapter->getCommunicator(); + auto devs = P2PVR::DevicesPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("GlobalDevices"))); + auto si = P2PVR::SIPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("SI"))); + + if (!devs || !si) { + throw std::runtime_error("bad proxy(s)"); + } + + TemporarayIceAdapterObject<P2PVR::RawDataClient> parser(ice.adapter, + new SiEventsHandler(boost::bind(&SiEventsMerger::executeChildren, this, ec))); + + const auto deliveries = si->GetAllDeliveries(type); + if (deliveries.empty()) { + throw std::runtime_error("no delivery methods"); + } + + Logger()->messagebf(LOG_DEBUG, "%s: Getting a tuner", __PRETTY_FUNCTION__); + auto tuner = devs->GetTunerAny(type, deliveries.front(), time(NULL) + 600); + Logger()->messagebf(LOG_DEBUG, "%s: Fetching events", __PRETTY_FUNCTION__); + tuner->SendEventInformation(parser); + devs->ReleaseTuner(tuner); + } + + private: + const short type; + const Ice::Current & ice; + + void executeChildren(ExecContext * ec) const + { + BOOST_FOREACH(const Tasks::value_type & sq, normal) { + sq->execute(ec); + } + } +}; + +void +Maintenance::UpdateEvents(short type, const Ice::Current & ice) +{ + SqlMergeTask mergeEvents("postgres", "events"); + CreateColumns<DVBSI::EventPtr>(boost::bind(SqlMergeColumnsInserter, &mergeEvents, _1, _2)); + mergeEvents.sources.insert(new SiEventsMerger(type, ice)); + mergeEvents.loadComplete(this); + mergeEvents.execute(NULL); + BOOST_FOREACH(const CommonObjects::DataSources::value_type & ds, CommonObjects::datasources) { + ds.second->commit(); + } + Logger()->messagebf(LOG_INFO, "%s: Updated events", __PRETTY_FUNCTION__); +} + diff --git a/p2pvr/lib/maintenance/network.cpp b/p2pvr/lib/maintenance/network.cpp new file mode 100644 index 0000000..6b0ebc6 --- /dev/null +++ b/p2pvr/lib/maintenance/network.cpp @@ -0,0 +1,113 @@ +#include "../maintenance.h" +#include "../siParsers/network.h" +#include <sqlMergeTask.h> +#include <Ice/Communicator.h> +#include "../p2Helpers.h" +#include "../dvbsiHelpers.h" +#include "../containerIterator.h" +#include "../singleIterator.h" +#include "../temporaryIceAdapterObject.h" + +class SiNetworkInformationMerger : public SiNetworkInformationParser { + public: + SiNetworkInformationMerger(CommonObjects * co) : commonObjects(co) { } + + void HandleTable(DVBSI::NetworkPtr n) + { + Logger()->messagebf(LOG_DEBUG, "Network Id: %d Name: %s", n->NetworkId, *n->Name); + BOOST_FOREACH(const auto & ts, n->TransportStreams) { + Logger()->messagebf(LOG_DEBUG, "\tTransport Stream Id: %d Original Network Id: %d", ts->TransportStreamId, ts->OriginalNetworkId); + BOOST_FOREACH(const auto & s, ts->Services) { + Logger()->messagebf(LOG_DEBUG, "\t\tService Id: %d Service Type: %d", s.ServiceId, s.ServiceType); + } + if (ts->Terrestrial) { + Logger()->messagebf(LOG_DEBUG, "\t\tDVB-T: Frequency: %d", ts->Terrestrial->Frequency); + } + } + + SqlMergeTask mergeNetwork("postgres", "networks"); + CreateColumns<DVBSI::NetworkPtr>(boost::bind(&Maintenance::SqlMergeColumnsInserter, &mergeNetwork, _1, _2)); + std::vector<DVBSI::NetworkPtr> networks = { n }; + mergeNetwork.sources.insert(new ContainerIterator<std::vector<DVBSI::NetworkPtr>>(&networks)); + mergeNetwork.loadComplete(commonObjects); + mergeNetwork.execute(NULL); + + SqlMergeTask mergeTransports("postgres", "transportstreams"); + CreateColumns<DVBSI::NetworkTransportStreamPtr>(boost::bind(&Maintenance::SqlMergeColumnsInserter, &mergeTransports, _1, _2)); + mergeTransports.sources.insert(new ContainerIterator<DVBSI::NetworkTransportStreams>(&n->TransportStreams, n)); + mergeTransports.loadComplete(commonObjects); + mergeTransports.execute(NULL); + + SqlMergeTask mergeDvbt("postgres", "delivery_dvbt"); + CreateColumns<DVBSI::TerrestrialDeliveryPtr>(boost::bind(&Maintenance::SqlMergeColumnsInserter, &mergeDvbt, _1, _2)); + BOOST_FOREACH(const auto & s, n->TransportStreams) { + if (s->Terrestrial) { + mergeDvbt.sources.insert(new SingleIterator<DVBSI::TerrestrialDeliveryPtr>(&s->Terrestrial, s)); + } + } + mergeDvbt.loadComplete(commonObjects); + mergeDvbt.execute(NULL); + + SqlMergeTask mergeServices("postgres", "services"); + CreateColumns<DVBSI::NetworkService>(boost::bind(&Maintenance::SqlMergeColumnsInserter, &mergeServices, _1, _2)); + BOOST_FOREACH(const auto & s, n->TransportStreams) { + mergeServices.sources.insert(new ContainerIterator<DVBSI::NetworkServiceList>(&s->Services, s)); + } + mergeServices.loadComplete(commonObjects); + mergeServices.execute(NULL); + } + private: + CommonObjects * commonObjects; +}; + +void +Maintenance::UpdateNetwork(short type, const Ice::Current & ice) +{ + auto ic = ice.adapter->getCommunicator(); + auto devs = P2PVR::DevicesPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("GlobalDevices"))); + auto si = P2PVR::SIPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("SI"))); + auto siparser = new SiNetworkInformationMerger(this); + TemporarayIceAdapterObject<P2PVR::RawDataClient> parser(ice.adapter, siparser); + + if (!devs) { + throw std::runtime_error("bad proxy(s)"); + } + const auto transports = si->GetAllDeliveries(type); + // Attempt to just download fresh data + BOOST_FOREACH(const auto & transport, transports) { + P2PVR::TunerPrx tuner; + try { + tuner = devs->GetTunerAny(type, transport, time(NULL) + 300); + } + catch (...) { + Logger()->messagebf(LOG_WARNING, "%s: Failed to get a suitable tuner", __PRETTY_FUNCTION__); + continue; + } + if (!tuner) { + continue; + } + try { + tuner->SendNetworkInformation(parser); + devs->ReleaseTuner(tuner); + BOOST_FOREACH(const CommonObjects::DataSources::value_type & ds, CommonObjects::datasources) { + ds.second->commit(); + } + return; + } + catch (const std::exception & ex) { + Logger()->messagebf(LOG_WARNING, "%s: Failed to fetch network information: %s", __PRETTY_FUNCTION__, ex.what()); + devs->ReleaseTuner(tuner); + throw; + } + catch (...) { + Logger()->messagebf(LOG_WARNING, "%s: Failed to fetch network information", __PRETTY_FUNCTION__); + devs->ReleaseTuner(tuner); + throw; + } + } + // If we can't do that, do a complete scan + auto tuner = devs->GetPrivateTuner(type, time(NULL) + 300); + tuner->ScanAndSendNetworkInformation(parser); + devs->ReleaseTuner(tuner); +} + diff --git a/p2pvr/lib/maintenance/programAssociations.cpp b/p2pvr/lib/maintenance/programAssociations.cpp new file mode 100644 index 0000000..4d32a86 --- /dev/null +++ b/p2pvr/lib/maintenance/programAssociations.cpp @@ -0,0 +1,80 @@ +#include "../maintenance.h" +#include "../siParsers/programAssociation.h" +#include <sqlMergeTask.h> +#include <Ice/Communicator.h> +#include "../p2Helpers.h" +#include "../dvbsiHelpers.h" +#include "../mapIterator.h" +#include "../temporaryIceAdapterObject.h" + +class SiProgramAssociationHandler : public SiProgramAssociationParser { + public: + void HandleTable(ProgramAssociationMapPtr pam) + { + Logger()->messagebf(LOG_DEBUG, "Program association table"); + BOOST_FOREACH(const auto & pa, *pam) { + Logger()->messagebf(LOG_DEBUG, " %d -> %d", pa.first, pa.second); + } + BOOST_FOREACH(const auto & pa, *pam) { + map[pa.first] = pa.second; + } + } + + ProgramAssociationMap map; +}; + +static +void +CreatePATColumns(const ColumnCreator & cc) +{ + cc("serviceId", true); + cc("programId", false); +} + +void +Maintenance::UpdateProgramAssociations(short type, const Ice::Current & ice) +{ + auto ic = ice.adapter->getCommunicator(); + auto devs = P2PVR::DevicesPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("GlobalDevices"))); + auto si = P2PVR::SIPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("SI"))); + + if (!devs || !si) { + throw std::runtime_error("bad proxy(s)"); + } + + auto siparser = new SiProgramAssociationHandler(); + TemporarayIceAdapterObject<P2PVR::RawDataClient> parser(ice.adapter, siparser); + + const auto deliveries = si->GetAllDeliveries(type); + if (deliveries.empty()) { + throw std::runtime_error("no delivery methods"); + } + + BOOST_FOREACH(const auto & transport, deliveries) { + try { + Logger()->messagebf(LOG_DEBUG, "%s: Getting a tuner", __PRETTY_FUNCTION__); + auto tuner = devs->GetTunerSpecific(transport, time(NULL) + 300); + Logger()->messagebf(LOG_DEBUG, "%s: Fetching associations", __PRETTY_FUNCTION__); + tuner->SendProgramAssociationTable(parser); + Logger()->messagebf(LOG_INFO, "%s: Updated associations", __PRETTY_FUNCTION__); + devs->ReleaseTuner(tuner); + } + catch (...) { + // Tuning can fail + } + } + + SqlMergeTask mergeServices("postgres", "services"); + CreatePATColumns(boost::bind(SqlMergeColumnsInserter, &mergeServices, _1, _2)); + // Don't change the list of services available from the network + mergeServices.doDelete = VariableType(false); + mergeServices.doInsert = VariableType(false); + Columns cols; + mergeServices.sources.insert(new MapIterator<ProgramAssociationMap>(CreatePATColumns, &siparser->map)); + mergeServices.loadComplete(this); + mergeServices.execute(NULL); + BOOST_FOREACH(const CommonObjects::DataSources::value_type & ds, CommonObjects::datasources) { + ds.second->commit(); + } +} + diff --git a/p2pvr/lib/maintenance/programMap.cpp b/p2pvr/lib/maintenance/programMap.cpp new file mode 100644 index 0000000..f230f77 --- /dev/null +++ b/p2pvr/lib/maintenance/programMap.cpp @@ -0,0 +1,133 @@ +#include "../maintenance.h" +#include "../siParsers/programMap.h" +#include <sqlMergeTask.h> +#include <Ice/Communicator.h> +#include "../p2Helpers.h" +#include "../dvbsiHelpers.h" +#include "../containerIterator.h" +#include "../singleIterator.h" +#include "../temporaryIceAdapterObject.h" +#include <rdbmsDataSource.h> +#include <column.h> +#include <selectcommand.h> +#include <sqlHandleAsVariableType.h> + +class SiProgramMapHandler : public SiProgramMapParser { + public: + SiProgramMapHandler(const RowProcessorCallback & cb) : + callBack(cb) {} + + void HandleTable(DVBSI::ProgramMapPtr pmp) + { + Logger()->messagebf(LOG_DEBUG, "Program map: serviceId = %d", pmp->ServiceId); + BOOST_FOREACH(const auto & s, pmp->Streams) { + Logger()->messagef(LOG_DEBUG, "type: %02x id: %d", s->Type, s->Id); + } + BOOST_FOREACH(const auto & s, pmp->Streams) { + BindColumns<DVBSI::StreamPtr>(rowState, s, pmp); + rowState.process(callBack); + } + } + + private: + ObjectRowState<DVBSI::StreamPtr> rowState; + const RowProcessorCallback callBack; +}; + +typedef boost::shared_ptr<DB::SelectCommand> SelectPtr; + +template<typename T> +void +operator<<(T & val, const DB::Column & col) +{ + HandleAsVariableType havt; + col.apply(havt); + val = havt.variable; +} + +class SiProgramMapMerger : public IHaveSubTasks { + public: + SiProgramMapMerger(short t, CommonObjects * co, const Ice::Current & i) : + SourceObject(__PRETTY_FUNCTION__), + IHaveSubTasks(NULL), + commonObjects(co), + type(t), + ice(i) { } + + void execute(ExecContext * ec) const + { + auto ic = ice.adapter->getCommunicator(); + auto devs = P2PVR::DevicesPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("GlobalDevices"))); + auto si = P2PVR::SIPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("SI"))); + + if (!devs || !si) { + throw std::runtime_error("bad proxy(s)"); + } + + TemporarayIceAdapterObject<P2PVR::RawDataClient> parser(ice.adapter, + new SiProgramMapHandler(boost::bind(&SiProgramMapMerger::executeChildren, this, ec))); + + const auto deliveries = si->GetAllDeliveries(type); + if (deliveries.empty()) { + throw std::runtime_error("no delivery methods"); + } + + auto db = commonObjects->dataSource<RdbmsDataSource>("postgres"); + SelectPtr sel = SelectPtr(db->getReadonly().newSelectCommand("select d.frequency, s.programid \ + from delivery_dvbt d, services s \ + where d.transportstreamid = s.transportstreamid \ + and s.programid is not null \ + order by s.transportstreamid, s.serviceid")); + int64_t curFreq = 0; + P2PVR::TunerPrx tuner; + while (sel->fetch()) { + int64_t freq, pid; + freq << (*sel)[0]; + pid << (*sel)[1]; + + if (freq != curFreq) { + if (tuner) { + devs->ReleaseTuner(tuner); + } + Logger()->messagebf(LOG_DEBUG, "%s: Getting a tuner", __PRETTY_FUNCTION__); + const auto transport = *std::find_if(deliveries.begin(), deliveries.end(), + [freq](const DVBSI::DeliveryPtr & del) { return del->Frequency == freq; }); + tuner = devs->GetTunerSpecific(transport, time(NULL) + 10); + curFreq = freq; + } + + Logger()->messagebf(LOG_DEBUG, "%s: Fetching associations", __PRETTY_FUNCTION__); + tuner->SendProgramMap(pid, parser); + Logger()->messagebf(LOG_INFO, "%s: Updated associations", __PRETTY_FUNCTION__); + } + if (tuner) { + devs->ReleaseTuner(tuner); + } + } + + private: + CommonObjects * commonObjects; + const short type; + const Ice::Current & ice; + + void executeChildren(ExecContext * ec) const + { + BOOST_FOREACH(const Tasks::value_type & sq, normal) { + sq->execute(ec); + } + } +}; + +void +Maintenance::UpdateProgramMaps(short type, const Ice::Current & ice) +{ + SqlMergeTask mergeServiceStreams("postgres", "servicestreams"); + CreateColumns<DVBSI::StreamPtr>(boost::bind(SqlMergeColumnsInserter, &mergeServiceStreams, _1, _2)); + mergeServiceStreams.sources.insert(new SiProgramMapMerger(type, this, ice)); + mergeServiceStreams.loadComplete(this); + mergeServiceStreams.execute(NULL); + BOOST_FOREACH(const CommonObjects::DataSources::value_type & ds, CommonObjects::datasources) { + ds.second->commit(); + } +} + diff --git a/p2pvr/lib/maintenance/services.cpp b/p2pvr/lib/maintenance/services.cpp new file mode 100644 index 0000000..30df800 --- /dev/null +++ b/p2pvr/lib/maintenance/services.cpp @@ -0,0 +1,68 @@ +#include "../maintenance.h" +#include "../siParsers/service.h" +#include <sqlMergeTask.h> +#include <Ice/Communicator.h> +#include "../p2Helpers.h" +#include "../dvbsiHelpers.h" +#include "../containerIterator.h" +#include "../singleIterator.h" +#include "../temporaryIceAdapterObject.h" + +class SiServicesMerger : public SiServicesParser { + public: + SiServicesMerger(CommonObjects * co) : commonObjects(co) { } + + void HandleTable(DVBSI::TransportStreamPtr ts) + { + Logger()->messagebf(LOG_DEBUG, "Transport Stream Id: %d Original Network Id: %s", ts->TransportStreamId, ts->OriginalNetworkId); + BOOST_FOREACH(const auto & s, ts->Services) { + Logger()->messagebf(LOG_DEBUG, "\tService Id: %d Name: %s Type: %d, Provider: %s, DefaultAuthority: %s, RunningStatus %d FreeCaMode %d", + s->ServiceId, (s->Name ? *s->Name : "?"), (s->Type ? *s->Type : -1), + (s->ProviderName ? *s->ProviderName : "?"), (s->DefaultAuthority ? *s->DefaultAuthority : "?"), + s->RunningStatus, s->FreeCaMode); + } + + SqlMergeTask mergeServices("postgres", "services"); + CreateColumns<DVBSI::ServicePtr>(boost::bind(&Maintenance::SqlMergeColumnsInserter, &mergeServices, _1, _2)); + // Don't change the list of services available from the network + mergeServices.doDelete = VariableType(false); + mergeServices.doInsert = VariableType(false); + mergeServices.sources.insert(new ContainerIterator<DVBSI::ServiceList>(&ts->Services, ts)); + mergeServices.loadComplete(commonObjects); + mergeServices.execute(NULL); + } + + private: + CommonObjects * commonObjects; +}; + +void +Maintenance::UpdateServices(short type, const Ice::Current & ice) +{ + auto ic = ice.adapter->getCommunicator(); + auto devs = P2PVR::DevicesPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("GlobalDevices"))); + auto si = P2PVR::SIPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("SI"))); + + if (!devs || !si) { + throw std::runtime_error("bad proxy(s)"); + } + + auto siparser = new SiServicesMerger(this); + TemporarayIceAdapterObject<P2PVR::RawDataClient> parser(ice.adapter, siparser); + + const auto deliveries = si->GetAllDeliveries(type); + if (deliveries.empty()) { + throw std::runtime_error("no delivery methods"); + } + + Logger()->messagebf(LOG_DEBUG, "%s: Getting a tuner", __PRETTY_FUNCTION__); + auto tuner = devs->GetTunerAny(type, deliveries.front(), time(NULL) + 90); + Logger()->messagebf(LOG_DEBUG, "%s: Fetching service list", __PRETTY_FUNCTION__); + tuner->SendServiceDescriptions(parser); + Logger()->messagebf(LOG_INFO, "%s: Updated service list", __PRETTY_FUNCTION__); + devs->ReleaseTuner(tuner); + BOOST_FOREACH(const CommonObjects::DataSources::value_type & ds, CommonObjects::datasources) { + ds.second->commit(); + } +} + diff --git a/p2pvr/lib/temporaryIceAdapterObject.h b/p2pvr/lib/temporaryIceAdapterObject.h index db4b340..e8b83d8 100644 --- a/p2pvr/lib/temporaryIceAdapterObject.h +++ b/p2pvr/lib/temporaryIceAdapterObject.h @@ -1,6 +1,8 @@ #ifndef TEMPORARYICEADAPTER_H #define TEMPORARYICEADAPTER_H +#include <Ice/ObjectAdapter.h> + template <typename Object> class TemporarayIceAdapterObject { public: |