summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--p2pvr/daemon/daemon.cpp2
-rw-r--r--p2pvr/ice/p2pvr.ice3
-rw-r--r--p2pvr/lib/Jamfile.jam1
-rw-r--r--p2pvr/lib/bindSiParserHandler.h25
-rw-r--r--p2pvr/lib/fileSink.cpp21
-rw-r--r--p2pvr/lib/fileSink.h20
-rw-r--r--p2pvr/lib/maintenance/network.cpp3
-rw-r--r--p2pvr/lib/maintenance/programAssociations.cpp3
-rw-r--r--p2pvr/lib/maintenance/programMap.cpp3
-rw-r--r--p2pvr/lib/maintenance/services.cpp3
-rw-r--r--p2pvr/lib/serviceStreamer.cpp79
-rw-r--r--p2pvr/lib/serviceStreamer.h42
-rw-r--r--p2pvr/lib/si.cpp68
-rw-r--r--p2pvr/lib/si.h13
-rw-r--r--p2pvr/lib/siParsers/event.cpp3
-rw-r--r--p2pvr/lib/siParsers/event.h2
-rw-r--r--p2pvr/lib/siParsers/table.h15
17 files changed, 282 insertions, 24 deletions
diff --git a/p2pvr/daemon/daemon.cpp b/p2pvr/daemon/daemon.cpp
index 91b8daa..1bd328c 100644
--- a/p2pvr/daemon/daemon.cpp
+++ b/p2pvr/daemon/daemon.cpp
@@ -35,8 +35,6 @@ class P2PvrDaemon : public Daemon {
auto maint = P2PVR::MaintenancePrx::checkedCast(adapter->createProxy(ic->stringToIdentity("Maintenance")));
maint->UpdateNetwork(FE_OFDM);
maint->UpdateServices(FE_OFDM);
- maint->UpdateProgramAssociations(FE_OFDM);
- maint->UpdateProgramMaps(FE_OFDM);
maint->UpdateEvents(FE_OFDM);
//ic->waitForShutdown();
diff --git a/p2pvr/ice/p2pvr.ice b/p2pvr/ice/p2pvr.ice
index b5903d3..8835511 100644
--- a/p2pvr/ice/p2pvr.ice
+++ b/p2pvr/ice/p2pvr.ice
@@ -265,8 +265,11 @@ module P2PVR {
interface SI {
// Get any delivery suitable for SI reading
idempotent Deliveries GetAllDeliveries(short type);
+ idempotent DVBSI::Delivery GetDeliveryForService(int id);
+ idempotent DVBSI::Delivery GetDeliveryForTransport(int id);
// Get a list of services
idempotent DVBSI::ServiceList GetServices();
+ idempotent DVBSI::Service GetService(int id);
};
};
diff --git a/p2pvr/lib/Jamfile.jam b/p2pvr/lib/Jamfile.jam
index b05e572..9814478 100644
--- a/p2pvr/lib/Jamfile.jam
+++ b/p2pvr/lib/Jamfile.jam
@@ -14,5 +14,6 @@ lib p2pvrlib :
: :
<library>boost_filesystem
<implicit-dependency>../ice//p2pvrice
+ <library>boost_system
<include>.
;
diff --git a/p2pvr/lib/bindSiParserHandler.h b/p2pvr/lib/bindSiParserHandler.h
new file mode 100644
index 0000000..a4813c2
--- /dev/null
+++ b/p2pvr/lib/bindSiParserHandler.h
@@ -0,0 +1,25 @@
+#ifndef BINDSIPARSERHANDLER_H
+#define BINDSIPARSERHANDLER_H
+
+#include <boost/function.hpp>
+
+template <typename SIObject, typename Base>
+class BindSiParserHandler : public Base {
+ public:
+ typedef boost::function<bool(SIObject)> Callback;
+ BindSiParserHandler(const Callback & cb) :
+ callBack(cb)
+ {
+ }
+
+ bool HandleTable(SIObject siObject)
+ {
+ return callBack(siObject);
+ }
+
+ private:
+ const Callback callBack;
+};
+
+#endif
+
diff --git a/p2pvr/lib/fileSink.cpp b/p2pvr/lib/fileSink.cpp
new file mode 100644
index 0000000..2fd4a58
--- /dev/null
+++ b/p2pvr/lib/fileSink.cpp
@@ -0,0 +1,21 @@
+#include "fileSink.h"
+
+FileSink::FileSink(const boost::filesystem::path & path) :
+ file(fopen(path.string().c_str(), "w"))
+{
+}
+
+FileSink::~FileSink()
+{
+ if (file) {
+ fclose(file);
+ }
+}
+
+bool
+FileSink::NewData(const P2PVR::Data & data, const Ice::Current &)
+{
+ fwrite(&data.front(), data.size(), 1, file);
+ return false;
+}
+
diff --git a/p2pvr/lib/fileSink.h b/p2pvr/lib/fileSink.h
new file mode 100644
index 0000000..a89016c
--- /dev/null
+++ b/p2pvr/lib/fileSink.h
@@ -0,0 +1,20 @@
+#ifndef FILESINK_H
+#define FILESINK_H
+
+#include <boost/filesystem/path.hpp>
+#include <stdio.h>
+#include <p2pvr.h>
+
+class FileSink : public P2PVR::RawDataClient {
+ public:
+ FileSink(const boost::filesystem::path & path);
+ ~FileSink();
+
+ bool NewData(const P2PVR::Data & data, const Ice::Current &);
+
+ private:
+ FILE * const file;
+};
+
+#endif
+
diff --git a/p2pvr/lib/maintenance/network.cpp b/p2pvr/lib/maintenance/network.cpp
index 1cd224a..f49e3f2 100644
--- a/p2pvr/lib/maintenance/network.cpp
+++ b/p2pvr/lib/maintenance/network.cpp
@@ -12,7 +12,7 @@ class SiNetworkInformationMerger : public SiNetworkInformationParser {
public:
SiNetworkInformationMerger(CommonObjects * co) : commonObjects(co) { }
- void HandleTable(DVBSI::NetworkPtr n)
+ bool HandleTable(DVBSI::NetworkPtr n)
{
Logger()->messagebf(LOG_DEBUG, "Network Id: %d Name: %s", n->NetworkId, *n->Name);
BOOST_FOREACH(const auto & ts, n->TransportStreams) {
@@ -55,6 +55,7 @@ class SiNetworkInformationMerger : public SiNetworkInformationParser {
}
mergeServices.loadComplete(commonObjects);
mergeServices.execute(NULL);
+ return false;
}
private:
CommonObjects * commonObjects;
diff --git a/p2pvr/lib/maintenance/programAssociations.cpp b/p2pvr/lib/maintenance/programAssociations.cpp
index 4d32a86..47e48b3 100644
--- a/p2pvr/lib/maintenance/programAssociations.cpp
+++ b/p2pvr/lib/maintenance/programAssociations.cpp
@@ -9,7 +9,7 @@
class SiProgramAssociationHandler : public SiProgramAssociationParser {
public:
- void HandleTable(ProgramAssociationMapPtr pam)
+ bool HandleTable(ProgramAssociationMapPtr pam)
{
Logger()->messagebf(LOG_DEBUG, "Program association table");
BOOST_FOREACH(const auto & pa, *pam) {
@@ -18,6 +18,7 @@ class SiProgramAssociationHandler : public SiProgramAssociationParser {
BOOST_FOREACH(const auto & pa, *pam) {
map[pa.first] = pa.second;
}
+ return false;
}
ProgramAssociationMap map;
diff --git a/p2pvr/lib/maintenance/programMap.cpp b/p2pvr/lib/maintenance/programMap.cpp
index ecbd126..bd4c2ee 100644
--- a/p2pvr/lib/maintenance/programMap.cpp
+++ b/p2pvr/lib/maintenance/programMap.cpp
@@ -17,7 +17,7 @@ class SiProgramMapHandler : public SiProgramMapParser {
SiProgramMapHandler(const RowProcessorCallback & cb) :
callBack(cb) {}
- void HandleTable(DVBSI::ProgramMapPtr pmp)
+ bool HandleTable(DVBSI::ProgramMapPtr pmp)
{
Logger()->messagebf(LOG_DEBUG, "Program map: serviceId = %d", pmp->ServiceId);
BOOST_FOREACH(const auto & s, pmp->Streams) {
@@ -27,6 +27,7 @@ class SiProgramMapHandler : public SiProgramMapParser {
BindColumns<DVBSI::StreamPtr>(rowState, s);
rowState.process(callBack);
}
+ return false;
}
private:
diff --git a/p2pvr/lib/maintenance/services.cpp b/p2pvr/lib/maintenance/services.cpp
index bd4d9a2..08b8c9d 100644
--- a/p2pvr/lib/maintenance/services.cpp
+++ b/p2pvr/lib/maintenance/services.cpp
@@ -12,7 +12,7 @@ class SiServicesMerger : public SiServicesParser {
public:
SiServicesMerger(CommonObjects * co) : commonObjects(co) { }
- void HandleTable(DVBSI::TransportStreamPtr ts)
+ bool HandleTable(DVBSI::TransportStreamPtr ts)
{
Logger()->messagebf(LOG_DEBUG, "Transport Stream Id: %d Original Network Id: %s", ts->TransportStreamId, ts->OriginalNetworkId);
BOOST_FOREACH(const auto & s, ts->Services) {
@@ -30,6 +30,7 @@ class SiServicesMerger : public SiServicesParser {
mergeServices.sources.insert(new ContainerIterator<DVBSI::ServiceList>(&ts->Services));
mergeServices.loadComplete(commonObjects);
mergeServices.execute(NULL);
+ return false;
}
private:
diff --git a/p2pvr/lib/serviceStreamer.cpp b/p2pvr/lib/serviceStreamer.cpp
new file mode 100644
index 0000000..ac034ab
--- /dev/null
+++ b/p2pvr/lib/serviceStreamer.cpp
@@ -0,0 +1,79 @@
+#include "serviceStreamer.h"
+#include <boost/bind.hpp>
+#include "fileSink.h"
+#include "bindSiParserHandler.h"
+
+ServiceStreamer::ServiceStreamer(int sid, const Ice::CommunicatorPtr & ic, const Ice::ObjectAdapterPtr & a) :
+ adapter(a),
+ devs(P2PVR::DevicesPrx::checkedCast(adapter->createProxy(ic->stringToIdentity("GlobalDevices")))),
+ si(P2PVR::SIPrx::checkedCast(adapter->createProxy(ic->stringToIdentity("SI")))),
+ target(adapter, new FileSink("/tmp/out.ts")),
+ patParser(adapter, new BindSiParserHandler<ProgramAssociationMapPtr, SiProgramAssociationParser>(boost::bind(&ServiceStreamer::HandlePAT, this, _1))),
+ pmtParser(adapter, new BindSiParserHandler<DVBSI::ProgramMapPtr, SiProgramMapParser>(boost::bind(&ServiceStreamer::HandlePMT, this, _1))),
+ serviceId(sid),
+ patHandle(0), pmtStream(0), pmtHandle(0), serviceHandle(0)
+{
+}
+
+ServiceStreamer::~ServiceStreamer()
+{
+}
+
+bool
+ServiceStreamer::HandlePAT(ProgramAssociationMapPtr pam)
+{
+ const auto p = pam->find(serviceId);
+ if (p != pam->end() && p->second != pmtStream) {
+ pmtStream = p->second;
+ Logger()->messagef(LOG_DEBUG, "%s: Got ProgramAssociationMap, pmtStream now = %d", __PRETTY_FUNCTION__, pmtStream);
+ stopHandle(pmtHandle);
+ pmtHandle = tuner->StartSendingSection(pmtStream, pmtParser);
+ }
+ return true;
+}
+
+bool
+ServiceStreamer::HandlePMT(DVBSI::ProgramMapPtr pmp)
+{
+ Streams strms;
+ BOOST_FOREACH(const auto & s, pmp->Streams) {
+ if (s->Type >= 2 && s->Type <= 5) {
+ strms.insert(s->Id);
+ }
+ }
+ if (strms != streams) {
+ streams = strms;
+ Logger()->messagebf(LOG_DEBUG, "%s: Got ProgramMap, switching to %d streams", __PRETTY_FUNCTION__, streams.size());
+ stopHandle(serviceHandle);
+ serviceHandle = tuner->StartSendingTS(P2PVR::PacketIds(streams.begin(), streams.end()), target);
+ }
+ return true;
+}
+
+void
+ServiceStreamer::Start()
+{
+ const auto transport = si->GetDeliveryForService(serviceId);
+ tuner = devs->GetTunerSpecific(transport, time(NULL) + 300);
+ patHandle = tuner->StartSendingSection(0, patParser);
+}
+
+void
+ServiceStreamer::Stop()
+{
+ stopHandle(serviceHandle);
+ stopHandle(patHandle);
+ stopHandle(pmtHandle);
+ devs->ReleaseTuner(tuner);
+ tuner = NULL;
+}
+
+void
+ServiceStreamer::stopHandle(int & handle)
+{
+ if (handle) {
+ tuner->StopSending(handle);
+ handle = 0;
+ }
+}
+
diff --git a/p2pvr/lib/serviceStreamer.h b/p2pvr/lib/serviceStreamer.h
new file mode 100644
index 0000000..3ea0c5c
--- /dev/null
+++ b/p2pvr/lib/serviceStreamer.h
@@ -0,0 +1,42 @@
+#ifndef SERVICESTREAMER_H
+#define SERVICESTREAMER_H
+
+#include <Ice/Communicator.h>
+#include <Ice/ObjectAdapter.h>
+#include "siParsers/programAssociation.h"
+#include "siParsers/programMap.h"
+#include "temporaryIceAdapterObject.h"
+#include <p2pvr.h>
+#include <set>
+
+class ServiceStreamer {
+ public:
+ ServiceStreamer(int sid, const Ice::CommunicatorPtr & ic, const Ice::ObjectAdapterPtr & a);
+ ~ServiceStreamer();
+
+ bool HandlePAT(ProgramAssociationMapPtr pam);
+ bool HandlePMT(DVBSI::ProgramMapPtr pmp);
+ void Start();
+ void Stop();
+ private:
+ void stopHandle(int & handle);
+
+ const Ice::ObjectAdapterPtr & adapter;
+ P2PVR::DevicesPrx devs;
+ P2PVR::SIPrx si;
+ P2PVR::TunerPrx tuner;
+ TemporarayIceAdapterObject<P2PVR::RawDataClient> target;
+ TemporarayIceAdapterObject<P2PVR::RawDataClient> patParser;
+ TemporarayIceAdapterObject<P2PVR::RawDataClient> pmtParser;
+
+ int serviceId;
+ int patHandle;
+ int pmtStream;
+ int pmtHandle;
+ typedef std::set<int> Streams;
+ Streams streams;
+ int serviceHandle;
+};
+
+#endif
+
diff --git a/p2pvr/lib/si.cpp b/p2pvr/lib/si.cpp
index 032220e..98b2316 100644
--- a/p2pvr/lib/si.cpp
+++ b/p2pvr/lib/si.cpp
@@ -3,15 +3,29 @@
#include "sqlContainerCreator.h"
#include <linux/dvb/frontend.h>
#include <rdbmsDataSource.h>
-#include <selectcommand.h>
#include <logger.h>
+#include <sqlVariableBinder.h>
-typedef boost::shared_ptr<DB::SelectCommand> SelectPtr;
+SI::SelectPtr
+SI::Select(const std::string & sql) const
+{
+ auto db = dataSource<RdbmsDataSource>("postgres");
+ return SelectPtr(db->getReadonly().newSelectCommand(sql));
+}
+SI::SelectPtr
+SI::Select(const std::string & sql, const std::list<VariableType> & vs) const
+{
+ SelectPtr sel(Select(sql));
+ unsigned int offset = 0;
+ BOOST_FOREACH(const auto & v, vs) {
+ boost::apply_visitor<const SqlVariableBinder, const VariableType>(SqlVariableBinder(sel.get(), offset++), v);
+ }
+ return sel;
+}
P2PVR::Deliveries
SI::GetAllDeliveries(short type, const Ice::Current &)
{
- auto db = dataSource<RdbmsDataSource>("postgres");
Logger()->messagebf(LOG_DEBUG, "%s(type %d)", __PRETTY_FUNCTION__, type);
P2PVR::Deliveries rtn;
SelectPtr sel;
@@ -19,22 +33,19 @@ SI::GetAllDeliveries(short type, const Ice::Current &)
case FE_OFDM:
{
SqlContainerCreator<P2PVR::Deliveries, DVBSI::TerrestrialDelivery> cc(rtn);
- sel = SelectPtr(db->getReadonly().newSelectCommand("SELECT * FROM delivery_dvbt"));
- cc.populate(sel);
+ cc.populate(Select("SELECT * FROM delivery_dvbt"));
break;
}
case FE_QAM:
{
SqlContainerCreator<P2PVR::Deliveries, DVBSI::CableDelivery> cc(rtn);
- sel = SelectPtr(db->getReadonly().newSelectCommand("SELECT * FROM delivery_dvbc"));
- cc.populate(sel);
+ cc.populate(Select("SELECT * FROM delivery_dvbc"));
break;
}
case FE_QPSK:
{
SqlContainerCreator<P2PVR::Deliveries, DVBSI::SatelliteDelivery> cc(rtn);
- sel = SelectPtr(db->getReadonly().newSelectCommand("SELECT * FROM delivery_dvbs"));
- cc.populate(sel);
+ cc.populate(Select("SELECT * FROM delivery_dvbs"));
break;
}
}
@@ -42,12 +53,47 @@ SI::GetAllDeliveries(short type, const Ice::Current &)
return rtn;
}
+DVBSI::DeliveryPtr
+SI::GetDeliveryForTransport(int id, const Ice::Current&)
+{
+ P2PVR::Deliveries rtn;
+ SqlContainerCreator<P2PVR::Deliveries, DVBSI::TerrestrialDelivery> cct(rtn);
+ cct.populate(Select("SELECT * FROM delivery_dvbt WHERE transportStreamId = ?", {id}));
+ SqlContainerCreator<P2PVR::Deliveries, DVBSI::CableDelivery> ccc(rtn);
+ ccc.populate(Select("SELECT * FROM delivery_dvbc WHERE transportStreamId = ?", {id}));
+ SqlContainerCreator<P2PVR::Deliveries, DVBSI::SatelliteDelivery> ccs(rtn);
+ ccs.populate(Select("SELECT * FROM delivery_dvbs WHERE transportStreamId = ?", {id}));
+ return rtn.front();
+}
+
+DVBSI::DeliveryPtr
+SI::GetDeliveryForService(int id, const Ice::Current&)
+{
+ P2PVR::Deliveries rtn;
+ SqlContainerCreator<P2PVR::Deliveries, DVBSI::TerrestrialDelivery> cct(rtn);
+ cct.populate(Select("SELECT d.* FROM services s, delivery_dvbt d WHERE serviceid = ? AND s.transportstreamid = d.transportstreamid", {id}));
+ SqlContainerCreator<P2PVR::Deliveries, DVBSI::CableDelivery> ccc(rtn);
+ ccc.populate(Select("SELECT d.* FROM services s, delivery_dvbc d WHERE serviceid = ? AND s.transportstreamid = d.transportstreamid", {id}));
+ SqlContainerCreator<P2PVR::Deliveries, DVBSI::SatelliteDelivery> ccs(rtn);
+ ccs.populate(Select("SELECT d.* FROM services s, delivery_dvbs d WHERE serviceid = ? AND s.transportstreamid = d.transportstreamid", {id}));
+ return rtn.front();
+}
+
DVBSI::ServiceList
SI::GetServices(const Ice::Current&)
{
- auto db = dataSource<RdbmsDataSource>("postgres");
DVBSI::ServiceList rtn;
SqlContainerCreator<DVBSI::ServiceList, DVBSI::Service> cc(rtn);
- cc.populate(SelectPtr(db->getReadonly().newSelectCommand("SELECT * FROM services ORDER BY serviceId")));
+ cc.populate(Select("SELECT * FROM services ORDER BY serviceId"));
return rtn;
}
+
+DVBSI::ServicePtr
+SI::GetService(int id, const Ice::Current&)
+{
+ DVBSI::ServiceList rtn;
+ SqlContainerCreator<DVBSI::ServiceList, DVBSI::Service> cc(rtn);
+ cc.populate(Select("SELECT * FROM services WHERE serviceId = ?", {id}));
+ return rtn.front();
+}
+
diff --git a/p2pvr/lib/si.h b/p2pvr/lib/si.h
index 5530125..508c0fe 100644
--- a/p2pvr/lib/si.h
+++ b/p2pvr/lib/si.h
@@ -3,11 +3,20 @@
#include <p2pvr.h>
#include <commonObjects.h>
+#include <selectcommand.h>
class SI : public P2PVR::SI, public virtual CommonObjects {
public:
- P2PVR::Deliveries GetAllDeliveries(short type, const Ice::Current&);
- DVBSI::ServiceList GetServices(const Ice::Current&);
+ typedef boost::shared_ptr<DB::SelectCommand> SelectPtr;
+
+ P2PVR::Deliveries GetAllDeliveries(short type, const Ice::Current &);
+ DVBSI::DeliveryPtr GetDeliveryForService(int id, const Ice::Current &);
+ DVBSI::DeliveryPtr GetDeliveryForTransport(int id, const Ice::Current &);
+ DVBSI::ServiceList GetServices(const Ice::Current &);
+ DVBSI::ServicePtr GetService(int id, const Ice::Current &);
+ protected:
+ SelectPtr Select(const std::string &) const;
+ SelectPtr Select(const std::string &, const std::list<VariableType> &) const;
};
#endif
diff --git a/p2pvr/lib/siParsers/event.cpp b/p2pvr/lib/siParsers/event.cpp
index 6e5446d..dbb9f90 100644
--- a/p2pvr/lib/siParsers/event.cpp
+++ b/p2pvr/lib/siParsers/event.cpp
@@ -205,9 +205,10 @@ SiEpgParser::CheckTableId(u_char tableId) const
return ((tableId >= 0x50 && tableId <= 0x5f) || (tableId >= 0x60 && tableId <= 0x6f));
}
-void
+bool
SiEpgParser::HandleTable(DVBSI::EitInformationPtr)
{
+ return false;
}
Common::DateTime &
diff --git a/p2pvr/lib/siParsers/event.h b/p2pvr/lib/siParsers/event.h
index 0528d98..943f5aa 100644
--- a/p2pvr/lib/siParsers/event.h
+++ b/p2pvr/lib/siParsers/event.h
@@ -20,7 +20,7 @@ class SiEpgParser : public SiTableParser<EventInformation, DVBSI::EitInformation
uint8_t FirstTableId(const EventInformation * ei) { return (ei->header.tableid >= 0x60 ? 0x60 : 0x50); }
uint8_t LastTableId(const EventInformation * ei) { return ei->LastTableId; }
void ParseSiTable(const EventInformation * eit, DVBSI::EitInformationPtr);
- void HandleTable(DVBSI::EitInformationPtr);
+ bool HandleTable(DVBSI::EitInformationPtr);
virtual void HandleTable(DVBSI::EventPtr) = 0;
private:
diff --git a/p2pvr/lib/siParsers/table.h b/p2pvr/lib/siParsers/table.h
index effbe6e..4152b5e 100644
--- a/p2pvr/lib/siParsers/table.h
+++ b/p2pvr/lib/siParsers/table.h
@@ -121,14 +121,23 @@ class SiTableParser : public SiTableParserBase {
}
}
if (complete) {
- HandleTable(obj);
- obj = TargetType();
+ if (HandleTable(obj)) {
+ targetTableSections.clear();
+ }
+ else {
+ obj = TargetType();
+ }
incomplete -= 1;
}
}
}
data += HILO(siTable->header.section_length) + 4;
}
+ return IsFinished();
+ }
+
+ virtual bool IsFinished() const
+ {
return ((incomplete == 0) && (startTime < (time(NULL) - 10)));
}
@@ -186,7 +195,7 @@ class SiTableParser : public SiTableParserBase {
virtual bool CheckTableId(u_char tableId) const = 0;
virtual void ParseSiTable(const TableType *, TargetType) = 0;
- virtual void HandleTable(TargetType table) = 0;
+ virtual bool HandleTable(TargetType table) = 0;
private:
mutable Contents contents;