diff options
author | randomdan <randomdan@localhost> | 2014-03-13 19:42:07 +0000 |
---|---|---|
committer | randomdan <randomdan@localhost> | 2014-03-13 19:42:07 +0000 |
commit | ab1eee942e75874739ce5f0b4ba289aac5cc3faf (patch) | |
tree | 6e43828794fe0c0c5c9921ec1911695b67357c50 /p2pvr/daemon/maintenance | |
parent | Expose more of the interface (diff) | |
download | p2pvr-ab1eee942e75874739ce5f0b4ba289aac5cc3faf.tar.bz2 p2pvr-ab1eee942e75874739ce5f0b4ba289aac5cc3faf.tar.xz p2pvr-ab1eee942e75874739ce5f0b4ba289aac5cc3faf.zip |
Restructure into more sensibly arranged libs
Diffstat (limited to 'p2pvr/daemon/maintenance')
-rw-r--r-- | p2pvr/daemon/maintenance/events.cpp | 92 | ||||
-rw-r--r-- | p2pvr/daemon/maintenance/network.cpp | 107 | ||||
-rw-r--r-- | p2pvr/daemon/maintenance/programAssociations.cpp | 80 | ||||
-rw-r--r-- | p2pvr/daemon/maintenance/programMap.cpp | 133 | ||||
-rw-r--r-- | p2pvr/daemon/maintenance/services.cpp | 68 |
5 files changed, 480 insertions, 0 deletions
diff --git a/p2pvr/daemon/maintenance/events.cpp b/p2pvr/daemon/maintenance/events.cpp new file mode 100644 index 0000000..b0c8b74 --- /dev/null +++ b/p2pvr/daemon/maintenance/events.cpp @@ -0,0 +1,92 @@ +#include <pch.hpp> +#include "../maintenance.h" +#include <siParsers/event.h> +#include <sqlMergeTask.h> +#include <Ice/Communicator.h> +#include <p2Helpers.h> +#include <dvbsiHelpers.h> +#include <containerIterator.h> +#include <singleIterator.h> +#include <temporaryIceAdapterObject.h> +#include <commonHelpers.h> + +class SiEventsHandler : public SiEpgParser { + public: + SiEventsHandler(const RowProcessorCallback & cb) : + callBack(cb) {} + + void HandleTable(DVBSI::EventPtr e) + { + Logger()->messagebf(LOG_DEBUG, "Service Id: %d Program Id: %d Title: %s Time: %s - %s", + e->ServiceId, e->EventId, e->Title, e->StartTime, e->StopTime); + BindColumns<DVBSI::EventPtr>(rowState, e); + rowState.process(callBack); + } + + private: + ObjectRowState<DVBSI::EventPtr> rowState; + const RowProcessorCallback callBack; +}; + +class SiEventsMerger : public IHaveSubTasks { + public: + SiEventsMerger(short t, const Ice::Current & i) : + SourceObject(__PRETTY_FUNCTION__), + IHaveSubTasks(NULL), + type(t), + ice(i) { } + + void execute(ExecContext * ec) const + { + auto ic = ice.adapter->getCommunicator(); + auto devs = P2PVR::DevicesPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("GlobalDevices"))); + auto si = P2PVR::SIPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("SI"))); + + if (!devs || !si) { + throw std::runtime_error("bad proxy(s)"); + } + + TemporarayIceAdapterObject<P2PVR::RawDataClient> parser(ice.adapter, + new SiEventsHandler(boost::bind(&SiEventsMerger::executeChildren, this, ec))); + + auto delivery = si->GetDeliveryForSi(); + if (!delivery) { + throw std::runtime_error("no delivery methods"); + } + + Logger()->messagebf(LOG_DEBUG, "%s: Getting a tuner", __PRETTY_FUNCTION__); + auto tuner = devs->GetTunerAny(type, delivery); + Logger()->messagebf(LOG_DEBUG, "%s: Fetching events", __PRETTY_FUNCTION__); + tuner->SendEventInformation(parser); + devs->ReleaseTuner(tuner); + } + + private: + const short type; + const Ice::Current & ice; + + void executeChildren(ExecContext * ec) const + { + BOOST_FOREACH(const Tasks::value_type & sq, normal) { + sq->execute(ec); + } + } +}; + +void +Maintenance::UpdateEvents(short type, const Ice::Current & ice) +{ + TxHelper tx(this); + SqlMergeTask mergeEvents("postgres", "events"); + CreateColumns<DVBSI::EventPtr>(boost::bind(SqlMergeColumnsInserter, &mergeEvents, _1, _2)); + mergeEvents.sources.insert(new SiEventsMerger(type, ice)); + mergeEvents.loadComplete(this); + mergeEvents.execute(NULL); + tx.Commit(); + Logger()->messagebf(LOG_INFO, "%s: Updated events", __PRETTY_FUNCTION__); + + auto ic = ice.adapter->getCommunicator(); + auto sch = P2PVR::SchedulesPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("Schedules"))); + sch->DoReschedule(); +} + diff --git a/p2pvr/daemon/maintenance/network.cpp b/p2pvr/daemon/maintenance/network.cpp new file mode 100644 index 0000000..d9ce0ec --- /dev/null +++ b/p2pvr/daemon/maintenance/network.cpp @@ -0,0 +1,107 @@ +#include <pch.hpp> +#include "../maintenance.h" +#include <siParsers/network.h> +#include <sqlMergeTask.h> +#include <Ice/Communicator.h> +#include <p2Helpers.h> +#include <dvbsiHelpers.h> +#include <containerIterator.h> +#include <singleIterator.h> +#include <temporaryIceAdapterObject.h> + +class SiNetworkInformationMerger : public SiNetworkInformationParser { + public: + SiNetworkInformationMerger(DatabaseClient * co) : commonObjects(co) { } + + bool HandleTable(DVBSI::NetworkPtr n) + { + Logger()->messagebf(LOG_DEBUG, "Network Id: %d Name: %s", n->NetworkId, *n->Name); + BOOST_FOREACH(const auto & ts, n->TransportStreams) { + Logger()->messagebf(LOG_DEBUG, "\tTransport Stream Id: %d Original Network Id: %d", ts->TransportStreamId, ts->OriginalNetworkId); + BOOST_FOREACH(const auto & s, ts->Services) { + Logger()->messagebf(LOG_DEBUG, "\t\tService Id: %d Service Type: %d", s.ServiceId, s.ServiceType); + } + if (ts->Terrestrial) { + Logger()->messagebf(LOG_DEBUG, "\t\tDVB-T: Frequency: %d", ts->Terrestrial->Frequency); + } + } + + DatabaseClient::TxHelper tx(commonObjects); + SqlMergeTask mergeNetwork("postgres", "networks"); + CreateColumns<DVBSI::NetworkPtr>(boost::bind(&DatabaseClient::SqlMergeColumnsInserter, &mergeNetwork, _1, _2)); + std::vector<DVBSI::NetworkPtr> networks = { n }; + mergeNetwork.sources.insert(new ContainerIterator<std::vector<DVBSI::NetworkPtr>>(&networks)); + mergeNetwork.loadComplete(commonObjects); + mergeNetwork.execute(NULL); + + SqlMergeTask mergeTransports("postgres", "transportstreams"); + CreateColumns<DVBSI::NetworkTransportStreamPtr>(boost::bind(&DatabaseClient::SqlMergeColumnsInserter, &mergeTransports, _1, _2)); + mergeTransports.sources.insert(new ContainerIterator<DVBSI::NetworkTransportStreams>(&n->TransportStreams)); + mergeTransports.loadComplete(commonObjects); + mergeTransports.execute(NULL); + + SqlMergeTask mergeDvbt("postgres", "delivery_dvbt"); + CreateColumns<DVBSI::TerrestrialDeliveryPtr>(boost::bind(&DatabaseClient::SqlMergeColumnsInserter, &mergeDvbt, _1, _2)); + BOOST_FOREACH(const auto & s, n->TransportStreams) { + if (s->Terrestrial) { + mergeDvbt.sources.insert(new SingleIterator<DVBSI::TerrestrialDeliveryPtr>(&s->Terrestrial)); + } + } + mergeDvbt.loadComplete(commonObjects); + mergeDvbt.execute(NULL); + + SqlMergeTask mergeServices("postgres", "services"); + CreateColumns<DVBSI::NetworkService>(boost::bind(&DatabaseClient::SqlMergeColumnsInserter, &mergeServices, _1, _2)); + BOOST_FOREACH(const auto & s, n->TransportStreams) { + mergeServices.sources.insert(new ContainerIterator<DVBSI::NetworkServiceList>(&s->Services)); + } + mergeServices.loadComplete(commonObjects); + mergeServices.execute(NULL); + return false; + } + private: + DatabaseClient * commonObjects; +}; + +void +Maintenance::UpdateNetwork(short type, const Ice::Current & ice) +{ + auto ic = ice.adapter->getCommunicator(); + auto devs = P2PVR::DevicesPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("GlobalDevices"))); + auto si = P2PVR::SIPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("SI"))); + auto siparser = new SiNetworkInformationMerger(this); + TemporarayIceAdapterObject<P2PVR::RawDataClient> parser(ice.adapter, siparser); + + if (!devs) { + throw std::runtime_error("bad proxy(s)"); + } + auto transport = si->GetDeliveryForSi(); + if (transport) { + P2PVR::TunerPrx tuner; + try { + tuner = devs->GetTunerAny(type, transport); + tuner->SendNetworkInformation(parser); + devs->ReleaseTuner(tuner); + return; + } + catch (const P2PVR::NoSuitableDeviceAvailable &) { + Logger()->messagebf(LOG_WARNING, "%s: Failed to get a suitable tuner", __PRETTY_FUNCTION__); + throw; + } + catch (const std::exception & ex) { + Logger()->messagebf(LOG_WARNING, "%s: Failed to fetch network information: %s", __PRETTY_FUNCTION__, ex.what()); + devs->ReleaseTuner(tuner); + throw; + } + catch (...) { + Logger()->messagebf(LOG_WARNING, "%s: Failed to fetch network information", __PRETTY_FUNCTION__); + devs->ReleaseTuner(tuner); + throw; + } + } + // If we can't do that, do a complete scan + auto tuner = devs->GetPrivateTuner(type); + tuner->ScanAndSendNetworkInformation(parser); + devs->ReleaseTuner(tuner); +} + diff --git a/p2pvr/daemon/maintenance/programAssociations.cpp b/p2pvr/daemon/maintenance/programAssociations.cpp new file mode 100644 index 0000000..ad6438c --- /dev/null +++ b/p2pvr/daemon/maintenance/programAssociations.cpp @@ -0,0 +1,80 @@ +#include <pch.hpp> +#include "../maintenance.h" +#include <siParsers/programAssociation.h> +#include <sqlMergeTask.h> +#include <Ice/Communicator.h> +#include <p2Helpers.h> +#include <dvbsiHelpers.h> +#include <mapIterator.h> +#include <temporaryIceAdapterObject.h> + +class SiProgramAssociationHandler : public SiProgramAssociationParser { + public: + bool HandleTable(ProgramAssociationMapPtr pam) + { + Logger()->messagebf(LOG_DEBUG, "Program association table"); + BOOST_FOREACH(const auto & pa, *pam) { + Logger()->messagebf(LOG_DEBUG, " %d -> %d", pa.first, pa.second); + } + BOOST_FOREACH(const auto & pa, *pam) { + map[pa.first] = pa.second; + } + return false; + } + + ProgramAssociationMap map; +}; + +static +void +CreatePATColumns(const ColumnCreator & cc) +{ + cc("serviceId", true); + cc("programId", false); +} + +void +Maintenance::UpdateProgramAssociations(short type, const Ice::Current & ice) +{ + auto ic = ice.adapter->getCommunicator(); + auto devs = P2PVR::DevicesPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("GlobalDevices"))); + auto si = P2PVR::SIPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("SI"))); + + if (!devs || !si) { + throw std::runtime_error("bad proxy(s)"); + } + + auto siparser = new SiProgramAssociationHandler(); + TemporarayIceAdapterObject<P2PVR::RawDataClient> parser(ice.adapter, siparser); + + const auto deliveries = si->GetAllDeliveries(type); + if (deliveries.empty()) { + throw std::runtime_error("no delivery methods"); + } + + BOOST_FOREACH(const auto & transport, deliveries) { + try { + Logger()->messagebf(LOG_DEBUG, "%s: Getting a tuner", __PRETTY_FUNCTION__); + auto tuner = devs->GetTunerSpecific(transport); + Logger()->messagebf(LOG_DEBUG, "%s: Fetching associations", __PRETTY_FUNCTION__); + tuner->SendProgramAssociationTable(parser); + Logger()->messagebf(LOG_INFO, "%s: Updated associations", __PRETTY_FUNCTION__); + devs->ReleaseTuner(tuner); + } + catch (...) { + // Tuning can fail + } + } + + TxHelper tx(this); + SqlMergeTask mergeServices("postgres", "services"); + CreatePATColumns(boost::bind(SqlMergeColumnsInserter, &mergeServices, _1, _2)); + // Don't change the list of services available from the network + mergeServices.doDelete = VariableType(false); + mergeServices.doInsert = VariableType(false); + Columns cols; + mergeServices.sources.insert(new MapIterator<ProgramAssociationMap>(CreatePATColumns, &siparser->map)); + mergeServices.loadComplete(this); + mergeServices.execute(NULL); +} + diff --git a/p2pvr/daemon/maintenance/programMap.cpp b/p2pvr/daemon/maintenance/programMap.cpp new file mode 100644 index 0000000..94d7752 --- /dev/null +++ b/p2pvr/daemon/maintenance/programMap.cpp @@ -0,0 +1,133 @@ +#include <pch.hpp> +#include "../maintenance.h" +#include <siParsers/programMap.h> +#include <sqlMergeTask.h> +#include <Ice/Communicator.h> +#include <p2Helpers.h> +#include <dvbsiHelpers.h> +#include <containerIterator.h> +#include <singleIterator.h> +#include <temporaryIceAdapterObject.h> +#include <rdbmsDataSource.h> +#include <column.h> +#include <selectcommand.h> +#include <sqlHandleAsVariableType.h> + +class SiProgramMapHandler : public SiProgramMapParser { + public: + SiProgramMapHandler(const RowProcessorCallback & cb) : + callBack(cb) {} + + bool HandleTable(DVBSI::ProgramMapPtr pmp) + { + Logger()->messagebf(LOG_DEBUG, "Program map: serviceId = %d", pmp->ServiceId); + BOOST_FOREACH(const auto & s, pmp->Streams) { + Logger()->messagef(LOG_DEBUG, "type: %02x id: %d", s->Type, s->Id); + } + BOOST_FOREACH(const auto & s, pmp->Streams) { + BindColumns<DVBSI::StreamPtr>(rowState, s); + rowState.process(callBack); + } + return false; + } + + private: + ObjectRowState<DVBSI::StreamPtr> rowState; + const RowProcessorCallback callBack; +}; + +typedef boost::shared_ptr<DB::SelectCommand> SelectPtr; + +template<typename T> +void +operator<<(T & val, const DB::Column & col) +{ + HandleAsVariableType havt; + col.apply(havt); + val = havt.variable; +} + +class SiProgramMapMerger : public IHaveSubTasks { + public: + SiProgramMapMerger(short t, CommonObjects * co, const Ice::Current & i) : + SourceObject(__PRETTY_FUNCTION__), + IHaveSubTasks(NULL), + commonObjects(co), + type(t), + ice(i) { } + + void execute(ExecContext * ec) const + { + auto ic = ice.adapter->getCommunicator(); + auto devs = P2PVR::DevicesPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("GlobalDevices"))); + auto si = P2PVR::SIPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("SI"))); + + if (!devs || !si) { + throw std::runtime_error("bad proxy(s)"); + } + + TemporarayIceAdapterObject<P2PVR::RawDataClient> parser(ice.adapter, + new SiProgramMapHandler(boost::bind(&SiProgramMapMerger::executeChildren, this, ec))); + + const auto deliveries = si->GetAllDeliveries(type); + if (deliveries.empty()) { + throw std::runtime_error("no delivery methods"); + } + + auto db = commonObjects->dataSource<RdbmsDataSource>("postgres")->getReadonly(); + SelectPtr sel = SelectPtr(db->newSelectCommand("select d.frequency, s.programid \ + from delivery_dvbt d, services s \ + where d.transportstreamid = s.transportstreamid \ + and s.programid is not null \ + order by s.transportstreamid, s.serviceid")); + int64_t curFreq = 0; + P2PVR::TunerPrx tuner; + while (sel->fetch()) { + int64_t freq, pid; + freq << (*sel)[0]; + pid << (*sel)[1]; + + if (freq != curFreq) { + if (tuner) { + devs->ReleaseTuner(tuner); + } + Logger()->messagebf(LOG_DEBUG, "%s: Getting a tuner", __PRETTY_FUNCTION__); + const auto transport = *std::find_if(deliveries.begin(), deliveries.end(), + [freq](const DVBSI::DeliveryPtr & del) { return del->Frequency == freq; }); + tuner = devs->GetTunerSpecific(transport); + curFreq = freq; + } + + Logger()->messagebf(LOG_DEBUG, "%s: Fetching associations", __PRETTY_FUNCTION__); + tuner->SendProgramMap(pid, parser); + Logger()->messagebf(LOG_INFO, "%s: Updated associations", __PRETTY_FUNCTION__); + } + if (tuner) { + devs->ReleaseTuner(tuner); + } + } + + private: + CommonObjects * commonObjects; + const short type; + const Ice::Current & ice; + + void executeChildren(ExecContext * ec) const + { + BOOST_FOREACH(const Tasks::value_type & sq, normal) { + sq->execute(ec); + } + } +}; + +void +Maintenance::UpdateProgramMaps(short type, const Ice::Current & ice) +{ + TxHelper tx(this); + SqlMergeTask mergeServiceStreams("postgres", "servicestreams"); + CreateColumns<DVBSI::StreamPtr>(boost::bind(SqlMergeColumnsInserter, &mergeServiceStreams, _1, _2)); + mergeServiceStreams.sources.insert(new SiProgramMapMerger(type, this, ice)); + mergeServiceStreams.loadComplete(this); + mergeServiceStreams.execute(NULL); +} + diff --git a/p2pvr/daemon/maintenance/services.cpp b/p2pvr/daemon/maintenance/services.cpp new file mode 100644 index 0000000..55409c2 --- /dev/null +++ b/p2pvr/daemon/maintenance/services.cpp @@ -0,0 +1,68 @@ +#include <pch.hpp> +#include "../maintenance.h" +#include <siParsers/service.h> +#include <sqlMergeTask.h> +#include <Ice/Communicator.h> +#include <p2Helpers.h> +#include <dvbsiHelpers.h> +#include <containerIterator.h> +#include <singleIterator.h> +#include <temporaryIceAdapterObject.h> + +class SiServicesMerger : public SiServicesParser { + public: + SiServicesMerger(DatabaseClient * co) : commonObjects(co) { } + + bool HandleTable(DVBSI::TransportStreamPtr ts) + { + Logger()->messagebf(LOG_DEBUG, "Transport Stream Id: %d Original Network Id: %s", ts->TransportStreamId, ts->OriginalNetworkId); + BOOST_FOREACH(const auto & s, ts->Services) { + Logger()->messagebf(LOG_DEBUG, "\tService Id: %d Name: %s Type: %d, Provider: %s, DefaultAuthority: %s, RunningStatus %d FreeCaMode %d", + s->ServiceId, (s->Name ? *s->Name : "?"), (s->Type ? *s->Type : -1), + (s->ProviderName ? *s->ProviderName : "?"), (s->DefaultAuthority ? *s->DefaultAuthority : "?"), + s->RunningStatus, s->FreeCaMode); + } + + DatabaseClient::TxHelper tx(commonObjects); + SqlMergeTask mergeServices("postgres", "services"); + CreateColumns<DVBSI::ServicePtr>(boost::bind(&DatabaseClient::SqlMergeColumnsInserter, &mergeServices, _1, _2)); + // Don't change the list of services available from the network + mergeServices.doDelete = VariableType(false); + mergeServices.doInsert = VariableType(false); + mergeServices.sources.insert(new ContainerIterator<DVBSI::ServiceList>(&ts->Services)); + mergeServices.loadComplete(commonObjects); + mergeServices.execute(NULL); + return false; + } + + private: + DatabaseClient * commonObjects; +}; + +void +Maintenance::UpdateServices(short type, const Ice::Current & ice) +{ + auto ic = ice.adapter->getCommunicator(); + auto devs = P2PVR::DevicesPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("GlobalDevices"))); + auto si = P2PVR::SIPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("SI"))); + + if (!devs || !si) { + throw std::runtime_error("bad proxy(s)"); + } + + auto siparser = new SiServicesMerger(this); + TemporarayIceAdapterObject<P2PVR::RawDataClient> parser(ice.adapter, siparser); + + auto delivery = si->GetDeliveryForSi(); + if (!delivery) { + throw std::runtime_error("no delivery methods"); + } + + Logger()->messagebf(LOG_DEBUG, "%s: Getting a tuner", __PRETTY_FUNCTION__); + auto tuner = devs->GetTunerAny(type, delivery); + Logger()->messagebf(LOG_DEBUG, "%s: Fetching service list", __PRETTY_FUNCTION__); + tuner->SendServiceDescriptions(parser); + Logger()->messagebf(LOG_INFO, "%s: Updated service list", __PRETTY_FUNCTION__); + devs->ReleaseTuner(tuner); +} + |