summaryrefslogtreecommitdiff
path: root/p2pvr/daemon/maintenance
diff options
context:
space:
mode:
authorrandomdan <randomdan@localhost>2014-03-13 19:42:07 +0000
committerrandomdan <randomdan@localhost>2014-03-13 19:42:07 +0000
commitab1eee942e75874739ce5f0b4ba289aac5cc3faf (patch)
tree6e43828794fe0c0c5c9921ec1911695b67357c50 /p2pvr/daemon/maintenance
parentExpose more of the interface (diff)
downloadp2pvr-ab1eee942e75874739ce5f0b4ba289aac5cc3faf.tar.bz2
p2pvr-ab1eee942e75874739ce5f0b4ba289aac5cc3faf.tar.xz
p2pvr-ab1eee942e75874739ce5f0b4ba289aac5cc3faf.zip
Restructure into more sensibly arranged libs
Diffstat (limited to 'p2pvr/daemon/maintenance')
-rw-r--r--p2pvr/daemon/maintenance/events.cpp92
-rw-r--r--p2pvr/daemon/maintenance/network.cpp107
-rw-r--r--p2pvr/daemon/maintenance/programAssociations.cpp80
-rw-r--r--p2pvr/daemon/maintenance/programMap.cpp133
-rw-r--r--p2pvr/daemon/maintenance/services.cpp68
5 files changed, 480 insertions, 0 deletions
diff --git a/p2pvr/daemon/maintenance/events.cpp b/p2pvr/daemon/maintenance/events.cpp
new file mode 100644
index 0000000..b0c8b74
--- /dev/null
+++ b/p2pvr/daemon/maintenance/events.cpp
@@ -0,0 +1,92 @@
+#include <pch.hpp>
+#include "../maintenance.h"
+#include <siParsers/event.h>
+#include <sqlMergeTask.h>
+#include <Ice/Communicator.h>
+#include <p2Helpers.h>
+#include <dvbsiHelpers.h>
+#include <containerIterator.h>
+#include <singleIterator.h>
+#include <temporaryIceAdapterObject.h>
+#include <commonHelpers.h>
+
+class SiEventsHandler : public SiEpgParser {
+ public:
+ SiEventsHandler(const RowProcessorCallback & cb) :
+ callBack(cb) {}
+
+ void HandleTable(DVBSI::EventPtr e)
+ {
+ Logger()->messagebf(LOG_DEBUG, "Service Id: %d Program Id: %d Title: %s Time: %s - %s",
+ e->ServiceId, e->EventId, e->Title, e->StartTime, e->StopTime);
+ BindColumns<DVBSI::EventPtr>(rowState, e);
+ rowState.process(callBack);
+ }
+
+ private:
+ ObjectRowState<DVBSI::EventPtr> rowState;
+ const RowProcessorCallback callBack;
+};
+
+class SiEventsMerger : public IHaveSubTasks {
+ public:
+ SiEventsMerger(short t, const Ice::Current & i) :
+ SourceObject(__PRETTY_FUNCTION__),
+ IHaveSubTasks(NULL),
+ type(t),
+ ice(i) { }
+
+ void execute(ExecContext * ec) const
+ {
+ auto ic = ice.adapter->getCommunicator();
+ auto devs = P2PVR::DevicesPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("GlobalDevices")));
+ auto si = P2PVR::SIPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("SI")));
+
+ if (!devs || !si) {
+ throw std::runtime_error("bad proxy(s)");
+ }
+
+ TemporarayIceAdapterObject<P2PVR::RawDataClient> parser(ice.adapter,
+ new SiEventsHandler(boost::bind(&SiEventsMerger::executeChildren, this, ec)));
+
+ auto delivery = si->GetDeliveryForSi();
+ if (!delivery) {
+ throw std::runtime_error("no delivery methods");
+ }
+
+ Logger()->messagebf(LOG_DEBUG, "%s: Getting a tuner", __PRETTY_FUNCTION__);
+ auto tuner = devs->GetTunerAny(type, delivery);
+ Logger()->messagebf(LOG_DEBUG, "%s: Fetching events", __PRETTY_FUNCTION__);
+ tuner->SendEventInformation(parser);
+ devs->ReleaseTuner(tuner);
+ }
+
+ private:
+ const short type;
+ const Ice::Current & ice;
+
+ void executeChildren(ExecContext * ec) const
+ {
+ BOOST_FOREACH(const Tasks::value_type & sq, normal) {
+ sq->execute(ec);
+ }
+ }
+};
+
+void
+Maintenance::UpdateEvents(short type, const Ice::Current & ice)
+{
+ TxHelper tx(this);
+ SqlMergeTask mergeEvents("postgres", "events");
+ CreateColumns<DVBSI::EventPtr>(boost::bind(SqlMergeColumnsInserter, &mergeEvents, _1, _2));
+ mergeEvents.sources.insert(new SiEventsMerger(type, ice));
+ mergeEvents.loadComplete(this);
+ mergeEvents.execute(NULL);
+ tx.Commit();
+ Logger()->messagebf(LOG_INFO, "%s: Updated events", __PRETTY_FUNCTION__);
+
+ auto ic = ice.adapter->getCommunicator();
+ auto sch = P2PVR::SchedulesPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("Schedules")));
+ sch->DoReschedule();
+}
+
diff --git a/p2pvr/daemon/maintenance/network.cpp b/p2pvr/daemon/maintenance/network.cpp
new file mode 100644
index 0000000..d9ce0ec
--- /dev/null
+++ b/p2pvr/daemon/maintenance/network.cpp
@@ -0,0 +1,107 @@
+#include <pch.hpp>
+#include "../maintenance.h"
+#include <siParsers/network.h>
+#include <sqlMergeTask.h>
+#include <Ice/Communicator.h>
+#include <p2Helpers.h>
+#include <dvbsiHelpers.h>
+#include <containerIterator.h>
+#include <singleIterator.h>
+#include <temporaryIceAdapterObject.h>
+
+class SiNetworkInformationMerger : public SiNetworkInformationParser {
+ public:
+ SiNetworkInformationMerger(DatabaseClient * co) : commonObjects(co) { }
+
+ bool HandleTable(DVBSI::NetworkPtr n)
+ {
+ Logger()->messagebf(LOG_DEBUG, "Network Id: %d Name: %s", n->NetworkId, *n->Name);
+ BOOST_FOREACH(const auto & ts, n->TransportStreams) {
+ Logger()->messagebf(LOG_DEBUG, "\tTransport Stream Id: %d Original Network Id: %d", ts->TransportStreamId, ts->OriginalNetworkId);
+ BOOST_FOREACH(const auto & s, ts->Services) {
+ Logger()->messagebf(LOG_DEBUG, "\t\tService Id: %d Service Type: %d", s.ServiceId, s.ServiceType);
+ }
+ if (ts->Terrestrial) {
+ Logger()->messagebf(LOG_DEBUG, "\t\tDVB-T: Frequency: %d", ts->Terrestrial->Frequency);
+ }
+ }
+
+ DatabaseClient::TxHelper tx(commonObjects);
+ SqlMergeTask mergeNetwork("postgres", "networks");
+ CreateColumns<DVBSI::NetworkPtr>(boost::bind(&DatabaseClient::SqlMergeColumnsInserter, &mergeNetwork, _1, _2));
+ std::vector<DVBSI::NetworkPtr> networks = { n };
+ mergeNetwork.sources.insert(new ContainerIterator<std::vector<DVBSI::NetworkPtr>>(&networks));
+ mergeNetwork.loadComplete(commonObjects);
+ mergeNetwork.execute(NULL);
+
+ SqlMergeTask mergeTransports("postgres", "transportstreams");
+ CreateColumns<DVBSI::NetworkTransportStreamPtr>(boost::bind(&DatabaseClient::SqlMergeColumnsInserter, &mergeTransports, _1, _2));
+ mergeTransports.sources.insert(new ContainerIterator<DVBSI::NetworkTransportStreams>(&n->TransportStreams));
+ mergeTransports.loadComplete(commonObjects);
+ mergeTransports.execute(NULL);
+
+ SqlMergeTask mergeDvbt("postgres", "delivery_dvbt");
+ CreateColumns<DVBSI::TerrestrialDeliveryPtr>(boost::bind(&DatabaseClient::SqlMergeColumnsInserter, &mergeDvbt, _1, _2));
+ BOOST_FOREACH(const auto & s, n->TransportStreams) {
+ if (s->Terrestrial) {
+ mergeDvbt.sources.insert(new SingleIterator<DVBSI::TerrestrialDeliveryPtr>(&s->Terrestrial));
+ }
+ }
+ mergeDvbt.loadComplete(commonObjects);
+ mergeDvbt.execute(NULL);
+
+ SqlMergeTask mergeServices("postgres", "services");
+ CreateColumns<DVBSI::NetworkService>(boost::bind(&DatabaseClient::SqlMergeColumnsInserter, &mergeServices, _1, _2));
+ BOOST_FOREACH(const auto & s, n->TransportStreams) {
+ mergeServices.sources.insert(new ContainerIterator<DVBSI::NetworkServiceList>(&s->Services));
+ }
+ mergeServices.loadComplete(commonObjects);
+ mergeServices.execute(NULL);
+ return false;
+ }
+ private:
+ DatabaseClient * commonObjects;
+};
+
+void
+Maintenance::UpdateNetwork(short type, const Ice::Current & ice)
+{
+ auto ic = ice.adapter->getCommunicator();
+ auto devs = P2PVR::DevicesPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("GlobalDevices")));
+ auto si = P2PVR::SIPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("SI")));
+ auto siparser = new SiNetworkInformationMerger(this);
+ TemporarayIceAdapterObject<P2PVR::RawDataClient> parser(ice.adapter, siparser);
+
+ if (!devs) {
+ throw std::runtime_error("bad proxy(s)");
+ }
+ auto transport = si->GetDeliveryForSi();
+ if (transport) {
+ P2PVR::TunerPrx tuner;
+ try {
+ tuner = devs->GetTunerAny(type, transport);
+ tuner->SendNetworkInformation(parser);
+ devs->ReleaseTuner(tuner);
+ return;
+ }
+ catch (const P2PVR::NoSuitableDeviceAvailable &) {
+ Logger()->messagebf(LOG_WARNING, "%s: Failed to get a suitable tuner", __PRETTY_FUNCTION__);
+ throw;
+ }
+ catch (const std::exception & ex) {
+ Logger()->messagebf(LOG_WARNING, "%s: Failed to fetch network information: %s", __PRETTY_FUNCTION__, ex.what());
+ devs->ReleaseTuner(tuner);
+ throw;
+ }
+ catch (...) {
+ Logger()->messagebf(LOG_WARNING, "%s: Failed to fetch network information", __PRETTY_FUNCTION__);
+ devs->ReleaseTuner(tuner);
+ throw;
+ }
+ }
+ // If we can't do that, do a complete scan
+ auto tuner = devs->GetPrivateTuner(type);
+ tuner->ScanAndSendNetworkInformation(parser);
+ devs->ReleaseTuner(tuner);
+}
+
diff --git a/p2pvr/daemon/maintenance/programAssociations.cpp b/p2pvr/daemon/maintenance/programAssociations.cpp
new file mode 100644
index 0000000..ad6438c
--- /dev/null
+++ b/p2pvr/daemon/maintenance/programAssociations.cpp
@@ -0,0 +1,80 @@
+#include <pch.hpp>
+#include "../maintenance.h"
+#include <siParsers/programAssociation.h>
+#include <sqlMergeTask.h>
+#include <Ice/Communicator.h>
+#include <p2Helpers.h>
+#include <dvbsiHelpers.h>
+#include <mapIterator.h>
+#include <temporaryIceAdapterObject.h>
+
+class SiProgramAssociationHandler : public SiProgramAssociationParser {
+ public:
+ bool HandleTable(ProgramAssociationMapPtr pam)
+ {
+ Logger()->messagebf(LOG_DEBUG, "Program association table");
+ BOOST_FOREACH(const auto & pa, *pam) {
+ Logger()->messagebf(LOG_DEBUG, " %d -> %d", pa.first, pa.second);
+ }
+ BOOST_FOREACH(const auto & pa, *pam) {
+ map[pa.first] = pa.second;
+ }
+ return false;
+ }
+
+ ProgramAssociationMap map;
+};
+
+static
+void
+CreatePATColumns(const ColumnCreator & cc)
+{
+ cc("serviceId", true);
+ cc("programId", false);
+}
+
+void
+Maintenance::UpdateProgramAssociations(short type, const Ice::Current & ice)
+{
+ auto ic = ice.adapter->getCommunicator();
+ auto devs = P2PVR::DevicesPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("GlobalDevices")));
+ auto si = P2PVR::SIPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("SI")));
+
+ if (!devs || !si) {
+ throw std::runtime_error("bad proxy(s)");
+ }
+
+ auto siparser = new SiProgramAssociationHandler();
+ TemporarayIceAdapterObject<P2PVR::RawDataClient> parser(ice.adapter, siparser);
+
+ const auto deliveries = si->GetAllDeliveries(type);
+ if (deliveries.empty()) {
+ throw std::runtime_error("no delivery methods");
+ }
+
+ BOOST_FOREACH(const auto & transport, deliveries) {
+ try {
+ Logger()->messagebf(LOG_DEBUG, "%s: Getting a tuner", __PRETTY_FUNCTION__);
+ auto tuner = devs->GetTunerSpecific(transport);
+ Logger()->messagebf(LOG_DEBUG, "%s: Fetching associations", __PRETTY_FUNCTION__);
+ tuner->SendProgramAssociationTable(parser);
+ Logger()->messagebf(LOG_INFO, "%s: Updated associations", __PRETTY_FUNCTION__);
+ devs->ReleaseTuner(tuner);
+ }
+ catch (...) {
+ // Tuning can fail
+ }
+ }
+
+ TxHelper tx(this);
+ SqlMergeTask mergeServices("postgres", "services");
+ CreatePATColumns(boost::bind(SqlMergeColumnsInserter, &mergeServices, _1, _2));
+ // Don't change the list of services available from the network
+ mergeServices.doDelete = VariableType(false);
+ mergeServices.doInsert = VariableType(false);
+ Columns cols;
+ mergeServices.sources.insert(new MapIterator<ProgramAssociationMap>(CreatePATColumns, &siparser->map));
+ mergeServices.loadComplete(this);
+ mergeServices.execute(NULL);
+}
+
diff --git a/p2pvr/daemon/maintenance/programMap.cpp b/p2pvr/daemon/maintenance/programMap.cpp
new file mode 100644
index 0000000..94d7752
--- /dev/null
+++ b/p2pvr/daemon/maintenance/programMap.cpp
@@ -0,0 +1,133 @@
+#include <pch.hpp>
+#include "../maintenance.h"
+#include <siParsers/programMap.h>
+#include <sqlMergeTask.h>
+#include <Ice/Communicator.h>
+#include <p2Helpers.h>
+#include <dvbsiHelpers.h>
+#include <containerIterator.h>
+#include <singleIterator.h>
+#include <temporaryIceAdapterObject.h>
+#include <rdbmsDataSource.h>
+#include <column.h>
+#include <selectcommand.h>
+#include <sqlHandleAsVariableType.h>
+
+class SiProgramMapHandler : public SiProgramMapParser {
+ public:
+ SiProgramMapHandler(const RowProcessorCallback & cb) :
+ callBack(cb) {}
+
+ bool HandleTable(DVBSI::ProgramMapPtr pmp)
+ {
+ Logger()->messagebf(LOG_DEBUG, "Program map: serviceId = %d", pmp->ServiceId);
+ BOOST_FOREACH(const auto & s, pmp->Streams) {
+ Logger()->messagef(LOG_DEBUG, "type: %02x id: %d", s->Type, s->Id);
+ }
+ BOOST_FOREACH(const auto & s, pmp->Streams) {
+ BindColumns<DVBSI::StreamPtr>(rowState, s);
+ rowState.process(callBack);
+ }
+ return false;
+ }
+
+ private:
+ ObjectRowState<DVBSI::StreamPtr> rowState;
+ const RowProcessorCallback callBack;
+};
+
+typedef boost::shared_ptr<DB::SelectCommand> SelectPtr;
+
+template<typename T>
+void
+operator<<(T & val, const DB::Column & col)
+{
+ HandleAsVariableType havt;
+ col.apply(havt);
+ val = havt.variable;
+}
+
+class SiProgramMapMerger : public IHaveSubTasks {
+ public:
+ SiProgramMapMerger(short t, CommonObjects * co, const Ice::Current & i) :
+ SourceObject(__PRETTY_FUNCTION__),
+ IHaveSubTasks(NULL),
+ commonObjects(co),
+ type(t),
+ ice(i) { }
+
+ void execute(ExecContext * ec) const
+ {
+ auto ic = ice.adapter->getCommunicator();
+ auto devs = P2PVR::DevicesPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("GlobalDevices")));
+ auto si = P2PVR::SIPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("SI")));
+
+ if (!devs || !si) {
+ throw std::runtime_error("bad proxy(s)");
+ }
+
+ TemporarayIceAdapterObject<P2PVR::RawDataClient> parser(ice.adapter,
+ new SiProgramMapHandler(boost::bind(&SiProgramMapMerger::executeChildren, this, ec)));
+
+ const auto deliveries = si->GetAllDeliveries(type);
+ if (deliveries.empty()) {
+ throw std::runtime_error("no delivery methods");
+ }
+
+ auto db = commonObjects->dataSource<RdbmsDataSource>("postgres")->getReadonly();
+ SelectPtr sel = SelectPtr(db->newSelectCommand("select d.frequency, s.programid \
+ from delivery_dvbt d, services s \
+ where d.transportstreamid = s.transportstreamid \
+ and s.programid is not null \
+ order by s.transportstreamid, s.serviceid"));
+ int64_t curFreq = 0;
+ P2PVR::TunerPrx tuner;
+ while (sel->fetch()) {
+ int64_t freq, pid;
+ freq << (*sel)[0];
+ pid << (*sel)[1];
+
+ if (freq != curFreq) {
+ if (tuner) {
+ devs->ReleaseTuner(tuner);
+ }
+ Logger()->messagebf(LOG_DEBUG, "%s: Getting a tuner", __PRETTY_FUNCTION__);
+ const auto transport = *std::find_if(deliveries.begin(), deliveries.end(),
+ [freq](const DVBSI::DeliveryPtr & del) { return del->Frequency == freq; });
+ tuner = devs->GetTunerSpecific(transport);
+ curFreq = freq;
+ }
+
+ Logger()->messagebf(LOG_DEBUG, "%s: Fetching associations", __PRETTY_FUNCTION__);
+ tuner->SendProgramMap(pid, parser);
+ Logger()->messagebf(LOG_INFO, "%s: Updated associations", __PRETTY_FUNCTION__);
+ }
+ if (tuner) {
+ devs->ReleaseTuner(tuner);
+ }
+ }
+
+ private:
+ CommonObjects * commonObjects;
+ const short type;
+ const Ice::Current & ice;
+
+ void executeChildren(ExecContext * ec) const
+ {
+ BOOST_FOREACH(const Tasks::value_type & sq, normal) {
+ sq->execute(ec);
+ }
+ }
+};
+
+void
+Maintenance::UpdateProgramMaps(short type, const Ice::Current & ice)
+{
+ TxHelper tx(this);
+ SqlMergeTask mergeServiceStreams("postgres", "servicestreams");
+ CreateColumns<DVBSI::StreamPtr>(boost::bind(SqlMergeColumnsInserter, &mergeServiceStreams, _1, _2));
+ mergeServiceStreams.sources.insert(new SiProgramMapMerger(type, this, ice));
+ mergeServiceStreams.loadComplete(this);
+ mergeServiceStreams.execute(NULL);
+}
+
diff --git a/p2pvr/daemon/maintenance/services.cpp b/p2pvr/daemon/maintenance/services.cpp
new file mode 100644
index 0000000..55409c2
--- /dev/null
+++ b/p2pvr/daemon/maintenance/services.cpp
@@ -0,0 +1,68 @@
+#include <pch.hpp>
+#include "../maintenance.h"
+#include <siParsers/service.h>
+#include <sqlMergeTask.h>
+#include <Ice/Communicator.h>
+#include <p2Helpers.h>
+#include <dvbsiHelpers.h>
+#include <containerIterator.h>
+#include <singleIterator.h>
+#include <temporaryIceAdapterObject.h>
+
+class SiServicesMerger : public SiServicesParser {
+ public:
+ SiServicesMerger(DatabaseClient * co) : commonObjects(co) { }
+
+ bool HandleTable(DVBSI::TransportStreamPtr ts)
+ {
+ Logger()->messagebf(LOG_DEBUG, "Transport Stream Id: %d Original Network Id: %s", ts->TransportStreamId, ts->OriginalNetworkId);
+ BOOST_FOREACH(const auto & s, ts->Services) {
+ Logger()->messagebf(LOG_DEBUG, "\tService Id: %d Name: %s Type: %d, Provider: %s, DefaultAuthority: %s, RunningStatus %d FreeCaMode %d",
+ s->ServiceId, (s->Name ? *s->Name : "?"), (s->Type ? *s->Type : -1),
+ (s->ProviderName ? *s->ProviderName : "?"), (s->DefaultAuthority ? *s->DefaultAuthority : "?"),
+ s->RunningStatus, s->FreeCaMode);
+ }
+
+ DatabaseClient::TxHelper tx(commonObjects);
+ SqlMergeTask mergeServices("postgres", "services");
+ CreateColumns<DVBSI::ServicePtr>(boost::bind(&DatabaseClient::SqlMergeColumnsInserter, &mergeServices, _1, _2));
+ // Don't change the list of services available from the network
+ mergeServices.doDelete = VariableType(false);
+ mergeServices.doInsert = VariableType(false);
+ mergeServices.sources.insert(new ContainerIterator<DVBSI::ServiceList>(&ts->Services));
+ mergeServices.loadComplete(commonObjects);
+ mergeServices.execute(NULL);
+ return false;
+ }
+
+ private:
+ DatabaseClient * commonObjects;
+};
+
+void
+Maintenance::UpdateServices(short type, const Ice::Current & ice)
+{
+ auto ic = ice.adapter->getCommunicator();
+ auto devs = P2PVR::DevicesPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("GlobalDevices")));
+ auto si = P2PVR::SIPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("SI")));
+
+ if (!devs || !si) {
+ throw std::runtime_error("bad proxy(s)");
+ }
+
+ auto siparser = new SiServicesMerger(this);
+ TemporarayIceAdapterObject<P2PVR::RawDataClient> parser(ice.adapter, siparser);
+
+ auto delivery = si->GetDeliveryForSi();
+ if (!delivery) {
+ throw std::runtime_error("no delivery methods");
+ }
+
+ Logger()->messagebf(LOG_DEBUG, "%s: Getting a tuner", __PRETTY_FUNCTION__);
+ auto tuner = devs->GetTunerAny(type, delivery);
+ Logger()->messagebf(LOG_DEBUG, "%s: Fetching service list", __PRETTY_FUNCTION__);
+ tuner->SendServiceDescriptions(parser);
+ Logger()->messagebf(LOG_INFO, "%s: Updated service list", __PRETTY_FUNCTION__);
+ devs->ReleaseTuner(tuner);
+}
+