summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorrandomdan <randomdan@localhost>2013-12-03 15:21:25 +0000
committerrandomdan <randomdan@localhost>2013-12-03 15:21:25 +0000
commitc48bac72d9b9fc8e177b161a67dbbb9f91e21fc2 (patch)
treee3c3e5bed636fa75aec2e853ae30689158df2d65
parentMonitor tuner usage and close them if they go idle for a period of time (diff)
downloadp2pvr-c48bac72d9b9fc8e177b161a67dbbb9f91e21fc2.tar.bz2
p2pvr-c48bac72d9b9fc8e177b161a67dbbb9f91e21fc2.tar.xz
p2pvr-c48bac72d9b9fc8e177b161a67dbbb9f91e21fc2.zip
Tidy maintenance code into separate files
Change UpdateTransports to be UpdateNetwork
-rw-r--r--p2pvr/daemon/daemon.cpp2
-rw-r--r--p2pvr/ice/commonHelpers.h1
-rw-r--r--p2pvr/ice/p2pvr.ice2
-rw-r--r--p2pvr/lib/maintenance.cpp457
-rw-r--r--p2pvr/lib/maintenance.h7
-rw-r--r--p2pvr/lib/maintenance/events.cpp88
-rw-r--r--p2pvr/lib/maintenance/network.cpp113
-rw-r--r--p2pvr/lib/maintenance/programAssociations.cpp80
-rw-r--r--p2pvr/lib/maintenance/programMap.cpp133
-rw-r--r--p2pvr/lib/maintenance/services.cpp68
-rw-r--r--p2pvr/lib/temporaryIceAdapterObject.h2
11 files changed, 497 insertions, 456 deletions
diff --git a/p2pvr/daemon/daemon.cpp b/p2pvr/daemon/daemon.cpp
index 4f55685..91b8daa 100644
--- a/p2pvr/daemon/daemon.cpp
+++ b/p2pvr/daemon/daemon.cpp
@@ -33,7 +33,7 @@ class P2PvrDaemon : public Daemon {
adapter->activate();
auto maint = P2PVR::MaintenancePrx::checkedCast(adapter->createProxy(ic->stringToIdentity("Maintenance")));
- maint->UpdateTransports(FE_OFDM);
+ maint->UpdateNetwork(FE_OFDM);
maint->UpdateServices(FE_OFDM);
maint->UpdateProgramAssociations(FE_OFDM);
maint->UpdateProgramMaps(FE_OFDM);
diff --git a/p2pvr/ice/commonHelpers.h b/p2pvr/ice/commonHelpers.h
index b54523e..58caa3d 100644
--- a/p2pvr/ice/commonHelpers.h
+++ b/p2pvr/ice/commonHelpers.h
@@ -3,6 +3,7 @@
#include <p2pvr.h>
#include <ostream>
+#include <iomanip>
namespace Common {
template<typename C, typename T>
diff --git a/p2pvr/ice/p2pvr.ice b/p2pvr/ice/p2pvr.ice
index bae39f8..9dac2a4 100644
--- a/p2pvr/ice/p2pvr.ice
+++ b/p2pvr/ice/p2pvr.ice
@@ -236,7 +236,7 @@ module P2PVR {
interface Maintenance {
idempotent void UpdateAll();
- idempotent void UpdateTransports(short type);
+ idempotent void UpdateNetwork(short type);
idempotent void UpdateServices(short type);
idempotent void UpdateProgramAssociations(short type);
idempotent void UpdateProgramMaps(short type);
diff --git a/p2pvr/lib/maintenance.cpp b/p2pvr/lib/maintenance.cpp
index 6fe44b2..540b323 100644
--- a/p2pvr/lib/maintenance.cpp
+++ b/p2pvr/lib/maintenance.cpp
@@ -1,27 +1,10 @@
#include "maintenance.h"
-#include "siParsers/network.h"
-#include "siParsers/event.h"
-#include "siParsers/programAssociation.h"
-#include "siParsers/programMap.h"
-#include "siParsers/service.h"
#include <Ice/Ice.h>
-#include <stdexcept>
-#include <iomanip>
-#include <logger.h>
-#include <sqlMergeTask.h>
-#include <commonObjects.h>
-#include <commonHelpers.h>
-#include "p2Helpers.h"
-#include "dvbsiHelpers.h"
-#include "containerIterator.h"
-#include "mapIterator.h"
-#include "singleIterator.h"
-#include "temporaryIceAdapterObject.h"
-
#include <linux/dvb/frontend.h>
+#include <sqlMergeTask.h>
void
-SqlMergeColumnsInserter(SqlMergeTask * merge, const std::string & name, bool key)
+Maintenance::SqlMergeColumnsInserter(SqlMergeTask * merge, const std::string & name, bool key)
{
merge->cols.insert(new SqlMergeTask::TargetColumn(name, key));
if (key) {
@@ -29,140 +12,6 @@ SqlMergeColumnsInserter(SqlMergeTask * merge, const std::string & name, bool key
}
}
-class SiNetworkInformationMerger : public SiNetworkInformationParser {
- public:
- SiNetworkInformationMerger(CommonObjects * co) : commonObjects(co) { }
-
- void HandleTable(DVBSI::NetworkPtr n)
- {
- Logger()->messagebf(LOG_DEBUG, "Network Id: %d Name: %s", n->NetworkId, *n->Name);
- BOOST_FOREACH(const auto & ts, n->TransportStreams) {
- Logger()->messagebf(LOG_DEBUG, "\tTransport Stream Id: %d Original Network Id: %d", ts->TransportStreamId, ts->OriginalNetworkId);
- BOOST_FOREACH(const auto & s, ts->Services) {
- Logger()->messagebf(LOG_DEBUG, "\t\tService Id: %d Service Type: %d", s.ServiceId, s.ServiceType);
- }
- if (ts->Terrestrial) {
- Logger()->messagebf(LOG_DEBUG, "\t\tDVB-T: Frequency: %d", ts->Terrestrial->Frequency);
- }
- }
-
- SqlMergeTask mergeNetwork("postgres", "networks");
- CreateColumns<DVBSI::NetworkPtr>(boost::bind(SqlMergeColumnsInserter, &mergeNetwork, _1, _2));
- std::vector<DVBSI::NetworkPtr> networks = { n };
- mergeNetwork.sources.insert(new ContainerIterator<std::vector<DVBSI::NetworkPtr>>(&networks));
- mergeNetwork.loadComplete(commonObjects);
- mergeNetwork.execute(NULL);
-
- SqlMergeTask mergeTransports("postgres", "transportstreams");
- CreateColumns<DVBSI::NetworkTransportStreamPtr>(boost::bind(SqlMergeColumnsInserter, &mergeTransports, _1, _2));
- mergeTransports.sources.insert(new ContainerIterator<DVBSI::NetworkTransportStreams>(&n->TransportStreams, n));
- mergeTransports.loadComplete(commonObjects);
- mergeTransports.execute(NULL);
-
- SqlMergeTask mergeDvbt("postgres", "delivery_dvbt");
- CreateColumns<DVBSI::TerrestrialDeliveryPtr>(boost::bind(SqlMergeColumnsInserter, &mergeDvbt, _1, _2));
- BOOST_FOREACH(const auto & s, n->TransportStreams) {
- if (s->Terrestrial) {
- mergeDvbt.sources.insert(new SingleIterator<DVBSI::TerrestrialDeliveryPtr>(&s->Terrestrial, s));
- }
- }
- mergeDvbt.loadComplete(commonObjects);
- mergeDvbt.execute(NULL);
-
- SqlMergeTask mergeServices("postgres", "services");
- CreateColumns<DVBSI::NetworkService>(boost::bind(SqlMergeColumnsInserter, &mergeServices, _1, _2));
- BOOST_FOREACH(const auto & s, n->TransportStreams) {
- mergeServices.sources.insert(new ContainerIterator<DVBSI::NetworkServiceList>(&s->Services, s));
- }
- mergeServices.loadComplete(commonObjects);
- mergeServices.execute(NULL);
- }
- private:
- CommonObjects * commonObjects;
-};
-
-class SiProgramMapHandler : public SiProgramMapParser {
- public:
- SiProgramMapHandler(const RowProcessorCallback & cb) :
- callBack(cb) {}
-
- void HandleTable(DVBSI::ProgramMapPtr pmp)
- {
- Logger()->messagebf(LOG_DEBUG, "Program map: serviceId = %d", pmp->ServiceId);
- BOOST_FOREACH(const auto & s, pmp->Streams) {
- Logger()->messagef(LOG_DEBUG, "type: %02x id: %d", s->Type, s->Id);
- BindColumns<DVBSI::StreamPtr>(rowState, s, pmp);
- rowState.process(callBack);
- }
- }
-
- private:
- ObjectRowState<DVBSI::StreamPtr> rowState;
- const RowProcessorCallback callBack;
-};
-
-class SiProgramAssociationHandler : public SiProgramAssociationParser {
- public:
- void HandleTable(ProgramAssociationMapPtr pam)
- {
- Logger()->messagebf(LOG_DEBUG, "Program association table");
- BOOST_FOREACH(const auto & pa, *pam) {
- Logger()->messagebf(LOG_DEBUG, " %d -> %d", pa.first, pa.second);
- }
- BOOST_FOREACH(const auto & pa, *pam) {
- map[pa.first] = pa.second;
- }
- }
-
- ProgramAssociationMap map;
-};
-
-class SiServicesMerger : public SiServicesParser {
- public:
- SiServicesMerger(CommonObjects * co) : commonObjects(co) { }
-
- void HandleTable(DVBSI::TransportStreamPtr ts)
- {
- Logger()->messagebf(LOG_DEBUG, "Transport Stream Id: %d Original Network Id: %s", ts->TransportStreamId, ts->OriginalNetworkId);
- BOOST_FOREACH(const auto & s, ts->Services) {
- Logger()->messagebf(LOG_DEBUG, "\tService Id: %d Name: %s Type: %d, Provider: %s, DefaultAuthority: %s, RunningStatus %d FreeCaMode %d",
- s->ServiceId, (s->Name ? *s->Name : "?"), (s->Type ? *s->Type : -1),
- (s->ProviderName ? *s->ProviderName : "?"), (s->DefaultAuthority ? *s->DefaultAuthority : "?"),
- s->RunningStatus, s->FreeCaMode);
- }
-
- SqlMergeTask mergeServices("postgres", "services");
- CreateColumns<DVBSI::ServicePtr>(boost::bind(SqlMergeColumnsInserter, &mergeServices, _1, _2));
- // Don't change the list of services available from the network
- mergeServices.doDelete = VariableType(false);
- mergeServices.doInsert = VariableType(false);
- mergeServices.sources.insert(new ContainerIterator<DVBSI::ServiceList>(&ts->Services, ts));
- mergeServices.loadComplete(commonObjects);
- mergeServices.execute(NULL);
- }
-
- private:
- CommonObjects * commonObjects;
-};
-
-class SiEventsHandler : public SiEpgParser {
- public:
- SiEventsHandler(const RowProcessorCallback & cb) :
- callBack(cb) {}
-
- void HandleTable(DVBSI::EventPtr e)
- {
- Logger()->messagebf(LOG_DEBUG, "Service Id: %d Program Id: %d Title: %s Time: %s - %s",
- e->ServiceId, e->EventId, e->Title, e->StartTime, e->StopTime);
- BindColumns<DVBSI::EventPtr>(rowState, e);
- rowState.process(callBack);
- }
-
- private:
- ObjectRowState<DVBSI::EventPtr> rowState;
- const RowProcessorCallback callBack;
-};
-
void
Maintenance::UpdateAll(const Ice::Current & ice)
{
@@ -175,306 +24,10 @@ Maintenance::UpdateAll(const Ice::Current & ice)
void
Maintenance::UpdateAll(short type, const Ice::Current & ice)
{
- UpdateTransports(type, ice);
+ UpdateNetwork(type, ice);
UpdateServices(type, ice);
+ UpdateProgramAssociations(type, ice);
+ UpdateProgramMaps(type, ice);
UpdateEvents(type, ice);
}
-void
-Maintenance::UpdateTransports(short type, const Ice::Current & ice)
-{
- auto ic = ice.adapter->getCommunicator();
- UpdateTransports(type, ic, ice.adapter);
-}
-
-void
-Maintenance::UpdateTransports(short type, Ice::CommunicatorPtr ic, Ice::ObjectAdapterPtr adapter)
-{
- auto devs = P2PVR::DevicesPrx::checkedCast(adapter->createProxy(ic->stringToIdentity("GlobalDevices")));
- auto si = P2PVR::SIPrx::checkedCast(adapter->createProxy(ic->stringToIdentity("SI")));
- auto siparser = new SiNetworkInformationMerger(this);
- TemporarayIceAdapterObject<P2PVR::RawDataClient> parser(adapter, siparser);
-
- if (!devs) {
- throw std::runtime_error("bad proxy(s)");
- }
- const auto transports = si->GetAllDeliveries(FE_OFDM);
- // Attempt to just download fresh data
- BOOST_FOREACH(const auto & transport, transports) {
- P2PVR::TunerPrx tuner;
- try {
- tuner = devs->GetTunerAny(type, transport, time(NULL) + 300);
- }
- catch (...) {
- Logger()->messagebf(LOG_WARNING, "%s: Failed to get a suitable tuner", __PRETTY_FUNCTION__);
- continue;
- }
- if (!tuner) {
- continue;
- }
- try {
- tuner->SendNetworkInformation(parser);
- devs->ReleaseTuner(tuner);
- BOOST_FOREACH(const CommonObjects::DataSources::value_type & ds, CommonObjects::datasources) {
- ds.second->commit();
- }
- return;
- }
- catch (const std::exception & ex) {
- Logger()->messagebf(LOG_WARNING, "%s: Failed to fetch network information: %s", __PRETTY_FUNCTION__, ex.what());
- devs->ReleaseTuner(tuner);
- throw;
- }
- catch (...) {
- Logger()->messagebf(LOG_WARNING, "%s: Failed to fetch network information", __PRETTY_FUNCTION__);
- devs->ReleaseTuner(tuner);
- throw;
- }
- }
- // If we can't do that, do a complete scan
- auto tuner = devs->GetPrivateTuner(type, time(NULL) + 300);
- tuner->ScanAndSendNetworkInformation(parser);
- devs->ReleaseTuner(tuner);
-}
-
-void
-Maintenance::UpdateServices(short type, const Ice::Current & ice)
-{
- auto ic = ice.adapter->getCommunicator();
- auto devs = P2PVR::DevicesPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("GlobalDevices")));
- auto si = P2PVR::SIPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("SI")));
-
- if (!devs || !si) {
- throw std::runtime_error("bad proxy(s)");
- }
-
- auto siparser = new SiServicesMerger(this);
- TemporarayIceAdapterObject<P2PVR::RawDataClient> parser(ice.adapter, siparser);
-
- const auto deliveries = si->GetAllDeliveries(type);
- if (deliveries.empty()) {
- throw std::runtime_error("no delivery methods");
- }
-
- Logger()->messagebf(LOG_DEBUG, "%s: Getting a tuner", __PRETTY_FUNCTION__);
- auto tuner = devs->GetTunerAny(type, deliveries.front(), time(NULL) + 90);
- Logger()->messagebf(LOG_DEBUG, "%s: Fetching service list", __PRETTY_FUNCTION__);
- tuner->SendServiceDescriptions(parser);
- Logger()->messagebf(LOG_INFO, "%s: Updated service list", __PRETTY_FUNCTION__);
- devs->ReleaseTuner(tuner);
- BOOST_FOREACH(const CommonObjects::DataSources::value_type & ds, CommonObjects::datasources) {
- ds.second->commit();
- }
-}
-
-void
-CreatePMTColumns(const ColumnCreator & cc)
-{
- cc("serviceId", true);
- cc("programId", false);
-}
-
-void
-Maintenance::UpdateProgramAssociations(short type, const Ice::Current & ice)
-{
- auto ic = ice.adapter->getCommunicator();
- auto devs = P2PVR::DevicesPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("GlobalDevices")));
- auto si = P2PVR::SIPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("SI")));
-
- if (!devs || !si) {
- throw std::runtime_error("bad proxy(s)");
- }
-
- auto siparser = new SiProgramAssociationHandler();
- TemporarayIceAdapterObject<P2PVR::RawDataClient> parser(ice.adapter, siparser);
-
- const auto deliveries = si->GetAllDeliveries(type);
- if (deliveries.empty()) {
- throw std::runtime_error("no delivery methods");
- }
-
- BOOST_FOREACH(const auto & transport, deliveries) {
- try {
- Logger()->messagebf(LOG_DEBUG, "%s: Getting a tuner", __PRETTY_FUNCTION__);
- auto tuner = devs->GetTunerSpecific(transport, time(NULL) + 300);
- Logger()->messagebf(LOG_DEBUG, "%s: Fetching associations", __PRETTY_FUNCTION__);
- tuner->SendProgramAssociationTable(parser);
- Logger()->messagebf(LOG_INFO, "%s: Updated associations", __PRETTY_FUNCTION__);
- devs->ReleaseTuner(tuner);
- }
- catch (...) {
- // Tuning can fail
- }
- }
-
- SqlMergeTask mergeServices("postgres", "services");
- CreatePMTColumns(boost::bind(SqlMergeColumnsInserter, &mergeServices, _1, _2));
- // Don't change the list of services available from the network
- mergeServices.doDelete = VariableType(false);
- mergeServices.doInsert = VariableType(false);
- Columns cols;
- mergeServices.sources.insert(new MapIterator<ProgramAssociationMap>(CreatePMTColumns, &siparser->map));
- mergeServices.loadComplete(this);
- mergeServices.execute(NULL);
- BOOST_FOREACH(const CommonObjects::DataSources::value_type & ds, CommonObjects::datasources) {
- ds.second->commit();
- }
-}
-
-#include <rdbmsDataSource.h>
-#include <column.h>
-#include <selectcommand.h>
-typedef boost::shared_ptr<DB::SelectCommand> SelectPtr;
-class HandleAsInt : public DB::HandleField {
- public:
- virtual void null() { }
- virtual void string(const char *, size_t ) { }
- virtual void integer(int64_t v) { integerValue = v; }
- virtual void floatingpoint(double) { }
- virtual void timestamp(const struct tm &) { }
- int64_t integerValue;
-};
-class SiProgramMapMerger : public IHaveSubTasks {
- public:
- SiProgramMapMerger(short t, CommonObjects * co, const Ice::Current & i) :
- SourceObject(__PRETTY_FUNCTION__),
- IHaveSubTasks(NULL),
- commonObjects(co),
- type(t),
- ice(i) { }
-
- void execute(ExecContext * ec) const
- {
- auto ic = ice.adapter->getCommunicator();
- auto devs = P2PVR::DevicesPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("GlobalDevices")));
- auto si = P2PVR::SIPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("SI")));
-
- if (!devs || !si) {
- throw std::runtime_error("bad proxy(s)");
- }
-
- TemporarayIceAdapterObject<P2PVR::RawDataClient> parser(ice.adapter,
- new SiProgramMapHandler(boost::bind(&SiProgramMapMerger::executeChildren, this, ec)));
-
- const auto deliveries = si->GetAllDeliveries(type);
- if (deliveries.empty()) {
- throw std::runtime_error("no delivery methods");
- }
-
- auto db = commonObjects->dataSource<RdbmsDataSource>("postgres");
- SelectPtr sel = SelectPtr(db->getReadonly().newSelectCommand("select d.frequency, s.programid \
- from delivery_dvbt d, services s \
- where d.transportstreamid = s.transportstreamid \
- and s.programid is not null \
- order by s.transportstreamid, s.serviceid"));
- int64_t curFreq = 0;
- P2PVR::TunerPrx tuner;
- while (sel->fetch()) {
- HandleAsInt freq, pid;
- (*sel)[0].apply(freq);
- (*sel)[1].apply(pid);
-
- if (freq.integerValue != curFreq) {
- if (tuner) {
- devs->ReleaseTuner(tuner);
- }
- Logger()->messagebf(LOG_DEBUG, "%s: Getting a tuner", __PRETTY_FUNCTION__);
- const auto transport = *std::find_if(deliveries.begin(), deliveries.end(),
- [freq](const DVBSI::DeliveryPtr & del) { return del->Frequency == freq.integerValue; });
- tuner = devs->GetTunerSpecific(transport, time(NULL) + 10);
- curFreq = freq.integerValue;
- }
-
- Logger()->messagebf(LOG_DEBUG, "%s: Fetching associations", __PRETTY_FUNCTION__);
- tuner->SendProgramMap(pid.integerValue, parser);
- Logger()->messagebf(LOG_INFO, "%s: Updated associations", __PRETTY_FUNCTION__);
- }
- if (tuner) {
- devs->ReleaseTuner(tuner);
- }
- }
-
- private:
- CommonObjects * commonObjects;
- const short type;
- const Ice::Current & ice;
-
- void executeChildren(ExecContext * ec) const
- {
- BOOST_FOREACH(const Tasks::value_type & sq, normal) {
- sq->execute(ec);
- }
- }
-};
-
-void
-Maintenance::UpdateProgramMaps(short type, const Ice::Current & ice)
-{
- SqlMergeTask mergeServiceStreams("postgres", "servicestreams");
- CreateColumns<DVBSI::StreamPtr>(boost::bind(SqlMergeColumnsInserter, &mergeServiceStreams, _1, _2));
- mergeServiceStreams.sources.insert(new SiProgramMapMerger(type, this, ice));
- mergeServiceStreams.loadComplete(this);
- mergeServiceStreams.execute(NULL);
- BOOST_FOREACH(const CommonObjects::DataSources::value_type & ds, CommonObjects::datasources) {
- ds.second->commit();
- }
-}
-
-class SiEventsMerger : public IHaveSubTasks {
- public:
- SiEventsMerger(short t, const Ice::Current & i) :
- SourceObject(__PRETTY_FUNCTION__),
- IHaveSubTasks(NULL),
- type(t),
- ice(i) { }
-
- void execute(ExecContext * ec) const
- {
- auto ic = ice.adapter->getCommunicator();
- auto devs = P2PVR::DevicesPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("GlobalDevices")));
- auto si = P2PVR::SIPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("SI")));
-
- if (!devs || !si) {
- throw std::runtime_error("bad proxy(s)");
- }
-
- TemporarayIceAdapterObject<P2PVR::RawDataClient> parser(ice.adapter,
- new SiEventsHandler(boost::bind(&SiEventsMerger::executeChildren, this, ec)));
-
- const auto deliveries = si->GetAllDeliveries(type);
- if (deliveries.empty()) {
- throw std::runtime_error("no delivery methods");
- }
-
- Logger()->messagebf(LOG_DEBUG, "%s: Getting a tuner", __PRETTY_FUNCTION__);
- auto tuner = devs->GetTunerAny(type, deliveries.front(), time(NULL) + 600);
- Logger()->messagebf(LOG_DEBUG, "%s: Fetching events", __PRETTY_FUNCTION__);
- tuner->SendEventInformation(parser);
- devs->ReleaseTuner(tuner);
- }
-
- private:
- const short type;
- const Ice::Current & ice;
-
- void executeChildren(ExecContext * ec) const
- {
- BOOST_FOREACH(const Tasks::value_type & sq, normal) {
- sq->execute(ec);
- }
- }
-};
-
-void
-Maintenance::UpdateEvents(short type, const Ice::Current & ice)
-{
- SqlMergeTask mergeEvents("postgres", "events");
- CreateColumns<DVBSI::EventPtr>(boost::bind(SqlMergeColumnsInserter, &mergeEvents, _1, _2));
- mergeEvents.sources.insert(new SiEventsMerger(type, ice));
- mergeEvents.loadComplete(this);
- mergeEvents.execute(NULL);
- BOOST_FOREACH(const CommonObjects::DataSources::value_type & ds, CommonObjects::datasources) {
- ds.second->commit();
- }
- Logger()->messagebf(LOG_INFO, "%s: Updated events", __PRETTY_FUNCTION__);
-}
diff --git a/p2pvr/lib/maintenance.h b/p2pvr/lib/maintenance.h
index acffb72..b1965d1 100644
--- a/p2pvr/lib/maintenance.h
+++ b/p2pvr/lib/maintenance.h
@@ -4,16 +4,19 @@
#include <p2pvr.h>
#include <commonObjects.h>
+class SqlMergeTask;
+
class Maintenance : public P2PVR::Maintenance, public virtual CommonObjects {
public:
void UpdateAll(const Ice::Current &);
void UpdateAll(short type, const Ice::Current &);
- void UpdateTransports(short type, const Ice::Current &);
- void UpdateTransports(short type, Ice::CommunicatorPtr, Ice::ObjectAdapterPtr);
+ void UpdateNetwork(short type, const Ice::Current &);
void UpdateServices(short type, const Ice::Current &);
void UpdateProgramAssociations(short type, const Ice::Current &);
void UpdateProgramMaps(short type, const Ice::Current &);
void UpdateEvents(short type, const Ice::Current &);
+
+ static void SqlMergeColumnsInserter(SqlMergeTask * merge, const std::string & name, bool key);
};
#endif
diff --git a/p2pvr/lib/maintenance/events.cpp b/p2pvr/lib/maintenance/events.cpp
new file mode 100644
index 0000000..7edac72
--- /dev/null
+++ b/p2pvr/lib/maintenance/events.cpp
@@ -0,0 +1,88 @@
+#include "../maintenance.h"
+#include "../siParsers/event.h"
+#include <sqlMergeTask.h>
+#include <Ice/Communicator.h>
+#include "../p2Helpers.h"
+#include "../dvbsiHelpers.h"
+#include "../containerIterator.h"
+#include "../singleIterator.h"
+#include "../temporaryIceAdapterObject.h"
+#include <commonHelpers.h>
+
+class SiEventsHandler : public SiEpgParser {
+ public:
+ SiEventsHandler(const RowProcessorCallback & cb) :
+ callBack(cb) {}
+
+ void HandleTable(DVBSI::EventPtr e)
+ {
+ Logger()->messagebf(LOG_DEBUG, "Service Id: %d Program Id: %d Title: %s Time: %s - %s",
+ e->ServiceId, e->EventId, e->Title, e->StartTime, e->StopTime);
+ BindColumns<DVBSI::EventPtr>(rowState, e);
+ rowState.process(callBack);
+ }
+
+ private:
+ ObjectRowState<DVBSI::EventPtr> rowState;
+ const RowProcessorCallback callBack;
+};
+
+class SiEventsMerger : public IHaveSubTasks {
+ public:
+ SiEventsMerger(short t, const Ice::Current & i) :
+ SourceObject(__PRETTY_FUNCTION__),
+ IHaveSubTasks(NULL),
+ type(t),
+ ice(i) { }
+
+ void execute(ExecContext * ec) const
+ {
+ auto ic = ice.adapter->getCommunicator();
+ auto devs = P2PVR::DevicesPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("GlobalDevices")));
+ auto si = P2PVR::SIPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("SI")));
+
+ if (!devs || !si) {
+ throw std::runtime_error("bad proxy(s)");
+ }
+
+ TemporarayIceAdapterObject<P2PVR::RawDataClient> parser(ice.adapter,
+ new SiEventsHandler(boost::bind(&SiEventsMerger::executeChildren, this, ec)));
+
+ const auto deliveries = si->GetAllDeliveries(type);
+ if (deliveries.empty()) {
+ throw std::runtime_error("no delivery methods");
+ }
+
+ Logger()->messagebf(LOG_DEBUG, "%s: Getting a tuner", __PRETTY_FUNCTION__);
+ auto tuner = devs->GetTunerAny(type, deliveries.front(), time(NULL) + 600);
+ Logger()->messagebf(LOG_DEBUG, "%s: Fetching events", __PRETTY_FUNCTION__);
+ tuner->SendEventInformation(parser);
+ devs->ReleaseTuner(tuner);
+ }
+
+ private:
+ const short type;
+ const Ice::Current & ice;
+
+ void executeChildren(ExecContext * ec) const
+ {
+ BOOST_FOREACH(const Tasks::value_type & sq, normal) {
+ sq->execute(ec);
+ }
+ }
+};
+
+void
+Maintenance::UpdateEvents(short type, const Ice::Current & ice)
+{
+ SqlMergeTask mergeEvents("postgres", "events");
+ CreateColumns<DVBSI::EventPtr>(boost::bind(SqlMergeColumnsInserter, &mergeEvents, _1, _2));
+ mergeEvents.sources.insert(new SiEventsMerger(type, ice));
+ mergeEvents.loadComplete(this);
+ mergeEvents.execute(NULL);
+ BOOST_FOREACH(const CommonObjects::DataSources::value_type & ds, CommonObjects::datasources) {
+ ds.second->commit();
+ }
+ Logger()->messagebf(LOG_INFO, "%s: Updated events", __PRETTY_FUNCTION__);
+}
+
diff --git a/p2pvr/lib/maintenance/network.cpp b/p2pvr/lib/maintenance/network.cpp
new file mode 100644
index 0000000..6b0ebc6
--- /dev/null
+++ b/p2pvr/lib/maintenance/network.cpp
@@ -0,0 +1,113 @@
+#include "../maintenance.h"
+#include "../siParsers/network.h"
+#include <sqlMergeTask.h>
+#include <Ice/Communicator.h>
+#include "../p2Helpers.h"
+#include "../dvbsiHelpers.h"
+#include "../containerIterator.h"
+#include "../singleIterator.h"
+#include "../temporaryIceAdapterObject.h"
+
+class SiNetworkInformationMerger : public SiNetworkInformationParser {
+ public:
+ SiNetworkInformationMerger(CommonObjects * co) : commonObjects(co) { }
+
+ void HandleTable(DVBSI::NetworkPtr n)
+ {
+ Logger()->messagebf(LOG_DEBUG, "Network Id: %d Name: %s", n->NetworkId, *n->Name);
+ BOOST_FOREACH(const auto & ts, n->TransportStreams) {
+ Logger()->messagebf(LOG_DEBUG, "\tTransport Stream Id: %d Original Network Id: %d", ts->TransportStreamId, ts->OriginalNetworkId);
+ BOOST_FOREACH(const auto & s, ts->Services) {
+ Logger()->messagebf(LOG_DEBUG, "\t\tService Id: %d Service Type: %d", s.ServiceId, s.ServiceType);
+ }
+ if (ts->Terrestrial) {
+ Logger()->messagebf(LOG_DEBUG, "\t\tDVB-T: Frequency: %d", ts->Terrestrial->Frequency);
+ }
+ }
+
+ SqlMergeTask mergeNetwork("postgres", "networks");
+ CreateColumns<DVBSI::NetworkPtr>(boost::bind(&Maintenance::SqlMergeColumnsInserter, &mergeNetwork, _1, _2));
+ std::vector<DVBSI::NetworkPtr> networks = { n };
+ mergeNetwork.sources.insert(new ContainerIterator<std::vector<DVBSI::NetworkPtr>>(&networks));
+ mergeNetwork.loadComplete(commonObjects);
+ mergeNetwork.execute(NULL);
+
+ SqlMergeTask mergeTransports("postgres", "transportstreams");
+ CreateColumns<DVBSI::NetworkTransportStreamPtr>(boost::bind(&Maintenance::SqlMergeColumnsInserter, &mergeTransports, _1, _2));
+ mergeTransports.sources.insert(new ContainerIterator<DVBSI::NetworkTransportStreams>(&n->TransportStreams, n));
+ mergeTransports.loadComplete(commonObjects);
+ mergeTransports.execute(NULL);
+
+ SqlMergeTask mergeDvbt("postgres", "delivery_dvbt");
+ CreateColumns<DVBSI::TerrestrialDeliveryPtr>(boost::bind(&Maintenance::SqlMergeColumnsInserter, &mergeDvbt, _1, _2));
+ BOOST_FOREACH(const auto & s, n->TransportStreams) {
+ if (s->Terrestrial) {
+ mergeDvbt.sources.insert(new SingleIterator<DVBSI::TerrestrialDeliveryPtr>(&s->Terrestrial, s));
+ }
+ }
+ mergeDvbt.loadComplete(commonObjects);
+ mergeDvbt.execute(NULL);
+
+ SqlMergeTask mergeServices("postgres", "services");
+ CreateColumns<DVBSI::NetworkService>(boost::bind(&Maintenance::SqlMergeColumnsInserter, &mergeServices, _1, _2));
+ BOOST_FOREACH(const auto & s, n->TransportStreams) {
+ mergeServices.sources.insert(new ContainerIterator<DVBSI::NetworkServiceList>(&s->Services, s));
+ }
+ mergeServices.loadComplete(commonObjects);
+ mergeServices.execute(NULL);
+ }
+ private:
+ CommonObjects * commonObjects;
+};
+
+void
+Maintenance::UpdateNetwork(short type, const Ice::Current & ice)
+{
+ auto ic = ice.adapter->getCommunicator();
+ auto devs = P2PVR::DevicesPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("GlobalDevices")));
+ auto si = P2PVR::SIPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("SI")));
+ auto siparser = new SiNetworkInformationMerger(this);
+ TemporarayIceAdapterObject<P2PVR::RawDataClient> parser(ice.adapter, siparser);
+
+ if (!devs) {
+ throw std::runtime_error("bad proxy(s)");
+ }
+ const auto transports = si->GetAllDeliveries(type);
+ // Attempt to just download fresh data
+ BOOST_FOREACH(const auto & transport, transports) {
+ P2PVR::TunerPrx tuner;
+ try {
+ tuner = devs->GetTunerAny(type, transport, time(NULL) + 300);
+ }
+ catch (...) {
+ Logger()->messagebf(LOG_WARNING, "%s: Failed to get a suitable tuner", __PRETTY_FUNCTION__);
+ continue;
+ }
+ if (!tuner) {
+ continue;
+ }
+ try {
+ tuner->SendNetworkInformation(parser);
+ devs->ReleaseTuner(tuner);
+ BOOST_FOREACH(const CommonObjects::DataSources::value_type & ds, CommonObjects::datasources) {
+ ds.second->commit();
+ }
+ return;
+ }
+ catch (const std::exception & ex) {
+ Logger()->messagebf(LOG_WARNING, "%s: Failed to fetch network information: %s", __PRETTY_FUNCTION__, ex.what());
+ devs->ReleaseTuner(tuner);
+ throw;
+ }
+ catch (...) {
+ Logger()->messagebf(LOG_WARNING, "%s: Failed to fetch network information", __PRETTY_FUNCTION__);
+ devs->ReleaseTuner(tuner);
+ throw;
+ }
+ }
+ // If we can't do that, do a complete scan
+ auto tuner = devs->GetPrivateTuner(type, time(NULL) + 300);
+ tuner->ScanAndSendNetworkInformation(parser);
+ devs->ReleaseTuner(tuner);
+}
+
diff --git a/p2pvr/lib/maintenance/programAssociations.cpp b/p2pvr/lib/maintenance/programAssociations.cpp
new file mode 100644
index 0000000..4d32a86
--- /dev/null
+++ b/p2pvr/lib/maintenance/programAssociations.cpp
@@ -0,0 +1,80 @@
+#include "../maintenance.h"
+#include "../siParsers/programAssociation.h"
+#include <sqlMergeTask.h>
+#include <Ice/Communicator.h>
+#include "../p2Helpers.h"
+#include "../dvbsiHelpers.h"
+#include "../mapIterator.h"
+#include "../temporaryIceAdapterObject.h"
+
+class SiProgramAssociationHandler : public SiProgramAssociationParser {
+ public:
+ void HandleTable(ProgramAssociationMapPtr pam)
+ {
+ Logger()->messagebf(LOG_DEBUG, "Program association table");
+ BOOST_FOREACH(const auto & pa, *pam) {
+ Logger()->messagebf(LOG_DEBUG, " %d -> %d", pa.first, pa.second);
+ }
+ BOOST_FOREACH(const auto & pa, *pam) {
+ map[pa.first] = pa.second;
+ }
+ }
+
+ ProgramAssociationMap map;
+};
+
+static
+void
+CreatePATColumns(const ColumnCreator & cc)
+{
+ cc("serviceId", true);
+ cc("programId", false);
+}
+
+void
+Maintenance::UpdateProgramAssociations(short type, const Ice::Current & ice)
+{
+ auto ic = ice.adapter->getCommunicator();
+ auto devs = P2PVR::DevicesPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("GlobalDevices")));
+ auto si = P2PVR::SIPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("SI")));
+
+ if (!devs || !si) {
+ throw std::runtime_error("bad proxy(s)");
+ }
+
+ auto siparser = new SiProgramAssociationHandler();
+ TemporarayIceAdapterObject<P2PVR::RawDataClient> parser(ice.adapter, siparser);
+
+ const auto deliveries = si->GetAllDeliveries(type);
+ if (deliveries.empty()) {
+ throw std::runtime_error("no delivery methods");
+ }
+
+ BOOST_FOREACH(const auto & transport, deliveries) {
+ try {
+ Logger()->messagebf(LOG_DEBUG, "%s: Getting a tuner", __PRETTY_FUNCTION__);
+ auto tuner = devs->GetTunerSpecific(transport, time(NULL) + 300);
+ Logger()->messagebf(LOG_DEBUG, "%s: Fetching associations", __PRETTY_FUNCTION__);
+ tuner->SendProgramAssociationTable(parser);
+ Logger()->messagebf(LOG_INFO, "%s: Updated associations", __PRETTY_FUNCTION__);
+ devs->ReleaseTuner(tuner);
+ }
+ catch (...) {
+ // Tuning can fail
+ }
+ }
+
+ SqlMergeTask mergeServices("postgres", "services");
+ CreatePATColumns(boost::bind(SqlMergeColumnsInserter, &mergeServices, _1, _2));
+ // Don't change the list of services available from the network
+ mergeServices.doDelete = VariableType(false);
+ mergeServices.doInsert = VariableType(false);
+ Columns cols;
+ mergeServices.sources.insert(new MapIterator<ProgramAssociationMap>(CreatePATColumns, &siparser->map));
+ mergeServices.loadComplete(this);
+ mergeServices.execute(NULL);
+ BOOST_FOREACH(const CommonObjects::DataSources::value_type & ds, CommonObjects::datasources) {
+ ds.second->commit();
+ }
+}
+
diff --git a/p2pvr/lib/maintenance/programMap.cpp b/p2pvr/lib/maintenance/programMap.cpp
new file mode 100644
index 0000000..f230f77
--- /dev/null
+++ b/p2pvr/lib/maintenance/programMap.cpp
@@ -0,0 +1,133 @@
+#include "../maintenance.h"
+#include "../siParsers/programMap.h"
+#include <sqlMergeTask.h>
+#include <Ice/Communicator.h>
+#include "../p2Helpers.h"
+#include "../dvbsiHelpers.h"
+#include "../containerIterator.h"
+#include "../singleIterator.h"
+#include "../temporaryIceAdapterObject.h"
+#include <rdbmsDataSource.h>
+#include <column.h>
+#include <selectcommand.h>
+#include <sqlHandleAsVariableType.h>
+
+class SiProgramMapHandler : public SiProgramMapParser {
+ public:
+ SiProgramMapHandler(const RowProcessorCallback & cb) :
+ callBack(cb) {}
+
+ void HandleTable(DVBSI::ProgramMapPtr pmp)
+ {
+ Logger()->messagebf(LOG_DEBUG, "Program map: serviceId = %d", pmp->ServiceId);
+ BOOST_FOREACH(const auto & s, pmp->Streams) {
+ Logger()->messagef(LOG_DEBUG, "type: %02x id: %d", s->Type, s->Id);
+ }
+ BOOST_FOREACH(const auto & s, pmp->Streams) {
+ BindColumns<DVBSI::StreamPtr>(rowState, s, pmp);
+ rowState.process(callBack);
+ }
+ }
+
+ private:
+ ObjectRowState<DVBSI::StreamPtr> rowState;
+ const RowProcessorCallback callBack;
+};
+
+typedef boost::shared_ptr<DB::SelectCommand> SelectPtr;
+
+template<typename T>
+void
+operator<<(T & val, const DB::Column & col)
+{
+ HandleAsVariableType havt;
+ col.apply(havt);
+ val = havt.variable;
+}
+
+class SiProgramMapMerger : public IHaveSubTasks {
+ public:
+ SiProgramMapMerger(short t, CommonObjects * co, const Ice::Current & i) :
+ SourceObject(__PRETTY_FUNCTION__),
+ IHaveSubTasks(NULL),
+ commonObjects(co),
+ type(t),
+ ice(i) { }
+
+ void execute(ExecContext * ec) const
+ {
+ auto ic = ice.adapter->getCommunicator();
+ auto devs = P2PVR::DevicesPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("GlobalDevices")));
+ auto si = P2PVR::SIPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("SI")));
+
+ if (!devs || !si) {
+ throw std::runtime_error("bad proxy(s)");
+ }
+
+ TemporarayIceAdapterObject<P2PVR::RawDataClient> parser(ice.adapter,
+ new SiProgramMapHandler(boost::bind(&SiProgramMapMerger::executeChildren, this, ec)));
+
+ const auto deliveries = si->GetAllDeliveries(type);
+ if (deliveries.empty()) {
+ throw std::runtime_error("no delivery methods");
+ }
+
+ auto db = commonObjects->dataSource<RdbmsDataSource>("postgres");
+ SelectPtr sel = SelectPtr(db->getReadonly().newSelectCommand("select d.frequency, s.programid \
+ from delivery_dvbt d, services s \
+ where d.transportstreamid = s.transportstreamid \
+ and s.programid is not null \
+ order by s.transportstreamid, s.serviceid"));
+ int64_t curFreq = 0;
+ P2PVR::TunerPrx tuner;
+ while (sel->fetch()) {
+ int64_t freq, pid;
+ freq << (*sel)[0];
+ pid << (*sel)[1];
+
+ if (freq != curFreq) {
+ if (tuner) {
+ devs->ReleaseTuner(tuner);
+ }
+ Logger()->messagebf(LOG_DEBUG, "%s: Getting a tuner", __PRETTY_FUNCTION__);
+ const auto transport = *std::find_if(deliveries.begin(), deliveries.end(),
+ [freq](const DVBSI::DeliveryPtr & del) { return del->Frequency == freq; });
+ tuner = devs->GetTunerSpecific(transport, time(NULL) + 10);
+ curFreq = freq;
+ }
+
+ Logger()->messagebf(LOG_DEBUG, "%s: Fetching associations", __PRETTY_FUNCTION__);
+ tuner->SendProgramMap(pid, parser);
+ Logger()->messagebf(LOG_INFO, "%s: Updated associations", __PRETTY_FUNCTION__);
+ }
+ if (tuner) {
+ devs->ReleaseTuner(tuner);
+ }
+ }
+
+ private:
+ CommonObjects * commonObjects;
+ const short type;
+ const Ice::Current & ice;
+
+ void executeChildren(ExecContext * ec) const
+ {
+ BOOST_FOREACH(const Tasks::value_type & sq, normal) {
+ sq->execute(ec);
+ }
+ }
+};
+
+void
+Maintenance::UpdateProgramMaps(short type, const Ice::Current & ice)
+{
+ SqlMergeTask mergeServiceStreams("postgres", "servicestreams");
+ CreateColumns<DVBSI::StreamPtr>(boost::bind(SqlMergeColumnsInserter, &mergeServiceStreams, _1, _2));
+ mergeServiceStreams.sources.insert(new SiProgramMapMerger(type, this, ice));
+ mergeServiceStreams.loadComplete(this);
+ mergeServiceStreams.execute(NULL);
+ BOOST_FOREACH(const CommonObjects::DataSources::value_type & ds, CommonObjects::datasources) {
+ ds.second->commit();
+ }
+}
+
diff --git a/p2pvr/lib/maintenance/services.cpp b/p2pvr/lib/maintenance/services.cpp
new file mode 100644
index 0000000..30df800
--- /dev/null
+++ b/p2pvr/lib/maintenance/services.cpp
@@ -0,0 +1,68 @@
+#include "../maintenance.h"
+#include "../siParsers/service.h"
+#include <sqlMergeTask.h>
+#include <Ice/Communicator.h>
+#include "../p2Helpers.h"
+#include "../dvbsiHelpers.h"
+#include "../containerIterator.h"
+#include "../singleIterator.h"
+#include "../temporaryIceAdapterObject.h"
+
+class SiServicesMerger : public SiServicesParser {
+ public:
+ SiServicesMerger(CommonObjects * co) : commonObjects(co) { }
+
+ void HandleTable(DVBSI::TransportStreamPtr ts)
+ {
+ Logger()->messagebf(LOG_DEBUG, "Transport Stream Id: %d Original Network Id: %s", ts->TransportStreamId, ts->OriginalNetworkId);
+ BOOST_FOREACH(const auto & s, ts->Services) {
+ Logger()->messagebf(LOG_DEBUG, "\tService Id: %d Name: %s Type: %d, Provider: %s, DefaultAuthority: %s, RunningStatus %d FreeCaMode %d",
+ s->ServiceId, (s->Name ? *s->Name : "?"), (s->Type ? *s->Type : -1),
+ (s->ProviderName ? *s->ProviderName : "?"), (s->DefaultAuthority ? *s->DefaultAuthority : "?"),
+ s->RunningStatus, s->FreeCaMode);
+ }
+
+ SqlMergeTask mergeServices("postgres", "services");
+ CreateColumns<DVBSI::ServicePtr>(boost::bind(&Maintenance::SqlMergeColumnsInserter, &mergeServices, _1, _2));
+ // Don't change the list of services available from the network
+ mergeServices.doDelete = VariableType(false);
+ mergeServices.doInsert = VariableType(false);
+ mergeServices.sources.insert(new ContainerIterator<DVBSI::ServiceList>(&ts->Services, ts));
+ mergeServices.loadComplete(commonObjects);
+ mergeServices.execute(NULL);
+ }
+
+ private:
+ CommonObjects * commonObjects;
+};
+
+void
+Maintenance::UpdateServices(short type, const Ice::Current & ice)
+{
+ auto ic = ice.adapter->getCommunicator();
+ auto devs = P2PVR::DevicesPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("GlobalDevices")));
+ auto si = P2PVR::SIPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("SI")));
+
+ if (!devs || !si) {
+ throw std::runtime_error("bad proxy(s)");
+ }
+
+ auto siparser = new SiServicesMerger(this);
+ TemporarayIceAdapterObject<P2PVR::RawDataClient> parser(ice.adapter, siparser);
+
+ const auto deliveries = si->GetAllDeliveries(type);
+ if (deliveries.empty()) {
+ throw std::runtime_error("no delivery methods");
+ }
+
+ Logger()->messagebf(LOG_DEBUG, "%s: Getting a tuner", __PRETTY_FUNCTION__);
+ auto tuner = devs->GetTunerAny(type, deliveries.front(), time(NULL) + 90);
+ Logger()->messagebf(LOG_DEBUG, "%s: Fetching service list", __PRETTY_FUNCTION__);
+ tuner->SendServiceDescriptions(parser);
+ Logger()->messagebf(LOG_INFO, "%s: Updated service list", __PRETTY_FUNCTION__);
+ devs->ReleaseTuner(tuner);
+ BOOST_FOREACH(const CommonObjects::DataSources::value_type & ds, CommonObjects::datasources) {
+ ds.second->commit();
+ }
+}
+
diff --git a/p2pvr/lib/temporaryIceAdapterObject.h b/p2pvr/lib/temporaryIceAdapterObject.h
index db4b340..e8b83d8 100644
--- a/p2pvr/lib/temporaryIceAdapterObject.h
+++ b/p2pvr/lib/temporaryIceAdapterObject.h
@@ -1,6 +1,8 @@
#ifndef TEMPORARYICEADAPTER_H
#define TEMPORARYICEADAPTER_H
+#include <Ice/ObjectAdapter.h>
+
template <typename Object>
class TemporarayIceAdapterObject {
public: