summaryrefslogtreecommitdiff
path: root/p2pvr/daemon
diff options
context:
space:
mode:
authorrandomdan <randomdan@localhost>2014-03-13 19:42:07 +0000
committerrandomdan <randomdan@localhost>2014-03-13 19:42:07 +0000
commitab1eee942e75874739ce5f0b4ba289aac5cc3faf (patch)
tree6e43828794fe0c0c5c9921ec1911695b67357c50 /p2pvr/daemon
parentExpose more of the interface (diff)
downloadp2pvr-ab1eee942e75874739ce5f0b4ba289aac5cc3faf.tar.bz2
p2pvr-ab1eee942e75874739ce5f0b4ba289aac5cc3faf.tar.xz
p2pvr-ab1eee942e75874739ce5f0b4ba289aac5cc3faf.zip
Restructure into more sensibly arranged libs
Diffstat (limited to 'p2pvr/daemon')
-rw-r--r--p2pvr/daemon/Jamfile.jam17
-rw-r--r--p2pvr/daemon/dbClient.cpp56
-rw-r--r--p2pvr/daemon/dbClient.h103
-rw-r--r--p2pvr/daemon/globalDevices.cpp86
-rw-r--r--p2pvr/daemon/globalDevices.h25
-rw-r--r--p2pvr/daemon/maintenance.cpp97
-rw-r--r--p2pvr/daemon/maintenance.h39
-rw-r--r--p2pvr/daemon/maintenance/events.cpp92
-rw-r--r--p2pvr/daemon/maintenance/network.cpp107
-rw-r--r--p2pvr/daemon/maintenance/programAssociations.cpp80
-rw-r--r--p2pvr/daemon/maintenance/programMap.cpp133
-rw-r--r--p2pvr/daemon/maintenance/services.cpp68
-rw-r--r--p2pvr/daemon/pch.hpp24
-rw-r--r--p2pvr/daemon/recorder.cpp100
-rw-r--r--p2pvr/daemon/recorder.h51
-rw-r--r--p2pvr/daemon/recordings.cpp82
-rw-r--r--p2pvr/daemon/recordings.h16
-rw-r--r--p2pvr/daemon/schedulers/bitDumbScheduler.cpp55
-rw-r--r--p2pvr/daemon/schedules.cpp484
-rw-r--r--p2pvr/daemon/schedules.h85
-rw-r--r--p2pvr/daemon/si.cpp127
-rw-r--r--p2pvr/daemon/si.h23
-rw-r--r--p2pvr/daemon/sql/Recordings_delete.sql2
-rw-r--r--p2pvr/daemon/sql/Recordings_getAll.sql3
-rw-r--r--p2pvr/daemon/sql/Recordings_getStorage.sql4
-rw-r--r--p2pvr/daemon/sql/Recordings_insert.sql2
-rw-r--r--p2pvr/daemon/sql/Recordings_insertNewId.sql2
-rw-r--r--p2pvr/daemon/sql/SI_eventById.sql28
-rw-r--r--p2pvr/daemon/sql/SI_eventsInRange.sql28
-rw-r--r--p2pvr/daemon/sql/SI_eventsOnNow.sql27
-rw-r--r--p2pvr/daemon/sql/SI_serviceNextUsed.sql12
-rw-r--r--p2pvr/daemon/sql/SI_servicesSelectAll.sql3
-rw-r--r--p2pvr/daemon/sql/SI_servicesSelectById.sql3
-rw-r--r--p2pvr/daemon/sql/Schedules_GetCandidates.sql25
-rw-r--r--p2pvr/daemon/sql/Schedules_delete.sql1
-rw-r--r--p2pvr/daemon/sql/Schedules_insert.sql2
-rw-r--r--p2pvr/daemon/sql/Schedules_insertNewId.sql1
-rw-r--r--p2pvr/daemon/sql/Schedules_pendingRecord.sql13
-rw-r--r--p2pvr/daemon/sql/Schedules_scheduledToRecord.sql8
-rw-r--r--p2pvr/daemon/sql/Schedules_selectAll.sql3
-rw-r--r--p2pvr/daemon/sql/Schedules_selectById.sql5
-rw-r--r--p2pvr/daemon/sql/Schedules_update.sql10
-rw-r--r--p2pvr/daemon/sqlContainerCreator.h28
-rw-r--r--p2pvr/daemon/storage.cpp275
-rw-r--r--p2pvr/daemon/storage.h38
45 files changed, 2470 insertions, 3 deletions
diff --git a/p2pvr/daemon/Jamfile.jam b/p2pvr/daemon/Jamfile.jam
index 8a6ee8c..338f65e 100644
--- a/p2pvr/daemon/Jamfile.jam
+++ b/p2pvr/daemon/Jamfile.jam
@@ -1,9 +1,20 @@
-lib Ice ;
-lib IceUtil ;
+cpp-pch pch : pch.hpp :
+ <library>../ice//p2pvrice
+ <library>../lib//p2pvrlib
+ <library>../dvb//p2pvrdvb
+ <library>..//p2sql
+ <library>../devices//p2pvrdevices
+ <library>../daemonbase//p2pvrdaemonbase
+;
lib p2pvrdaemon :
- [ glob *.cpp ]
+ pch
+ [ glob-tree *.cpp *.sql ]
:
<library>../ice//p2pvrice
<library>../lib//p2pvrlib
+ <library>../dvb//p2pvrdvb
+ <library>..//p2sql
+ <library>../devices//p2pvrdevices
+ <library>../daemonbase//p2pvrdaemonbase
;
diff --git a/p2pvr/daemon/dbClient.cpp b/p2pvr/daemon/dbClient.cpp
new file mode 100644
index 0000000..8267584
--- /dev/null
+++ b/p2pvr/daemon/dbClient.cpp
@@ -0,0 +1,56 @@
+#include <pch.hpp>
+#include "dbClient.h"
+#include <sqlMergeTask.h>
+
+void
+DatabaseClient::SqlMergeColumnsInserter(SqlMergeTask * merge, const std::string & name, bool key)
+{
+ merge->cols.insert(new SqlMergeTask::TargetColumn(name, key));
+ if (key) {
+ merge->keys.insert(name);
+ }
+}
+
+void
+DatabaseClient::onAllDatasources(const DataSourceCall & call) const
+{
+ BOOST_FOREACH(const CommonObjects::DataSources::value_type & ds, CommonObjects::datasources) {
+ call(ds.second);
+ }
+}
+
+DatabaseClient::TxHelper::TxHelper(const DatabaseClient * dbc) :
+ client(dbc),
+ so(NULL,
+ boost::bind(&DatabaseClient::onAllDatasources, dbc, DataSourceCall(boost::bind(&DataSource::commit, _1))),
+ boost::bind(&DatabaseClient::onAllDatasources, dbc, DataSourceCall(boost::bind(&DataSource::rollback, _1))))
+{
+}
+
+void
+DatabaseClient::TxHelper::Commit() const
+{
+ client->onAllDatasources(boost::bind(&DataSource::commit, _1));
+}
+
+DatabaseClient::NoRowsFoundException::NoRowsFoundException() :
+ std::runtime_error("No rows found")
+{
+}
+
+VariableType
+operator/(const DatabaseClient::SelectPtr & cmd, unsigned int col)
+{
+ HandleAsVariableType vt;
+ (*cmd)[col].apply(vt);
+ return vt.variable;
+}
+
+VariableType
+operator/(const DatabaseClient::SelectPtr & cmd, const std::string & col)
+{
+ HandleAsVariableType vt;
+ (*cmd)[col].apply(vt);
+ return vt.variable;
+}
+
diff --git a/p2pvr/daemon/dbClient.h b/p2pvr/daemon/dbClient.h
new file mode 100644
index 0000000..84cf5e7
--- /dev/null
+++ b/p2pvr/daemon/dbClient.h
@@ -0,0 +1,103 @@
+#ifndef DBCLIENT_H
+#define DBCLIENT_H
+
+#include <commonObjects.h>
+#include <variableType.h>
+#include <list>
+#include <selectcommand.h>
+#include <modifycommand.h>
+#include <scopeObject.h>
+#include <rdbmsDataSource.h>
+#include <sqlVariableBinder.h>
+#include <sqlHandleAsVariableType.h>
+#include "p2Helpers.h"
+
+class SqlMergeTask;
+
+class DatabaseClient : public virtual CommonObjects {
+ public:
+ typedef boost::shared_ptr<DB::SelectCommand> SelectPtr;
+ typedef boost::shared_ptr<DB::ModifyCommand> ModifyPtr;
+
+ static void SqlMergeColumnsInserter(SqlMergeTask * merge, const std::string & name, bool key);
+
+ class TxHelper {
+ public:
+ TxHelper(const DatabaseClient *);
+ void Commit() const;
+
+ private:
+ const DatabaseClient * client;
+ ScopeObject so;
+ };
+
+ template <typename... Args>
+ std::pair<RdbmsDataSource::ConnectionRef, ModifyPtr> Modify(const std::string & sql, const Args & ... args) const
+ {
+ auto db = dataSource<RdbmsDataSource>("postgres")->getWritable();
+ auto cmd = ModifyPtr(db->newModifyCommand(sql));
+ Bind(cmd.get(), 0, args...);
+ return {db, cmd};
+ }
+
+ template <typename... Args>
+ std::pair<RdbmsDataSource::ConnectionRef, SelectPtr> Select(const std::string & sql, const Args & ... args) const
+ {
+ auto db = dataSource<RdbmsDataSource>("postgres")->getReadonly();
+ auto cmd = SelectPtr(db->newSelectCommand(sql));
+ Bind(cmd.get(), 0, args...);
+ return {db, cmd};
+ }
+
+ class NoRowsFoundException : public std::runtime_error {
+ public:
+ NoRowsFoundException();
+ };
+
+ template <typename Rtn, typename... Args>
+ Rtn SelectScalar(const std::string & sql, const Args & ... args) const
+ {
+ auto db = dataSource<RdbmsDataSource>("postgres");
+ auto cmd = SelectPtr(db->getReadonly()->newSelectCommand(sql));
+ Bind(cmd.get(), 0, args...);
+ while (cmd->fetch()) {
+ HandleAsVariableType h;
+ (*cmd)[0].apply(h);
+ Rtn r;
+ h.variable >> r;
+ return r;
+ }
+ throw NoRowsFoundException();
+ }
+
+ private:
+ static void Bind(DB::Command *, unsigned int) { }
+
+ template <typename Arg>
+ static void Bind(DB::Command * cmd, unsigned int offset, const Arg & arg)
+ {
+ VariableType v;
+ v << arg;
+ boost::apply_visitor<const SqlVariableBinder, const VariableType>(SqlVariableBinder(cmd, offset), v);
+ }
+
+ template <typename Arg, typename... Args>
+ static void Bind(DB::Command * cmd, unsigned int offset, const Arg & arg, const Args & ... args)
+ {
+ Bind(cmd, offset, arg);
+ Bind(cmd, offset + 1, args...);
+ }
+
+ friend class TxHelper;
+ typedef boost::function<void(DataSourcePtr)> DataSourceCall;
+ void onAllDatasources(const DataSourceCall &) const;
+};
+
+VariableType
+operator/(const DatabaseClient::SelectPtr & cmd, unsigned int col);
+
+VariableType
+operator/(const DatabaseClient::SelectPtr & cmd, const std::string & col);
+
+#endif
+
diff --git a/p2pvr/daemon/globalDevices.cpp b/p2pvr/daemon/globalDevices.cpp
new file mode 100644
index 0000000..4368cea
--- /dev/null
+++ b/p2pvr/daemon/globalDevices.cpp
@@ -0,0 +1,86 @@
+#include <pch.hpp>
+#include "globalDevices.h"
+#include <Ice/Ice.h>
+
+std::vector<std::string> GlobalDevices::Devices;
+
+DECLARE_OPTIONS(GlobalDevices, "P2PVR Devices")
+("p2pvr.globaldevices.carddaemon",
+ Options::functions(
+ [](const VariableType & df) { Devices.push_back(df); },
+ []{ Devices.clear(); }),
+ "ICE address of remote device pools (<adapter>:<endpoint>)")
+END_OPTIONS(GlobalDevices);
+
+P2PVR::TunerPrx
+GlobalDevices::GetTunerSpecific(const DVBSI::DeliveryPtr & delivery, const Ice::Current & ice)
+{
+ auto ic = ice.adapter->getCommunicator();
+ BOOST_FOREACH(const auto & pool, Devices) {
+ try {
+ auto poolprx = P2PVR::DevicesPrx::checkedCast(ic->stringToProxy(pool));
+ return poolprx->GetTunerSpecific(delivery);
+ }
+ catch (...) {
+ }
+ }
+ throw P2PVR::NoSuitableDeviceAvailable();
+}
+
+P2PVR::TunerPrx
+GlobalDevices::GetTunerAny(short type, const DVBSI::DeliveryPtr & delivery, const Ice::Current & ice)
+{
+ auto ic = ice.adapter->getCommunicator();
+ BOOST_FOREACH(const auto & pool, Devices) {
+ try {
+ auto poolprx = P2PVR::DevicesPrx::checkedCast(ic->stringToProxy(pool));
+ return poolprx->GetTunerAny(type, delivery);
+ }
+ catch (...) {
+ }
+ }
+ throw P2PVR::NoSuitableDeviceAvailable();
+}
+
+P2PVR::PrivateTunerPrx
+GlobalDevices::GetPrivateTuner(short type, const Ice::Current & ice)
+{
+ auto ic = ice.adapter->getCommunicator();
+ BOOST_FOREACH(const auto & pool, Devices) {
+ try {
+ auto poolprx = P2PVR::DevicesPrx::checkedCast(ic->stringToProxy(pool));
+ return poolprx->GetPrivateTuner(type);
+ }
+ catch (...) {
+ }
+ }
+ throw P2PVR::NoSuitableDeviceAvailable();
+}
+
+void
+GlobalDevices::ReleaseTuner(const P2PVR::TunerPrx & tuner, const Ice::Current & ice)
+{
+ auto ic = ice.adapter->getCommunicator();
+ BOOST_FOREACH(const auto & pool, Devices) {
+ auto poolprx = P2PVR::DevicesPrx::checkedCast(ic->stringToProxy(pool));
+ poolprx->ReleaseTuner(tuner);
+ }
+}
+
+int
+GlobalDevices::TunerCount(const Ice::Current & ice)
+{
+ int total = 0;
+ auto ic = ice.adapter->getCommunicator();
+ BOOST_FOREACH(const auto & pool, Devices) {
+ try {
+ auto poolprx = P2PVR::DevicesPrx::checkedCast(ic->stringToProxy(pool));
+ total += poolprx->TunerCount();
+ }
+ catch (...) {
+ // Not available, don't count 'em
+ }
+ }
+ return total;
+}
+
diff --git a/p2pvr/daemon/globalDevices.h b/p2pvr/daemon/globalDevices.h
new file mode 100644
index 0000000..da27c95
--- /dev/null
+++ b/p2pvr/daemon/globalDevices.h
@@ -0,0 +1,25 @@
+#ifndef GLOBALDEVICES_H
+#define GLOBALDEVICES_H
+
+// Global devices implements a device collection (P2PVR::Devices) for any devices known
+// throughout the system through other Devices interfaces
+
+#include <dvb.h>
+#include <options.h>
+
+class GlobalDevices : public P2PVR::Devices {
+ public:
+ P2PVR::TunerPrx GetTunerSpecific(const DVBSI::DeliveryPtr &, const Ice::Current &);
+ P2PVR::TunerPrx GetTunerAny(short type, const DVBSI::DeliveryPtr &, const Ice::Current &);
+ P2PVR::PrivateTunerPrx GetPrivateTuner(short type, const Ice::Current &);
+ void ReleaseTuner(const P2PVR::TunerPrx &, const Ice::Current &);
+ int TunerCount(const Ice::Current &);
+
+ INITOPTIONS;
+ private:
+ static std::vector<std::string> Devices;
+};
+
+#endif
+
+
diff --git a/p2pvr/daemon/maintenance.cpp b/p2pvr/daemon/maintenance.cpp
new file mode 100644
index 0000000..3475544
--- /dev/null
+++ b/p2pvr/daemon/maintenance.cpp
@@ -0,0 +1,97 @@
+#include <pch.hpp>
+#include <logger.h>
+#include <thread>
+#include "maintenance.h"
+#include <Ice/Ice.h>
+#include "bindTimerTask.h"
+#include <linux/dvb/frontend.h>
+#include <cxxabi.h>
+
+time_t Maintenance::periodUpdateNetwork;
+time_t Maintenance::periodUpdateServices;
+time_t Maintenance::periodUpdateEvents;
+
+DECLARE_OPTIONS(Maintenance, "P2PVR Maintenance options")
+("p2pvr.maintenance.periodUpdateNetwork", Options::value(&periodUpdateNetwork, 86400 * 7),
+ "Period between automated updates of DVB network (1 week)")
+("p2pvr.maintenance.periodUpdateServices", Options::value(&periodUpdateServices, 86400 * 7),
+ "Period between automated updates of DVB services (1 week)")
+("p2pvr.maintenance.periodUpdateEvents", Options::value(&periodUpdateEvents, 3600 * 12),
+ "Period between automated updates of DVB events (12 hours)")
+END_OPTIONS(Maintenance);
+
+Maintenance::Maintenance(Ice::ObjectAdapterPtr a, IceUtil::TimerPtr t) :
+ adapter(a),
+ timer(t),
+ clientCheck(new BindTimerTask(boost::bind(&Maintenance::ScheduledUpdate, this))),
+ lastUpdateNetwork(0),
+ lastUpdateServices(0),
+ lastUpdateEvents(0),
+ updateRunning(false)
+{
+ timer->scheduleRepeated(clientCheck, IceUtil::Time::seconds(5 * 60));
+#ifdef NDEBUG
+ ScheduledUpdate();
+#endif
+}
+
+void
+Maintenance::UpdateAll(const Ice::Current & ice)
+{
+ UpdateAll(FE_OFDM, ice);
+ UpdateAll(FE_QPSK, ice);
+ UpdateAll(FE_ATSC, ice);
+ UpdateAll(FE_QAM, ice);
+}
+
+void
+Maintenance::UpdateAll(short type, const Ice::Current & ice)
+{
+ UpdateNetwork(type, ice);
+ UpdateServices(type, ice);
+ UpdateProgramAssociations(type, ice);
+ UpdateProgramMaps(type, ice);
+ UpdateEvents(type, ice);
+}
+
+void
+Maintenance::ScheduledUpdate()
+{
+ Logger()->messagebf(LOG_DEBUG, "%s: triggered", __PRETTY_FUNCTION__);
+ if (!updateRunning) {
+ std::thread update([this] {
+ try {
+ ScopeObject notRunning([this]{ updateRunning = false; });
+ updateRunning = true;
+ auto si = P2PVR::MaintenancePrx::checkedCast(adapter->createProxy(adapter->getCommunicator()->stringToIdentity("Maintenance")));
+ time_t now = time(NULL);
+ if (lastUpdateNetwork < now - periodUpdateNetwork) {
+ Logger()->messagebf(LOG_INFO, "%s: updating network", __PRETTY_FUNCTION__);
+ si->UpdateNetwork(FE_OFDM);
+ time(&lastUpdateNetwork);
+ }
+ if (lastUpdateServices < now - periodUpdateServices) {
+ Logger()->messagebf(LOG_INFO, "%s: updating services", __PRETTY_FUNCTION__);
+ si->UpdateServices(FE_OFDM);
+ time(&lastUpdateServices);
+ }
+ if (lastUpdateEvents < now - periodUpdateEvents) {
+ Logger()->messagebf(LOG_INFO, "%s: updating events", __PRETTY_FUNCTION__);
+ si->UpdateEvents(FE_OFDM);
+ time(&lastUpdateEvents);
+ }
+ Logger()->messagebf(LOG_DEBUG, "%s: completed", __PRETTY_FUNCTION__);
+ }
+ catch (const std::exception & ex) {
+ char * buf = __cxxabiv1::__cxa_demangle(typeid(ex).name(), NULL, NULL, NULL);
+ Logger()->messagebf(LOG_ERR, "%s: failed %s: %s", __PRETTY_FUNCTION__, buf, ex.what());
+ free(buf);
+ }
+ catch (...) {
+ Logger()->messagebf(LOG_ERR, "%s: failed (unknown exception)", __PRETTY_FUNCTION__);
+ }
+ });
+ update.detach();
+ }
+}
+
diff --git a/p2pvr/daemon/maintenance.h b/p2pvr/daemon/maintenance.h
new file mode 100644
index 0000000..563455a
--- /dev/null
+++ b/p2pvr/daemon/maintenance.h
@@ -0,0 +1,39 @@
+#ifndef P2PVR_MAINTENANCE_H
+#define P2PVR_MAINTENANCE_H
+
+#include <p2pvr.h>
+#include "dbClient.h"
+
+class Maintenance : public P2PVR::Maintenance, public DatabaseClient {
+ public:
+ Maintenance(Ice::ObjectAdapterPtr, IceUtil::TimerPtr);
+
+ void UpdateAll(const Ice::Current &);
+ void UpdateAll(short type, const Ice::Current &);
+ void UpdateNetwork(short type, const Ice::Current &);
+ void UpdateServices(short type, const Ice::Current &);
+ void UpdateProgramAssociations(short type, const Ice::Current &);
+ void UpdateProgramMaps(short type, const Ice::Current &);
+ void UpdateEvents(short type, const Ice::Current &);
+
+ INITOPTIONS;
+
+ private:
+ void ScheduledUpdate();
+
+ Ice::ObjectAdapterPtr adapter;
+ IceUtil::TimerPtr timer;
+ IceUtil::TimerTaskPtr clientCheck;
+
+ time_t lastUpdateNetwork;
+ time_t lastUpdateServices;
+ time_t lastUpdateEvents;
+ bool updateRunning;
+
+ static time_t periodUpdateNetwork;
+ static time_t periodUpdateServices;
+ static time_t periodUpdateEvents;
+};
+
+#endif
+
diff --git a/p2pvr/daemon/maintenance/events.cpp b/p2pvr/daemon/maintenance/events.cpp
new file mode 100644
index 0000000..b0c8b74
--- /dev/null
+++ b/p2pvr/daemon/maintenance/events.cpp
@@ -0,0 +1,92 @@
+#include <pch.hpp>
+#include "../maintenance.h"
+#include <siParsers/event.h>
+#include <sqlMergeTask.h>
+#include <Ice/Communicator.h>
+#include <p2Helpers.h>
+#include <dvbsiHelpers.h>
+#include <containerIterator.h>
+#include <singleIterator.h>
+#include <temporaryIceAdapterObject.h>
+#include <commonHelpers.h>
+
+class SiEventsHandler : public SiEpgParser {
+ public:
+ SiEventsHandler(const RowProcessorCallback & cb) :
+ callBack(cb) {}
+
+ void HandleTable(DVBSI::EventPtr e)
+ {
+ Logger()->messagebf(LOG_DEBUG, "Service Id: %d Program Id: %d Title: %s Time: %s - %s",
+ e->ServiceId, e->EventId, e->Title, e->StartTime, e->StopTime);
+ BindColumns<DVBSI::EventPtr>(rowState, e);
+ rowState.process(callBack);
+ }
+
+ private:
+ ObjectRowState<DVBSI::EventPtr> rowState;
+ const RowProcessorCallback callBack;
+};
+
+class SiEventsMerger : public IHaveSubTasks {
+ public:
+ SiEventsMerger(short t, const Ice::Current & i) :
+ SourceObject(__PRETTY_FUNCTION__),
+ IHaveSubTasks(NULL),
+ type(t),
+ ice(i) { }
+
+ void execute(ExecContext * ec) const
+ {
+ auto ic = ice.adapter->getCommunicator();
+ auto devs = P2PVR::DevicesPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("GlobalDevices")));
+ auto si = P2PVR::SIPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("SI")));
+
+ if (!devs || !si) {
+ throw std::runtime_error("bad proxy(s)");
+ }
+
+ TemporarayIceAdapterObject<P2PVR::RawDataClient> parser(ice.adapter,
+ new SiEventsHandler(boost::bind(&SiEventsMerger::executeChildren, this, ec)));
+
+ auto delivery = si->GetDeliveryForSi();
+ if (!delivery) {
+ throw std::runtime_error("no delivery methods");
+ }
+
+ Logger()->messagebf(LOG_DEBUG, "%s: Getting a tuner", __PRETTY_FUNCTION__);
+ auto tuner = devs->GetTunerAny(type, delivery);
+ Logger()->messagebf(LOG_DEBUG, "%s: Fetching events", __PRETTY_FUNCTION__);
+ tuner->SendEventInformation(parser);
+ devs->ReleaseTuner(tuner);
+ }
+
+ private:
+ const short type;
+ const Ice::Current & ice;
+
+ void executeChildren(ExecContext * ec) const
+ {
+ BOOST_FOREACH(const Tasks::value_type & sq, normal) {
+ sq->execute(ec);
+ }
+ }
+};
+
+void
+Maintenance::UpdateEvents(short type, const Ice::Current & ice)
+{
+ TxHelper tx(this);
+ SqlMergeTask mergeEvents("postgres", "events");
+ CreateColumns<DVBSI::EventPtr>(boost::bind(SqlMergeColumnsInserter, &mergeEvents, _1, _2));
+ mergeEvents.sources.insert(new SiEventsMerger(type, ice));
+ mergeEvents.loadComplete(this);
+ mergeEvents.execute(NULL);
+ tx.Commit();
+ Logger()->messagebf(LOG_INFO, "%s: Updated events", __PRETTY_FUNCTION__);
+
+ auto ic = ice.adapter->getCommunicator();
+ auto sch = P2PVR::SchedulesPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("Schedules")));
+ sch->DoReschedule();
+}
+
diff --git a/p2pvr/daemon/maintenance/network.cpp b/p2pvr/daemon/maintenance/network.cpp
new file mode 100644
index 0000000..d9ce0ec
--- /dev/null
+++ b/p2pvr/daemon/maintenance/network.cpp
@@ -0,0 +1,107 @@
+#include <pch.hpp>
+#include "../maintenance.h"
+#include <siParsers/network.h>
+#include <sqlMergeTask.h>
+#include <Ice/Communicator.h>
+#include <p2Helpers.h>
+#include <dvbsiHelpers.h>
+#include <containerIterator.h>
+#include <singleIterator.h>
+#include <temporaryIceAdapterObject.h>
+
+class SiNetworkInformationMerger : public SiNetworkInformationParser {
+ public:
+ SiNetworkInformationMerger(DatabaseClient * co) : commonObjects(co) { }
+
+ bool HandleTable(DVBSI::NetworkPtr n)
+ {
+ Logger()->messagebf(LOG_DEBUG, "Network Id: %d Name: %s", n->NetworkId, *n->Name);
+ BOOST_FOREACH(const auto & ts, n->TransportStreams) {
+ Logger()->messagebf(LOG_DEBUG, "\tTransport Stream Id: %d Original Network Id: %d", ts->TransportStreamId, ts->OriginalNetworkId);
+ BOOST_FOREACH(const auto & s, ts->Services) {
+ Logger()->messagebf(LOG_DEBUG, "\t\tService Id: %d Service Type: %d", s.ServiceId, s.ServiceType);
+ }
+ if (ts->Terrestrial) {
+ Logger()->messagebf(LOG_DEBUG, "\t\tDVB-T: Frequency: %d", ts->Terrestrial->Frequency);
+ }
+ }
+
+ DatabaseClient::TxHelper tx(commonObjects);
+ SqlMergeTask mergeNetwork("postgres", "networks");
+ CreateColumns<DVBSI::NetworkPtr>(boost::bind(&DatabaseClient::SqlMergeColumnsInserter, &mergeNetwork, _1, _2));
+ std::vector<DVBSI::NetworkPtr> networks = { n };
+ mergeNetwork.sources.insert(new ContainerIterator<std::vector<DVBSI::NetworkPtr>>(&networks));
+ mergeNetwork.loadComplete(commonObjects);
+ mergeNetwork.execute(NULL);
+
+ SqlMergeTask mergeTransports("postgres", "transportstreams");
+ CreateColumns<DVBSI::NetworkTransportStreamPtr>(boost::bind(&DatabaseClient::SqlMergeColumnsInserter, &mergeTransports, _1, _2));
+ mergeTransports.sources.insert(new ContainerIterator<DVBSI::NetworkTransportStreams>(&n->TransportStreams));
+ mergeTransports.loadComplete(commonObjects);
+ mergeTransports.execute(NULL);
+
+ SqlMergeTask mergeDvbt("postgres", "delivery_dvbt");
+ CreateColumns<DVBSI::TerrestrialDeliveryPtr>(boost::bind(&DatabaseClient::SqlMergeColumnsInserter, &mergeDvbt, _1, _2));
+ BOOST_FOREACH(const auto & s, n->TransportStreams) {
+ if (s->Terrestrial) {
+ mergeDvbt.sources.insert(new SingleIterator<DVBSI::TerrestrialDeliveryPtr>(&s->Terrestrial));
+ }
+ }
+ mergeDvbt.loadComplete(commonObjects);
+ mergeDvbt.execute(NULL);
+
+ SqlMergeTask mergeServices("postgres", "services");
+ CreateColumns<DVBSI::NetworkService>(boost::bind(&DatabaseClient::SqlMergeColumnsInserter, &mergeServices, _1, _2));
+ BOOST_FOREACH(const auto & s, n->TransportStreams) {
+ mergeServices.sources.insert(new ContainerIterator<DVBSI::NetworkServiceList>(&s->Services));
+ }
+ mergeServices.loadComplete(commonObjects);
+ mergeServices.execute(NULL);
+ return false;
+ }
+ private:
+ DatabaseClient * commonObjects;
+};
+
+void
+Maintenance::UpdateNetwork(short type, const Ice::Current & ice)
+{
+ auto ic = ice.adapter->getCommunicator();
+ auto devs = P2PVR::DevicesPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("GlobalDevices")));
+ auto si = P2PVR::SIPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("SI")));
+ auto siparser = new SiNetworkInformationMerger(this);
+ TemporarayIceAdapterObject<P2PVR::RawDataClient> parser(ice.adapter, siparser);
+
+ if (!devs) {
+ throw std::runtime_error("bad proxy(s)");
+ }
+ auto transport = si->GetDeliveryForSi();
+ if (transport) {
+ P2PVR::TunerPrx tuner;
+ try {
+ tuner = devs->GetTunerAny(type, transport);
+ tuner->SendNetworkInformation(parser);
+ devs->ReleaseTuner(tuner);
+ return;
+ }
+ catch (const P2PVR::NoSuitableDeviceAvailable &) {
+ Logger()->messagebf(LOG_WARNING, "%s: Failed to get a suitable tuner", __PRETTY_FUNCTION__);
+ throw;
+ }
+ catch (const std::exception & ex) {
+ Logger()->messagebf(LOG_WARNING, "%s: Failed to fetch network information: %s", __PRETTY_FUNCTION__, ex.what());
+ devs->ReleaseTuner(tuner);
+ throw;
+ }
+ catch (...) {
+ Logger()->messagebf(LOG_WARNING, "%s: Failed to fetch network information", __PRETTY_FUNCTION__);
+ devs->ReleaseTuner(tuner);
+ throw;
+ }
+ }
+ // If we can't do that, do a complete scan
+ auto tuner = devs->GetPrivateTuner(type);
+ tuner->ScanAndSendNetworkInformation(parser);
+ devs->ReleaseTuner(tuner);
+}
+
diff --git a/p2pvr/daemon/maintenance/programAssociations.cpp b/p2pvr/daemon/maintenance/programAssociations.cpp
new file mode 100644
index 0000000..ad6438c
--- /dev/null
+++ b/p2pvr/daemon/maintenance/programAssociations.cpp
@@ -0,0 +1,80 @@
+#include <pch.hpp>
+#include "../maintenance.h"
+#include <siParsers/programAssociation.h>
+#include <sqlMergeTask.h>
+#include <Ice/Communicator.h>
+#include <p2Helpers.h>
+#include <dvbsiHelpers.h>
+#include <mapIterator.h>
+#include <temporaryIceAdapterObject.h>
+
+class SiProgramAssociationHandler : public SiProgramAssociationParser {
+ public:
+ bool HandleTable(ProgramAssociationMapPtr pam)
+ {
+ Logger()->messagebf(LOG_DEBUG, "Program association table");
+ BOOST_FOREACH(const auto & pa, *pam) {
+ Logger()->messagebf(LOG_DEBUG, " %d -> %d", pa.first, pa.second);
+ }
+ BOOST_FOREACH(const auto & pa, *pam) {
+ map[pa.first] = pa.second;
+ }
+ return false;
+ }
+
+ ProgramAssociationMap map;
+};
+
+static
+void
+CreatePATColumns(const ColumnCreator & cc)
+{
+ cc("serviceId", true);
+ cc("programId", false);
+}
+
+void
+Maintenance::UpdateProgramAssociations(short type, const Ice::Current & ice)
+{
+ auto ic = ice.adapter->getCommunicator();
+ auto devs = P2PVR::DevicesPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("GlobalDevices")));
+ auto si = P2PVR::SIPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("SI")));
+
+ if (!devs || !si) {
+ throw std::runtime_error("bad proxy(s)");
+ }
+
+ auto siparser = new SiProgramAssociationHandler();
+ TemporarayIceAdapterObject<P2PVR::RawDataClient> parser(ice.adapter, siparser);
+
+ const auto deliveries = si->GetAllDeliveries(type);
+ if (deliveries.empty()) {
+ throw std::runtime_error("no delivery methods");
+ }
+
+ BOOST_FOREACH(const auto & transport, deliveries) {
+ try {
+ Logger()->messagebf(LOG_DEBUG, "%s: Getting a tuner", __PRETTY_FUNCTION__);
+ auto tuner = devs->GetTunerSpecific(transport);
+ Logger()->messagebf(LOG_DEBUG, "%s: Fetching associations", __PRETTY_FUNCTION__);
+ tuner->SendProgramAssociationTable(parser);
+ Logger()->messagebf(LOG_INFO, "%s: Updated associations", __PRETTY_FUNCTION__);
+ devs->ReleaseTuner(tuner);
+ }
+ catch (...) {
+ // Tuning can fail
+ }
+ }
+
+ TxHelper tx(this);
+ SqlMergeTask mergeServices("postgres", "services");
+ CreatePATColumns(boost::bind(SqlMergeColumnsInserter, &mergeServices, _1, _2));
+ // Don't change the list of services available from the network
+ mergeServices.doDelete = VariableType(false);
+ mergeServices.doInsert = VariableType(false);
+ Columns cols;
+ mergeServices.sources.insert(new MapIterator<ProgramAssociationMap>(CreatePATColumns, &siparser->map));
+ mergeServices.loadComplete(this);
+ mergeServices.execute(NULL);
+}
+
diff --git a/p2pvr/daemon/maintenance/programMap.cpp b/p2pvr/daemon/maintenance/programMap.cpp
new file mode 100644
index 0000000..94d7752
--- /dev/null
+++ b/p2pvr/daemon/maintenance/programMap.cpp
@@ -0,0 +1,133 @@
+#include <pch.hpp>
+#include "../maintenance.h"
+#include <siParsers/programMap.h>
+#include <sqlMergeTask.h>
+#include <Ice/Communicator.h>
+#include <p2Helpers.h>
+#include <dvbsiHelpers.h>
+#include <containerIterator.h>
+#include <singleIterator.h>
+#include <temporaryIceAdapterObject.h>
+#include <rdbmsDataSource.h>
+#include <column.h>
+#include <selectcommand.h>
+#include <sqlHandleAsVariableType.h>
+
+class SiProgramMapHandler : public SiProgramMapParser {
+ public:
+ SiProgramMapHandler(const RowProcessorCallback & cb) :
+ callBack(cb) {}
+
+ bool HandleTable(DVBSI::ProgramMapPtr pmp)
+ {
+ Logger()->messagebf(LOG_DEBUG, "Program map: serviceId = %d", pmp->ServiceId);
+ BOOST_FOREACH(const auto & s, pmp->Streams) {
+ Logger()->messagef(LOG_DEBUG, "type: %02x id: %d", s->Type, s->Id);
+ }
+ BOOST_FOREACH(const auto & s, pmp->Streams) {
+ BindColumns<DVBSI::StreamPtr>(rowState, s);
+ rowState.process(callBack);
+ }
+ return false;
+ }
+
+ private:
+ ObjectRowState<DVBSI::StreamPtr> rowState;
+ const RowProcessorCallback callBack;
+};
+
+typedef boost::shared_ptr<DB::SelectCommand> SelectPtr;
+
+template<typename T>
+void
+operator<<(T & val, const DB::Column & col)
+{
+ HandleAsVariableType havt;
+ col.apply(havt);
+ val = havt.variable;
+}
+
+class SiProgramMapMerger : public IHaveSubTasks {
+ public:
+ SiProgramMapMerger(short t, CommonObjects * co, const Ice::Current & i) :
+ SourceObject(__PRETTY_FUNCTION__),
+ IHaveSubTasks(NULL),
+ commonObjects(co),
+ type(t),
+ ice(i) { }
+
+ void execute(ExecContext * ec) const
+ {
+ auto ic = ice.adapter->getCommunicator();
+ auto devs = P2PVR::DevicesPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("GlobalDevices")));
+ auto si = P2PVR::SIPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("SI")));
+
+ if (!devs || !si) {
+ throw std::runtime_error("bad proxy(s)");
+ }
+
+ TemporarayIceAdapterObject<P2PVR::RawDataClient> parser(ice.adapter,
+ new SiProgramMapHandler(boost::bind(&SiProgramMapMerger::executeChildren, this, ec)));
+
+ const auto deliveries = si->GetAllDeliveries(type);
+ if (deliveries.empty()) {
+ throw std::runtime_error("no delivery methods");
+ }
+
+ auto db = commonObjects->dataSource<RdbmsDataSource>("postgres")->getReadonly();
+ SelectPtr sel = SelectPtr(db->newSelectCommand("select d.frequency, s.programid \
+ from delivery_dvbt d, services s \
+ where d.transportstreamid = s.transportstreamid \
+ and s.programid is not null \
+ order by s.transportstreamid, s.serviceid"));
+ int64_t curFreq = 0;
+ P2PVR::TunerPrx tuner;
+ while (sel->fetch()) {
+ int64_t freq, pid;
+ freq << (*sel)[0];
+ pid << (*sel)[1];
+
+ if (freq != curFreq) {
+ if (tuner) {
+ devs->ReleaseTuner(tuner);
+ }
+ Logger()->messagebf(LOG_DEBUG, "%s: Getting a tuner", __PRETTY_FUNCTION__);
+ const auto transport = *std::find_if(deliveries.begin(), deliveries.end(),
+ [freq](const DVBSI::DeliveryPtr & del) { return del->Frequency == freq; });
+ tuner = devs->GetTunerSpecific(transport);
+ curFreq = freq;
+ }
+
+ Logger()->messagebf(LOG_DEBUG, "%s: Fetching associations", __PRETTY_FUNCTION__);
+ tuner->SendProgramMap(pid, parser);
+ Logger()->messagebf(LOG_INFO, "%s: Updated associations", __PRETTY_FUNCTION__);
+ }
+ if (tuner) {
+ devs->ReleaseTuner(tuner);
+ }
+ }
+
+ private:
+ CommonObjects * commonObjects;
+ const short type;
+ const Ice::Current & ice;
+
+ void executeChildren(ExecContext * ec) const
+ {
+ BOOST_FOREACH(const Tasks::value_type & sq, normal) {
+ sq->execute(ec);
+ }
+ }
+};
+
+void
+Maintenance::UpdateProgramMaps(short type, const Ice::Current & ice)
+{
+ TxHelper tx(this);
+ SqlMergeTask mergeServiceStreams("postgres", "servicestreams");
+ CreateColumns<DVBSI::StreamPtr>(boost::bind(SqlMergeColumnsInserter, &mergeServiceStreams, _1, _2));
+ mergeServiceStreams.sources.insert(new SiProgramMapMerger(type, this, ice));
+ mergeServiceStreams.loadComplete(this);
+ mergeServiceStreams.execute(NULL);
+}
+
diff --git a/p2pvr/daemon/maintenance/services.cpp b/p2pvr/daemon/maintenance/services.cpp
new file mode 100644
index 0000000..55409c2
--- /dev/null
+++ b/p2pvr/daemon/maintenance/services.cpp
@@ -0,0 +1,68 @@
+#include <pch.hpp>
+#include "../maintenance.h"
+#include <siParsers/service.h>
+#include <sqlMergeTask.h>
+#include <Ice/Communicator.h>
+#include <p2Helpers.h>
+#include <dvbsiHelpers.h>
+#include <containerIterator.h>
+#include <singleIterator.h>
+#include <temporaryIceAdapterObject.h>
+
+class SiServicesMerger : public SiServicesParser {
+ public:
+ SiServicesMerger(DatabaseClient * co) : commonObjects(co) { }
+
+ bool HandleTable(DVBSI::TransportStreamPtr ts)
+ {
+ Logger()->messagebf(LOG_DEBUG, "Transport Stream Id: %d Original Network Id: %s", ts->TransportStreamId, ts->OriginalNetworkId);
+ BOOST_FOREACH(const auto & s, ts->Services) {
+ Logger()->messagebf(LOG_DEBUG, "\tService Id: %d Name: %s Type: %d, Provider: %s, DefaultAuthority: %s, RunningStatus %d FreeCaMode %d",
+ s->ServiceId, (s->Name ? *s->Name : "?"), (s->Type ? *s->Type : -1),
+ (s->ProviderName ? *s->ProviderName : "?"), (s->DefaultAuthority ? *s->DefaultAuthority : "?"),
+ s->RunningStatus, s->FreeCaMode);
+ }
+
+ DatabaseClient::TxHelper tx(commonObjects);
+ SqlMergeTask mergeServices("postgres", "services");
+ CreateColumns<DVBSI::ServicePtr>(boost::bind(&DatabaseClient::SqlMergeColumnsInserter, &mergeServices, _1, _2));
+ // Don't change the list of services available from the network
+ mergeServices.doDelete = VariableType(false);
+ mergeServices.doInsert = VariableType(false);
+ mergeServices.sources.insert(new ContainerIterator<DVBSI::ServiceList>(&ts->Services));
+ mergeServices.loadComplete(commonObjects);
+ mergeServices.execute(NULL);
+ return false;
+ }
+
+ private:
+ DatabaseClient * commonObjects;
+};
+
+void
+Maintenance::UpdateServices(short type, const Ice::Current & ice)
+{
+ auto ic = ice.adapter->getCommunicator();
+ auto devs = P2PVR::DevicesPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("GlobalDevices")));
+ auto si = P2PVR::SIPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("SI")));
+
+ if (!devs || !si) {
+ throw std::runtime_error("bad proxy(s)");
+ }
+
+ auto siparser = new SiServicesMerger(this);
+ TemporarayIceAdapterObject<P2PVR::RawDataClient> parser(ice.adapter, siparser);
+
+ auto delivery = si->GetDeliveryForSi();
+ if (!delivery) {
+ throw std::runtime_error("no delivery methods");
+ }
+
+ Logger()->messagebf(LOG_DEBUG, "%s: Getting a tuner", __PRETTY_FUNCTION__);
+ auto tuner = devs->GetTunerAny(type, delivery);
+ Logger()->messagebf(LOG_DEBUG, "%s: Fetching service list", __PRETTY_FUNCTION__);
+ tuner->SendServiceDescriptions(parser);
+ Logger()->messagebf(LOG_INFO, "%s: Updated service list", __PRETTY_FUNCTION__);
+ devs->ReleaseTuner(tuner);
+}
+
diff --git a/p2pvr/daemon/pch.hpp b/p2pvr/daemon/pch.hpp
new file mode 100644
index 0000000..bf16cef
--- /dev/null
+++ b/p2pvr/daemon/pch.hpp
@@ -0,0 +1,24 @@
+#ifdef BOOST_BUILD_PCH_ENABLED
+#ifndef P2PVRLIB_PCH
+#define P2PVRLIB_PCH
+
+#include <Ice/Ice.h>
+#include <glibmm.h>
+#include <boost/bind.hpp>
+#include <boost/foreach.hpp>
+#include <boost/function.hpp>
+#include <boost/intrusive_ptr.hpp>
+#include <boost/shared_ptr.hpp>
+
+#include <list>
+#include <map>
+#include <set>
+#include <string>
+#include <vector>
+
+#include <variableType.h>
+#include <rdbmsDataSource.h>
+
+#endif
+#endif
+
diff --git a/p2pvr/daemon/recorder.cpp b/p2pvr/daemon/recorder.cpp
new file mode 100644
index 0000000..bdd18c6
--- /dev/null
+++ b/p2pvr/daemon/recorder.cpp
@@ -0,0 +1,100 @@
+#include <pch.hpp>
+#include "recorder.h"
+#include "bindTimerTask.h"
+#include <boost/bind.hpp>
+#include <logger.h>
+#include <commonHelpers.h>
+#include <scopeObject.h>
+#include "serviceStreamer.h"
+#include "storage.h"
+#include "muxer.h"
+
+std::string Recorder::extension;
+std::string Recorder::muxerCommand;
+
+DECLARE_OPTIONS(Recorder, "P2PVR Recorder options")
+("p2pvr.recorder.extension", Options::value(&extension, "mpg"),
+ "File extension to save with (default: avi)")
+("p2pvr.recorder.muxercommand", Options::value(&muxerCommand, "/usr/bin/ffmpeg -f mpegts -i - -f dvd -codec copy -"),
+ "File extension to save with (default: '/usr/bin/ffmpeg -f mpegts -i - -f dvd -codec copy -')")
+END_OPTIONS(Recorder);
+
+Recorder::Recorder(Ice::ObjectAdapterPtr a, IceUtil::TimerPtr t) :
+ adapter(a),
+ timer(t)
+{
+ Logger()->message(LOG_DEBUG, __PRETTY_FUNCTION__);
+ RefreshSchedules(Ice::Current());
+}
+
+void
+Recorder::RefreshSchedules(const Ice::Current &)
+{
+ std::lock_guard<std::mutex> g(lock);
+ BOOST_FOREACH(auto & t, pendingRecordings) {
+ timer->cancel(t);
+ }
+ pendingRecordings.clear();
+ auto schedules = P2PVR::SchedulesPrx::checkedCast(adapter->createProxy(adapter->getCommunicator()->stringToIdentity("Schedules")));
+ auto si = P2PVR::SIPrx::checkedCast(adapter->createProxy(adapter->getCommunicator()->stringToIdentity("SI")));
+ BOOST_FOREACH(const auto & s, schedules->GetScheduledToRecord()) {
+ if (std::find_if(currentRecordings.begin(), currentRecordings.end(), [&s](const CurrentPtr & c) {
+ return s->ScheduleId == c->schedule->ScheduleId && s->ServiceId == c->event->ServiceId && s->EventId == c->event->EventId;;
+ }) != currentRecordings.end()) {
+ continue;
+ }
+ auto schedule = schedules->GetSchedule(s->ScheduleId);
+ auto service = si->GetService(s->ServiceId);
+ auto event = si->GetEvent(s->ServiceId, s->EventId);
+
+ auto startIn = std::max<time_t>((*event->StartTime - *schedule->Early - boost::posix_time::second_clock::universal_time()).total_seconds(), 0);
+ IceUtil::TimerTaskPtr startTimer = new BindTimerTask(boost::bind(&Recorder::StartRecording, this, schedule, service, event));
+ timer->schedule(startTimer, IceUtil::Time::seconds(startIn));
+ pendingRecordings.push_back(startTimer);
+ Logger()->messagebf(LOG_DEBUG, "Recording %s scheduled for %s seconds", event->Title, startIn);
+ }
+}
+
+void
+Recorder::StartRecording(P2PVR::SchedulePtr schedule, DVBSI::ServicePtr service, DVBSI::EventPtr event)
+{
+ std::lock_guard<std::mutex> g(lock);
+ auto storage = P2PVR::StoragePrx::checkedCast(adapter->createProxy(adapter->getCommunicator()->stringToIdentity("Storage")));
+ auto recordings = P2PVR::RecordingsPrx::checkedCast(adapter->createProxy(adapter->getCommunicator()->stringToIdentity("Recordings")));
+
+ auto id = storage->CreateForEventRecording(extension, schedule, service, event);
+ auto store = storage->OpenForWrite(id);
+ ScopeObject _store(NULL, NULL, [this,&store,&storage,&id]() { storage->Close(store); storage->Delete(id); });
+ auto muxer = P2PVR::RawDataClientPrx::checkedCast(adapter->addWithUUID(new Muxer(store, muxerCommand)));
+ ScopeObject _muxer(NULL, NULL, [this,&muxer]() { adapter->remove(muxer->ice_getIdentity()); });
+ auto ss = ServiceStreamerPtr(new ServiceStreamer(service->ServiceId, muxer, adapter->getCommunicator(), adapter));
+
+ ss->Start();
+ Logger()->messagebf(LOG_INFO, "Started recording %s (%s - %s) on %s (%d)",
+ event->Title, event->StartTime, event->StopTime,
+ service->Name ? *service->Name : "<no name>", service->ServiceId);
+
+ recordings->NewRecording(new P2PVR::Recording(0, storage->ice_toString(), id, schedule->ScheduleId, event->Title, event->Subtitle,
+ event->Description, event->StartTime, *(*event->StopTime - *event->StartTime)));
+ auto newCurrent = CurrentPtr(new Current({muxer, store, ss, schedule, service, event, IceUtil::TimerTaskPtr()}));
+ currentRecordings.insert(newCurrent);
+
+ auto stopIn = (*event->StopTime + *schedule->Late - boost::posix_time::second_clock::universal_time()).total_seconds();
+ newCurrent->stopTimer = new BindTimerTask(boost::bind(&Recorder::StopRecording, this, newCurrent));
+ timer->schedule(newCurrent->stopTimer, IceUtil::Time::seconds(stopIn));
+ Logger()->messagebf(LOG_DEBUG, "Recording %s scheduled stop in %s seconds", event->Title, stopIn);
+}
+
+void
+Recorder::StopRecording(CurrentPtr c)
+{
+ std::lock_guard<std::mutex> g(lock);
+ Logger()->messagebf(LOG_DEBUG, "Stopping %s", c->event->Title);
+ c->stream->Stop();
+ adapter->remove(c->muxer->ice_getIdentity());
+ currentRecordings.erase(c);
+ auto storage = P2PVR::StoragePrx::checkedCast(adapter->createProxy(adapter->getCommunicator()->stringToIdentity("Storage")));
+ storage->Close(c->store);
+ Logger()->messagebf(LOG_DEBUG, "Stopped %s", c->event->Title);
+}
+
diff --git a/p2pvr/daemon/recorder.h b/p2pvr/daemon/recorder.h
new file mode 100644
index 0000000..78c604b
--- /dev/null
+++ b/p2pvr/daemon/recorder.h
@@ -0,0 +1,51 @@
+#ifndef RECORDER_H
+#define RECORDER_H
+
+#include <IceUtil/Timer.h>
+#include <Ice/ObjectAdapter.h>
+#include <options.h>
+#include <p2pvr.h>
+#include <mutex>
+#include "serviceStreamer.h"
+
+class Recorder : public P2PVR::Recorder {
+ public:
+ typedef std::vector<IceUtil::TimerTaskPtr> Pendings;
+
+ class Current {
+ public:
+ P2PVR::RawDataClientPrx muxer;
+ P2PVR::RawDataClientPrx store;
+ ServiceStreamerPtr stream;
+ P2PVR::SchedulePtr schedule;
+ DVBSI::ServicePtr service;
+ DVBSI::EventPtr event;
+ IceUtil::TimerTaskPtr stopTimer;
+ };
+ typedef boost::shared_ptr<Current> CurrentPtr;
+ typedef std::set<CurrentPtr> Currents;
+
+ Recorder(Ice::ObjectAdapterPtr a, IceUtil::TimerPtr);
+
+ void RefreshSchedules(const Ice::Current &);
+
+ INITOPTIONS;
+
+ private:
+ void StartRecordings();
+ void StartRecording(P2PVR::SchedulePtr schedule, DVBSI::ServicePtr service, DVBSI::EventPtr event);
+ void StopRecording(CurrentPtr);
+
+ Ice::ObjectAdapterPtr adapter;
+ IceUtil::TimerPtr timer;
+
+ Currents currentRecordings;
+ Pendings pendingRecordings;
+ std::mutex lock;
+
+ static std::string extension;
+ static std::string muxerCommand;
+};
+
+#endif
+
diff --git a/p2pvr/daemon/recordings.cpp b/p2pvr/daemon/recordings.cpp
new file mode 100644
index 0000000..bad8032
--- /dev/null
+++ b/p2pvr/daemon/recordings.cpp
@@ -0,0 +1,82 @@
+#include <pch.hpp>
+#include "recordings.h"
+#include "resources.h"
+#include <Ice/Ice.h>
+#include <logger.h>
+#include "sqlContainerCreator.h"
+
+ResourceString(Recording_Insert, daemon_sql_Recordings_insert_sql);
+ResourceString(Recording_InsertNewId, daemon_sql_Recordings_insertNewId_sql);
+ResourceString(Recording_Delete, daemon_sql_Recordings_delete_sql);
+ResourceString(Recording_GetStorage, daemon_sql_Recordings_getStorage_sql);
+ResourceString(Recording_GetAll, daemon_sql_Recordings_getAll_sql);
+
+template<>
+void
+CreateColumns<P2PVR::RecordingPtr>(const ColumnCreator & cc)
+{
+ cc("recordingid", true);
+ cc("storageaddress", false);
+ cc("guid", false);
+ cc("scheduleid", false);
+ cc("title", false);
+ cc("subtitle", false);
+ cc("description", false);
+ cc("starttime", false);
+ cc("duration", false);
+}
+
+template<>
+void
+UnbindColumns(RowState & rs, const P2PVR::RecordingPtr & r)
+{
+ rs.fields[0] >> r->RecordingId;
+ rs.fields[1] >> r->StorageAddress;
+ rs.fields[2] >> r->Guid;
+ rs.fields[3] >> r->ScheduleId;
+ rs.fields[4] >> r->Title;
+ rs.fields[5] >> r->Subtitle;
+ rs.fields[6] >> r->Description;
+ rs.fields[7] >> r->StartTime;
+ rs.fields[8] >> r->Duration;
+}
+
+int
+Recordings::NewRecording(const P2PVR::RecordingPtr & r, const Ice::Current &)
+{
+ Logger()->messagebf(LOG_INFO, "%s: Creating new recording %s at %s", __PRETTY_FUNCTION__, r->Guid, r->StorageAddress);
+ TxHelper tx(this);
+ auto insert = Modify(Recording_Insert,
+ r->StorageAddress, r->Guid, r->ScheduleId, r->Title, r->Subtitle, r->Description, r->StartTime, r->Duration);
+ insert.second->execute();
+ r->RecordingId = SelectScalar<int>(Recording_InsertNewId);
+ Logger()->messagebf(LOG_INFO, "%s: Created recording Id: %d", __PRETTY_FUNCTION__, r->RecordingId);
+ return r->RecordingId;
+}
+
+void
+Recordings::DeleteRecording(int id, const Ice::Current & ice)
+{
+ Logger()->messagebf(LOG_INFO, "%s: Deleting recording Id: %d", __PRETTY_FUNCTION__, id);
+ auto ic = ice.adapter->getCommunicator();
+ TxHelper tx(this);
+ auto recordingStorages = Select(Recording_GetStorage, id);
+ while (recordingStorages.second->fetch()) {
+ std::string addr = recordingStorages.second / "storageaddress";
+ std::string guid = recordingStorages.second / "guid";
+ auto storage = P2PVR::StoragePrx::checkedCast(ic->stringToProxy(addr));
+ storage->Delete(guid);
+ Logger()->messagebf(LOG_DEBUG, "%s: Delete %s from StorageAddress %s", __PRETTY_FUNCTION__, guid, addr);
+ }
+ Modify(Recording_Delete, id).second->execute();
+}
+
+P2PVR::RecordingList
+Recordings::GetRecordings(const Ice::Current &)
+{
+ P2PVR::RecordingList rtn;
+ SqlContainerCreator<P2PVR::RecordingList, P2PVR::Recording> cc(rtn);
+ cc.populate(Select(Recording_GetAll).second);
+ return rtn;
+}
+
diff --git a/p2pvr/daemon/recordings.h b/p2pvr/daemon/recordings.h
new file mode 100644
index 0000000..0f18a36
--- /dev/null
+++ b/p2pvr/daemon/recordings.h
@@ -0,0 +1,16 @@
+#ifndef RECORDINGS_H
+#define RECORDINGS_H
+
+#include <p2pvr.h>
+#include <string>
+#include "dbClient.h"
+
+class Recordings : public DatabaseClient, public P2PVR::Recordings {
+ public:
+ int NewRecording(const P2PVR::RecordingPtr & rec, const Ice::Current &);
+ void DeleteRecording(int recordingId, const Ice::Current &);
+ P2PVR::RecordingList GetRecordings(const Ice::Current &);
+};
+
+#endif
+
diff --git a/p2pvr/daemon/schedulers/bitDumbScheduler.cpp b/p2pvr/daemon/schedulers/bitDumbScheduler.cpp
new file mode 100644
index 0000000..f3cf5fb
--- /dev/null
+++ b/p2pvr/daemon/schedulers/bitDumbScheduler.cpp
@@ -0,0 +1,55 @@
+#include <pch.hpp>
+#include "../schedules.h"
+
+class TheBitDumbScheduler : public EpisodeGroup {
+ public:
+ TheBitDumbScheduler(const Episodes & eps) :
+ episodes(eps)
+ {
+ }
+
+ protected:
+ void SelectShowings()
+ {
+ SelectShowings(episodes.begin(), 1);
+ }
+
+ private:
+ void SelectShowings(Episodes::const_iterator e, uint64_t complexityLevel)
+ {
+ if (e != episodes.end()) {
+ complexityLevel *= (*e)->showings.size();
+ if (complexityLevel > 2 << 16) {
+ auto current = showings.size();
+ for (EpisodesIter ne = e; ne != episodes.end(); ne++) {
+ BOOST_FOREACH(const auto & s, (*ne)->showings) {
+ showings.push_back(s);
+ if (SuggestWithFeedback(showings) & 0x1) {
+ break;
+ }
+ showings.pop_back();
+ }
+ }
+ showings.resize(current);
+ }
+ else {
+ EpisodesIter ne = e;
+ ne++;
+ BOOST_FOREACH(const auto & s, (*e)->showings) {
+ showings.push_back(s);
+ SelectShowings(ne, complexityLevel);
+ showings.pop_back();
+ }
+ }
+ }
+ else {
+ Suggest(showings);
+ }
+ }
+
+ mutable Showings showings; // working set
+ const Episodes episodes;
+};
+
+DECLARE_GENERIC_LOADER("BitDumb", EpisodeGroupLoader, TheBitDumbScheduler);
+
diff --git a/p2pvr/daemon/schedules.cpp b/p2pvr/daemon/schedules.cpp
new file mode 100644
index 0000000..90df59c
--- /dev/null
+++ b/p2pvr/daemon/schedules.cpp
@@ -0,0 +1,484 @@
+#include <pch.hpp>
+#include "schedules.h"
+#include "sqlContainerCreator.h"
+#include <rdbmsDataSource.h>
+#include <logger.h>
+#include <Ice/Ice.h>
+#include <sqlVariableBinder.h>
+#include <sqlMergeTask.h>
+#include "p2Helpers.h"
+#include "containerIterator.h"
+#include "resources.h"
+#include <boost/date_time/posix_time/posix_time.hpp>
+#include <instanceStore.impl.h>
+
+ResourceString(Schedules_GetCandidates, daemon_sql_Schedules_GetCandidates_sql);
+ResourceString(Schedules_insert, daemon_sql_Schedules_insert_sql);
+ResourceString(Schedules_insertNewId, daemon_sql_Schedules_insertNewId_sql);
+ResourceString(Schedules_update, daemon_sql_Schedules_update_sql);
+ResourceString(Schedules_delete, daemon_sql_Schedules_delete_sql);
+ResourceString(Schedules_selectAll, daemon_sql_Schedules_selectAll_sql);
+ResourceString(Schedules_selectById, daemon_sql_Schedules_selectById_sql);
+ResourceString(Schedules_scheduledToRecord, daemon_sql_Schedules_scheduledToRecord_sql);
+
+std::string Schedules::SchedulerAlgorithm;
+
+DECLARE_OPTIONS(Schedules, "P2PVR Scheduler options")
+("p2pvr.scheduler.algorithm", Options::value(&SchedulerAlgorithm, "BitDumb"),
+ "Implementation of episode group scheduler problem solver")
+END_OPTIONS()
+
+class ScheduleCandidate {
+ public:
+ std::string What;
+ int ServiceId;
+ int EventId;
+ int TransportStreamId;
+ datetime StartTime;
+ datetime StopTime;
+ int Priority;
+ int ScheduleId;
+};
+typedef boost::shared_ptr<ScheduleCandidate> ScheduleCandidatePtr;
+typedef std::vector<ScheduleCandidatePtr> ScheduleCandidates;
+
+enum RecordStatuses {
+ Record_WillRecordThisShowing = 0,
+ Record_WillRecordOtherShowing = 1,
+ Record_CannotRecordAnyShowing = 2
+};
+
+class Record {
+ public:
+ Record() { };
+ Record(int s, int e, RecordStatuses rs, int sc) :
+ ServiceId(s),
+ EventId(e),
+ RecordStatus(rs),
+ ScheduleId(sc)
+ {
+ }
+
+ int ServiceId;
+ int EventId;
+ RecordStatuses RecordStatus;
+ int ScheduleId;
+};
+typedef boost::shared_ptr<Record> RecordPtr;
+typedef std::vector<RecordPtr> Records;
+
+template<>
+void
+CreateColumns<P2PVR::ScheduledToRecordPtr>(const ColumnCreator & cc)
+{
+ cc("serviceid", true);
+ cc("eventid", true);
+ cc("scheduleid", true);
+}
+
+template<>
+void
+UnbindColumns(RowState & rs, P2PVR::ScheduledToRecordPtr const & s)
+{
+ rs.fields[0] >> s->ServiceId;
+ rs.fields[1] >> s->EventId;
+ rs.fields[2] >> s->ScheduleId;
+}
+
+template<>
+void
+CreateColumns<ScheduleCandidatePtr>(const ColumnCreator & cc)
+{
+ cc("what", true);
+ cc("serviceid", false);
+ cc("eventid", false);
+ cc("transportstreamid", false);
+ cc("starttime", false);
+ cc("stoptime", false);
+ cc("priority", false);
+ cc("scheduleid", false);
+}
+
+template<>
+void
+UnbindColumns(RowState & rs, ScheduleCandidatePtr const & s)
+{
+ rs.fields[0] >> s->What;
+ rs.fields[1] >> s->ServiceId;
+ rs.fields[2] >> s->EventId;
+ rs.fields[3] >> s->TransportStreamId;
+ rs.fields[4] >> s->StartTime;
+ rs.fields[5] >> s->StopTime;
+ rs.fields[6] >> s->Priority;
+ rs.fields[7] >> s->ScheduleId;
+}
+
+template<>
+void
+CreateColumns<P2PVR::SchedulePtr>(const ColumnCreator & cc)
+{
+ cc("scheduleid", true);
+ cc("serviceid", false);
+ cc("eventid", false);
+ cc("title", false);
+ cc("search", false);
+ cc("priority", false);
+ cc("early", false);
+ cc("late", false);
+ cc("repeats", false);
+}
+
+template<>
+void
+UnbindColumns(RowState & rs, P2PVR::SchedulePtr const & s)
+{
+ rs.fields[0] >> s->ScheduleId;
+ rs.fields[1] >> s->ServiceId;
+ rs.fields[2] >> s->EventId;
+ rs.fields[3] >> s->Title;
+ rs.fields[4] >> s->Search;
+ rs.fields[5] >> s->Priority;
+ rs.fields[6] >> s->Early;
+ rs.fields[7] >> s->Late;
+ rs.fields[8] >> s->Repeats;
+}
+
+template<>
+void
+CreateColumns<RecordPtr>(const ColumnCreator & cc)
+{
+ cc("serviceid", true);
+ cc("eventid", true);
+ cc("recordstatus", false);
+ cc("scheduleid", false);
+}
+
+template<>
+void
+BindColumns(RowState & rs, RecordPtr const & s)
+{
+ rs.fields[0] << s->ServiceId;
+ rs.fields[1] << s->EventId;
+ rs.fields[2] << (int)s->RecordStatus;
+ rs.fields[3] << s->ScheduleId;
+}
+
+Showing::Showing(unsigned int s, unsigned int e, unsigned int t, unsigned int sc, datetime start, datetime stop, int p, const Episode * ep) :
+ episode(ep),
+ serviceId(s),
+ eventId(e),
+ priority(p),
+ scheduleId(sc),
+ transportStreamId(t),
+ startTime(start),
+ stopTime(stop),
+ period(start, stop)
+{
+}
+
+Episode::Episode(const std::string & w) :
+ priority(0),
+ what(w)
+{
+}
+
+EpisodeGroup::EpisodeGroup() :
+ tuners(1),
+ sumTimeToStart(0),
+ score(0)
+{
+}
+
+bool
+EpisodeGroup::IsShowingListValid(const Showings & showings) const
+{
+ struct TransOffset {
+ unsigned int trans;
+ char offset;
+ };
+ typedef std::multimap<datetime, TransOffset> Periods;
+ typedef std::map<int, unsigned char> Usage;
+
+ Periods periods;
+ Usage usage;
+
+ BOOST_FOREACH(const auto & s, showings) {
+ if (s) {
+ periods.insert(Periods::value_type(s->startTime, {s->transportStreamId, 1}));
+ periods.insert(Periods::value_type(s->stopTime, {s->transportStreamId, -1}));
+ }
+ }
+ bool result = true;
+ BOOST_FOREACH(const auto & p, periods) {
+ auto & u = usage[p.second.trans];
+ u += p.second.offset;
+ if (std::count_if(usage.begin(), usage.end(), [](const Usage::value_type & uv) { return uv.second > 0;}) > tuners) {
+ result = false;
+ break;
+ }
+ }
+ periods.clear();
+ usage.clear();
+ return result;
+}
+
+template<typename T>
+inline
+bool NotNull(const T & p)
+{
+ return p != NULL;
+}
+
+class SumTimeToStart {
+ public:
+ SumTimeToStart(time_t & t) :
+ total(t),
+ now(boost::posix_time::second_clock::universal_time())
+ {
+ total = 0;
+ }
+ inline void operator()(const ShowingPtr & s) const
+ {
+ if (s) {
+ total += std::min<time_t>((now - s->startTime).seconds(), 0);
+ }
+ }
+ public:
+ time_t & total;
+ datetime now;
+};
+
+const Showings &
+EpisodeGroup::Solve()
+{
+ SelectShowings();
+ return selected;
+}
+
+void
+EpisodeGroup::Suggest(const Showings & showings)
+{
+ unsigned int c = 0;
+ std::for_each(showings.begin(), showings.end(), [&c](const ShowingPtr & s) { if (s) c+= s->episode->priority; });
+ if (c >= score) {
+ time_t stt;
+ std::for_each(showings.begin(), showings.end(), SumTimeToStart(stt));
+ if (stt < sumTimeToStart || (stt == sumTimeToStart && c > score)) {
+ if (IsShowingListValid(showings)) {
+ selected = showings;
+ score = c;
+ sumTimeToStart = stt;
+ }
+ }
+ }
+}
+
+EpisodeGroup::SuggestionResult
+EpisodeGroup::SuggestWithFeedback(const Showings & showings)
+{
+ if (IsShowingListValid(showings)) {
+ unsigned int c = 0;
+ std::for_each(showings.begin(), showings.end(), [&c](const ShowingPtr & s) { if (s) c+= s->episode->priority; });
+ if (c >= score) {
+ time_t stt;
+ std::for_each(showings.begin(), showings.end(), SumTimeToStart(stt));
+ if (stt < sumTimeToStart) {
+ selected = showings;
+ score = c;
+ sumTimeToStart = stt;
+ return SuggestionValidAndAccepted;
+ }
+ }
+ return SuggestionValid;
+ }
+ else {
+ return SuggestionInvalid;
+ }
+}
+
+void
+Schedules::GetEpisodeIntersects(Episodes & all, Episodes & grouped)
+{
+ for (Episodes::iterator aei = all.begin(); aei != all.end(); aei++) {
+ const auto & ae = *aei;
+ BOOST_FOREACH(const auto & ge, grouped) {
+ BOOST_FOREACH(const auto & gs, ge->showings) {
+ BOOST_FOREACH(const auto & as, ae->showings) {
+ if (gs->period.intersects(as->period)) {
+ Logger()->messagebf(LOG_DEBUG, " added %s", ae->what);
+ grouped.push_back(ae);
+ all.erase(aei);
+ GetEpisodeIntersects(all, grouped);
+ return;
+ }
+ }
+ }
+ }
+ }
+}
+
+void
+Schedules::DoReschedule(const Ice::Current & ice)
+{
+ auto ic = ice.adapter->getCommunicator();
+ auto devs = P2PVR::DevicesPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("GlobalDevices")));
+ unsigned int tunerCount = devs->TunerCount();
+
+ // Load list from database
+ ScheduleCandidates episodes;
+ SqlContainerCreator<ScheduleCandidates, ScheduleCandidate, ScheduleCandidatePtr> cct(episodes);
+ cct.populate(Select(Schedules_GetCandidates).second);
+
+ Episodes scheduleList;
+ Showings allShowings;
+ EpisodePtr cur;
+ int minPriority = 0;
+ BOOST_FOREACH(const auto & c, episodes) {
+ if (!cur || cur->what != c->What) {
+ cur = new Episode(c->What);
+ scheduleList.push_back(cur);
+ }
+ ShowingPtr s = new Showing(c->ServiceId, c->EventId, c->TransportStreamId, c->ScheduleId,
+ c->StartTime, c->StopTime, c->Priority, cur.get());
+ minPriority = std::min(minPriority, s->priority);
+ cur->showings.push_back(s);
+ allShowings.push_back(s);
+ }
+ Logger()->messagebf(LOG_DEBUG, "%d episodes created, %s showings", scheduleList.size(), allShowings.size());
+ BOOST_FOREACH(const auto & e, scheduleList) {
+ Logger()->messagebf(LOG_DEBUG, " %s", e->what);
+ BOOST_FOREACH(const auto & s, e->showings) {
+ s->priority += 1 - minPriority;
+ e->priority += s->priority;
+ }
+ e->priority /= e->showings.size();
+ }
+
+ Records records;
+ // Solve
+ while (!scheduleList.empty()) {
+ auto work = scheduleList.begin();
+ Logger()->messagebf(LOG_DEBUG, "start %s", (*work)->what);
+ Episodes group;
+ group.push_back(*work);
+ scheduleList.erase(work);
+ GetEpisodeIntersects(scheduleList, group);
+ std::sort(group.begin(), group.end(), [](const EpisodePtr & a, const EpisodePtr & b) {
+ if (a->priority > b->priority) return true;
+ if (a->priority < b->priority) return false;
+ return a->what < b->what;
+ });
+
+ Logger()->messagebf(LOG_DEBUG, "group created with %d episodes", group.size());
+ double total = 1;
+ // Measure and add the optional to not record
+ BOOST_FOREACH(const auto & e, group) {
+ Logger()->messagebf(LOG_DEBUG, " %d * %d:%s", e->showings.size(), e->priority, e->what);
+ e->showings.push_back(NULL);
+ total *= e->showings.size();
+ }
+ Logger()->messagebf(LOG_DEBUG, "group complexity of %d options", total);
+
+ EpisodeGroupPtr sched = EpisodeGroupPtr(EpisodeGroupLoader::createNew(SchedulerAlgorithm, group));
+ sched->tuners = tunerCount;
+ std::set<ShowingPtr> selected;
+ BOOST_FOREACH(const auto & s, sched->Solve()) {
+ if (s) selected.insert(s);
+ }
+
+ BOOST_FOREACH(const auto & c, group) {
+ Logger()->messagebf(LOG_DEBUG, "Episode %s, %d options", c->what, c->showings.size());
+ BOOST_FOREACH(const auto & i, c->showings) {
+ if (selected.find(i) != selected.end()) {
+ Logger()->messagebf(LOG_DEBUG, " %s - %s (%d) <-", i->startTime, i->stopTime, i->transportStreamId);
+ }
+ else if (i) {
+ Logger()->messagebf(LOG_DEBUG, " %s - %s (%d)", i->startTime, i->stopTime, i->transportStreamId);
+ }
+ }
+ }
+ Logger()->message(LOG_DEBUG, "----------");
+ BOOST_FOREACH(const auto & c, group) {
+ bool found = false;
+ BOOST_FOREACH(const auto & i, c->showings) {
+ if (i && selected.find(i) != selected.end()) {
+ found = true;
+ break;
+ }
+ }
+ BOOST_FOREACH(const auto & i, c->showings) {
+ if (i) {
+ records.push_back(RecordPtr(new Record(i->serviceId, i->eventId,
+ found ?
+ selected.find(i) != selected.end() ? Record_WillRecordThisShowing : Record_WillRecordOtherShowing :
+ Record_CannotRecordAnyShowing, i->scheduleId)));
+ }
+ }
+ }
+ }
+
+ TxHelper tx(this);
+ SqlMergeTask mergeRecords("postgres", "record");
+ CreateColumns<RecordPtr>(boost::bind(&DatabaseClient::SqlMergeColumnsInserter, &mergeRecords, _1, _2));
+ mergeRecords.sources.insert(new ContainerIterator<Records>(&records));
+ mergeRecords.loadComplete(this);
+ mergeRecords.execute(NULL);
+ tx.Commit();
+
+ auto recorder = P2PVR::RecorderPrx::checkedCast(ice.adapter->createProxy(ice.adapter->getCommunicator()->stringToIdentity("Recorder")));
+ recorder->RefreshSchedules();
+}
+
+void
+Schedules::DeleteSchedule(int id, const Ice::Current & ice)
+{
+ TxHelper tx(this);
+ Modify(Schedules_delete, id).second->execute();
+ DoReschedule(ice);
+}
+
+P2PVR::ScheduleList
+Schedules::GetSchedules(const Ice::Current &)
+{
+ P2PVR::ScheduleList schedules;
+ SqlContainerCreator<P2PVR::ScheduleList, P2PVR::Schedule> cct(schedules);
+ cct.populate(Select(Schedules_selectAll).second);
+ return schedules;
+}
+
+P2PVR::SchedulePtr
+Schedules::GetSchedule(int id, const Ice::Current &)
+{
+ P2PVR::ScheduleList schedules;
+ SqlContainerCreator<P2PVR::ScheduleList, P2PVR::Schedule> cct(schedules);
+ cct.populate(Select(Schedules_selectById, id).second);
+ if (schedules.empty()) throw P2PVR::NotFound();
+ return schedules.front();
+}
+
+P2PVR::ScheduledToRecordList
+Schedules::GetScheduledToRecord(const Ice::Current &)
+{
+ P2PVR::ScheduledToRecordList scheduled;
+ SqlContainerCreator<P2PVR::ScheduledToRecordList, P2PVR::ScheduledToRecord> cct(scheduled);
+ cct.populate(Select(Schedules_scheduledToRecord).second);
+ return scheduled;
+}
+
+int
+Schedules::UpdateSchedule(const P2PVR::SchedulePtr & s, const Ice::Current & ice)
+{
+ TxHelper tx(this);
+ if (s->ScheduleId == 0) {
+ Modify(Schedules_insert, s->ServiceId, s->EventId, s->Title, s->Search, s->Priority, s->Early, s->Late, s->Repeats).second->execute();
+ s->ScheduleId = SelectScalar<int>(Schedules_insertNewId);
+ }
+ else {
+ Modify(Schedules_update, s->ServiceId, s->EventId, s->Title, s->Search, s->Priority, s->Early, s->Late, s->Repeats, s->ScheduleId).second->execute();
+ }
+ DoReschedule(ice);
+ return s->ScheduleId;
+}
+
+INSTANTIATESTORE(std::string, EpisodeGroupLoader);
+
diff --git a/p2pvr/daemon/schedules.h b/p2pvr/daemon/schedules.h
new file mode 100644
index 0000000..f7075dd
--- /dev/null
+++ b/p2pvr/daemon/schedules.h
@@ -0,0 +1,85 @@
+#ifndef SCHEDULER_H
+#define SCHEDULER_H
+
+#include <p2pvr.h>
+#include <options.h>
+#include "dbClient.h"
+#include <genLoader.h>
+
+typedef boost::posix_time::ptime datetime;
+class Episode;
+
+class Showing : public IntrusivePtrBase {
+ public:
+ Showing(unsigned int s, unsigned int e, unsigned int t, unsigned int sc, datetime start, datetime stop, int p, const Episode * ep);
+ // Record what?
+ const Episode * episode;
+ const unsigned int serviceId;
+ const unsigned int eventId;
+ int priority;
+ const unsigned int scheduleId;
+ // Requires
+ const unsigned int transportStreamId;
+ const datetime startTime;
+ const datetime stopTime;
+ const boost::posix_time::time_period period;
+};
+typedef boost::intrusive_ptr<Showing> ShowingPtr;
+typedef std::vector<ShowingPtr> Showings;
+typedef Showings::const_iterator ShowingsIter;
+
+class Episode : public IntrusivePtrBase {
+ public:
+ Episode(const std::string & w);
+ int priority;
+ const std::string what;
+ Showings showings;
+};
+typedef boost::intrusive_ptr<Episode> EpisodePtr;
+typedef std::vector<EpisodePtr> Episodes;
+typedef Episodes::const_iterator EpisodesIter;
+
+class EpisodeGroup {
+ public:
+ EpisodeGroup();
+
+ const Showings & Solve();
+ unsigned int tuners;
+
+ protected:
+ enum SuggestionResult {
+ SuggestionInvalid = 0, SuggestionValid = 1, SuggestionValidAndAccepted = 3
+ };
+ SuggestionResult SuggestWithFeedback(const Showings &);
+ void Suggest(const Showings &);
+ virtual void SelectShowings() = 0;
+
+ private:
+ bool IsShowingListValid(const Showings & showings) const;
+ // chosen set
+ time_t sumTimeToStart;
+ unsigned int score;
+ Showings selected;
+};
+
+class Schedules : public P2PVR::Schedules, public DatabaseClient {
+ public:
+ void DeleteSchedule(int id, const Ice::Current &);
+ P2PVR::SchedulePtr GetSchedule(int id, const Ice::Current &);
+ P2PVR::ScheduleList GetSchedules(const Ice::Current &);
+ P2PVR::ScheduledToRecordList GetScheduledToRecord(const Ice::Current &);
+ int UpdateSchedule(const P2PVR::SchedulePtr &, const Ice::Current &);
+ void DoReschedule(const Ice::Current &);
+
+ INITOPTIONS;
+ protected:
+ static void GetEpisodeIntersects(Episodes &, Episodes &);
+ private:
+ static std::string SchedulerAlgorithm;
+};
+
+typedef GenLoader<EpisodeGroup, std::string, const Episodes &> EpisodeGroupLoader;
+typedef boost::shared_ptr<EpisodeGroup> EpisodeGroupPtr;
+
+#endif
+
diff --git a/p2pvr/daemon/si.cpp b/p2pvr/daemon/si.cpp
new file mode 100644
index 0000000..557003c
--- /dev/null
+++ b/p2pvr/daemon/si.cpp
@@ -0,0 +1,127 @@
+#include <pch.hpp>
+#include "si.h"
+#include "resources.h"
+#include "dvbsiHelpers.h"
+#include "sqlContainerCreator.h"
+#include <linux/dvb/frontend.h>
+#include <logger.h>
+
+ResourceString(SI_serviceNextUsed, daemon_sql_SI_serviceNextUsed_sql);
+ResourceString(SI_servicesSelectAll, daemon_sql_SI_servicesSelectAll_sql);
+ResourceString(SI_servicesSelectById, daemon_sql_SI_servicesSelectById_sql);
+ResourceString(SI_eventById, daemon_sql_SI_eventById_sql);
+ResourceString(SI_eventsOnNow, daemon_sql_SI_eventsOnNow_sql);
+ResourceString(SI_eventsInRange, daemon_sql_SI_eventsInRange_sql);
+
+P2PVR::Deliveries
+SI::GetAllDeliveries(short type, const Ice::Current &)
+{
+ Logger()->messagebf(LOG_DEBUG, "%s(type %d)", __PRETTY_FUNCTION__, type);
+ P2PVR::Deliveries rtn;
+ SelectPtr sel;
+ switch (type) {
+ case FE_OFDM:
+ {
+ SqlContainerCreator<P2PVR::Deliveries, DVBSI::TerrestrialDelivery> cc(rtn);
+ cc.populate(Select("SELECT * FROM delivery_dvbt ORDER BY transportStreamId").second);
+ break;
+ }
+ case FE_QAM:
+ {
+ SqlContainerCreator<P2PVR::Deliveries, DVBSI::CableDelivery> cc(rtn);
+ cc.populate(Select("SELECT * FROM delivery_dvbc ORDER BY transportStreamId").second);
+ break;
+ }
+ case FE_QPSK:
+ {
+ SqlContainerCreator<P2PVR::Deliveries, DVBSI::SatelliteDelivery> cc(rtn);
+ cc.populate(Select("SELECT * FROM delivery_dvbs ORDER BY transportStreamId").second);
+ break;
+ }
+ }
+ Logger()->messagebf(LOG_DEBUG, "%s: Found %d delivery methods", __PRETTY_FUNCTION__, rtn.size());
+ return rtn;
+}
+
+DVBSI::DeliveryPtr
+SI::GetDeliveryForTransport(int id, const Ice::Current&)
+{
+ P2PVR::Deliveries rtn;
+ SqlContainerCreator<P2PVR::Deliveries, DVBSI::TerrestrialDelivery> cct(rtn);
+ cct.populate(Select("SELECT * FROM delivery_dvbt WHERE transportStreamId = ?", id).second);
+ SqlContainerCreator<P2PVR::Deliveries, DVBSI::CableDelivery> ccc(rtn);
+ ccc.populate(Select("SELECT * FROM delivery_dvbc WHERE transportStreamId = ?", id).second);
+ SqlContainerCreator<P2PVR::Deliveries, DVBSI::SatelliteDelivery> ccs(rtn);
+ ccs.populate(Select("SELECT * FROM delivery_dvbs WHERE transportStreamId = ?", id).second);
+ return rtn.front();
+}
+
+DVBSI::DeliveryPtr
+SI::GetDeliveryForSi(const Ice::Current&)
+{
+ P2PVR::Deliveries rtn;
+ SqlContainerCreator<P2PVR::Deliveries, DVBSI::TerrestrialDelivery> cct(rtn);
+ cct.populate(Select(SI_serviceNextUsed).second);
+ return rtn.front();
+}
+
+DVBSI::DeliveryPtr
+SI::GetDeliveryForService(int id, const Ice::Current&)
+{
+ P2PVR::Deliveries rtn;
+ SqlContainerCreator<P2PVR::Deliveries, DVBSI::TerrestrialDelivery> cct(rtn);
+ cct.populate(Select("SELECT d.* FROM services s, delivery_dvbt d WHERE serviceid = ? AND s.transportstreamid = d.transportstreamid", id).second);
+ SqlContainerCreator<P2PVR::Deliveries, DVBSI::CableDelivery> ccc(rtn);
+ ccc.populate(Select("SELECT d.* FROM services s, delivery_dvbc d WHERE serviceid = ? AND s.transportstreamid = d.transportstreamid", id).second);
+ SqlContainerCreator<P2PVR::Deliveries, DVBSI::SatelliteDelivery> ccs(rtn);
+ ccs.populate(Select("SELECT d.* FROM services s, delivery_dvbs d WHERE serviceid = ? AND s.transportstreamid = d.transportstreamid", id).second);
+ return rtn.front();
+}
+
+DVBSI::ServiceList
+SI::GetServices(const Ice::Current&)
+{
+ DVBSI::ServiceList rtn;
+ SqlContainerCreator<DVBSI::ServiceList, DVBSI::Service> cc(rtn);
+ cc.populate(Select(SI_servicesSelectAll).second);
+ return rtn;
+}
+
+DVBSI::ServicePtr
+SI::GetService(int id, const Ice::Current&)
+{
+ DVBSI::ServiceList rtn;
+ SqlContainerCreator<DVBSI::ServiceList, DVBSI::Service> cc(rtn);
+ cc.populate(Select(SI_servicesSelectById, id).second);
+ if (rtn.empty()) throw P2PVR::NotFound();
+ return rtn.front();
+}
+
+DVBSI::EventPtr
+SI::GetEvent(int serviceId, int eventId, const Ice::Current &)
+{
+ DVBSI::Events rtn;
+ SqlContainerCreator<DVBSI::Events, DVBSI::Event> cc(rtn);
+ cc.populate(Select(SI_eventById, serviceId, eventId).second);
+ if (rtn.empty()) throw P2PVR::NotFound();
+ return rtn.front();
+}
+
+DVBSI::Events
+SI::EventsOnNow(const Ice::Current &)
+{
+ DVBSI::Events rtn;
+ SqlContainerCreator<DVBSI::Events, DVBSI::Event> cc(rtn);
+ cc.populate(Select(SI_eventsOnNow).second);
+ return rtn;
+}
+
+DVBSI::Events
+SI::EventsInRange(const Common::DateTime & from, const Common::DateTime & to, const Ice::Current &)
+{
+ DVBSI::Events rtn;
+ SqlContainerCreator<DVBSI::Events, DVBSI::Event> cc(rtn);
+ cc.populate(Select(SI_eventsInRange, from, to).second);
+ return rtn;
+}
+
diff --git a/p2pvr/daemon/si.h b/p2pvr/daemon/si.h
new file mode 100644
index 0000000..6b720dc
--- /dev/null
+++ b/p2pvr/daemon/si.h
@@ -0,0 +1,23 @@
+#ifndef P2PVR_SI_H
+#define P2PVR_SI_H
+
+#include <p2pvr.h>
+#include "dbClient.h"
+
+class SI : public P2PVR::SI, public DatabaseClient {
+ public:
+ P2PVR::Deliveries GetAllDeliveries(short type, const Ice::Current &);
+ DVBSI::DeliveryPtr GetDeliveryForService(int id, const Ice::Current &);
+ DVBSI::DeliveryPtr GetDeliveryForTransport(int id, const Ice::Current &);
+ DVBSI::DeliveryPtr GetDeliveryForSi(const Ice::Current &);
+
+ DVBSI::ServiceList GetServices(const Ice::Current &);
+ DVBSI::ServicePtr GetService(int id, const Ice::Current &);
+
+ DVBSI::EventPtr GetEvent(int serviceId, int eventId, const Ice::Current &);
+ DVBSI::Events EventsOnNow(const Ice::Current &);
+ DVBSI::Events EventsInRange(const Common::DateTime &, const Common::DateTime &, const Ice::Current &);
+};
+
+#endif
+
diff --git a/p2pvr/daemon/sql/Recordings_delete.sql b/p2pvr/daemon/sql/Recordings_delete.sql
new file mode 100644
index 0000000..3395376
--- /dev/null
+++ b/p2pvr/daemon/sql/Recordings_delete.sql
@@ -0,0 +1,2 @@
+DELETE FROM recordings
+WHERE recordingId = ?
diff --git a/p2pvr/daemon/sql/Recordings_getAll.sql b/p2pvr/daemon/sql/Recordings_getAll.sql
new file mode 100644
index 0000000..c0a096c
--- /dev/null
+++ b/p2pvr/daemon/sql/Recordings_getAll.sql
@@ -0,0 +1,3 @@
+SELECT recordingId, storageAddress, guid, scheduleId, title, subtitle, description, startTime, duration
+FROM recordings r
+ORDER BY startTime, title, subtitle
diff --git a/p2pvr/daemon/sql/Recordings_getStorage.sql b/p2pvr/daemon/sql/Recordings_getStorage.sql
new file mode 100644
index 0000000..fdaf58d
--- /dev/null
+++ b/p2pvr/daemon/sql/Recordings_getStorage.sql
@@ -0,0 +1,4 @@
+SELECT storageAddress, guid
+FROM recordings
+WHERE recordingId = ?
+
diff --git a/p2pvr/daemon/sql/Recordings_insert.sql b/p2pvr/daemon/sql/Recordings_insert.sql
new file mode 100644
index 0000000..dfe2ccc
--- /dev/null
+++ b/p2pvr/daemon/sql/Recordings_insert.sql
@@ -0,0 +1,2 @@
+INSERT INTO recordings(storageAddress, guid, scheduleId, title, subtitle, description, startTime, duration)
+VALUES(?, ?, ?, ?, ?, ?, ?, ?)
diff --git a/p2pvr/daemon/sql/Recordings_insertNewId.sql b/p2pvr/daemon/sql/Recordings_insertNewId.sql
new file mode 100644
index 0000000..0583b49
--- /dev/null
+++ b/p2pvr/daemon/sql/Recordings_insertNewId.sql
@@ -0,0 +1,2 @@
+SELECT currval('recordings_recordingid_seq');
+
diff --git a/p2pvr/daemon/sql/SI_eventById.sql b/p2pvr/daemon/sql/SI_eventById.sql
new file mode 100644
index 0000000..f0e028d
--- /dev/null
+++ b/p2pvr/daemon/sql/SI_eventById.sql
@@ -0,0 +1,28 @@
+select e.serviceid,
+ e.eventid,
+ e.title,
+ e.titlelang,
+ e.subtitle,
+ e.description,
+ e.descriptionlang,
+ e.videoaspect,
+ e.videoframerate,
+ e.videohd,
+ e.audiochannels,
+ e.audiolanguage,
+ e.subtitlelanguage,
+ e.category,
+ e.subcategory,
+ e.usercategory,
+ e.dvbrating,
+ e.starttime,
+ e.stoptime,
+ e.episode,
+ e.episodes,
+ e.year,
+ e.flags,
+ e.season
+from events e
+where serviceid = ?
+and eventid = ?
+
diff --git a/p2pvr/daemon/sql/SI_eventsInRange.sql b/p2pvr/daemon/sql/SI_eventsInRange.sql
new file mode 100644
index 0000000..0a2d6de
--- /dev/null
+++ b/p2pvr/daemon/sql/SI_eventsInRange.sql
@@ -0,0 +1,28 @@
+select e.serviceid,
+ e.eventid,
+ e.title,
+ e.titlelang,
+ e.subtitle,
+ e.description,
+ e.descriptionlang,
+ e.videoaspect,
+ e.videoframerate,
+ e.videohd,
+ e.audiochannels,
+ e.audiolanguage,
+ e.subtitlelanguage,
+ e.category,
+ e.subcategory,
+ e.usercategory,
+ e.dvbrating,
+ e.starttime,
+ e.stoptime,
+ e.episode,
+ e.episodes,
+ e.year,
+ e.flags,
+ e.season
+from events e
+where tsrange(?, ?, '[)') && tsrange(e.starttime, e.stoptime)
+order by e.serviceid, e.starttime
+
diff --git a/p2pvr/daemon/sql/SI_eventsOnNow.sql b/p2pvr/daemon/sql/SI_eventsOnNow.sql
new file mode 100644
index 0000000..36760b4
--- /dev/null
+++ b/p2pvr/daemon/sql/SI_eventsOnNow.sql
@@ -0,0 +1,27 @@
+select e.serviceid,
+ e.eventid,
+ e.title,
+ e.titlelang,
+ e.subtitle,
+ e.description,
+ e.descriptionlang,
+ e.videoaspect,
+ e.videoframerate,
+ e.videohd,
+ e.audiochannels,
+ e.audiolanguage,
+ e.subtitlelanguage,
+ e.category,
+ e.subcategory,
+ e.usercategory,
+ e.dvbrating,
+ e.starttime,
+ e.stoptime,
+ e.episode,
+ e.episodes,
+ e.year,
+ e.flags,
+ e.season
+from events e
+where now()::timestamp without time zone <@ tsrange(e.starttime, e.stoptime)
+order by e.serviceid
diff --git a/p2pvr/daemon/sql/SI_serviceNextUsed.sql b/p2pvr/daemon/sql/SI_serviceNextUsed.sql
new file mode 100644
index 0000000..8e906ad
--- /dev/null
+++ b/p2pvr/daemon/sql/SI_serviceNextUsed.sql
@@ -0,0 +1,12 @@
+SELECT dd.*
+FROM delivery_dvbt dd, services s
+ LEFT OUTER JOIN record r
+ ON r.serviceid = s.serviceid
+ AND r.recordstatus = 0
+ LEFT OUTER JOIN events e
+ ON r.eventid = e.eventid
+ AND r.serviceid = e.serviceid
+ AND e.starttime > NOW()
+WHERE dd.transportstreamid = s.transportstreamid
+ORDER BY e.starttime, s.serviceid
+LIMIT 1
diff --git a/p2pvr/daemon/sql/SI_servicesSelectAll.sql b/p2pvr/daemon/sql/SI_servicesSelectAll.sql
new file mode 100644
index 0000000..513a076
--- /dev/null
+++ b/p2pvr/daemon/sql/SI_servicesSelectAll.sql
@@ -0,0 +1,3 @@
+SELECT serviceId, transportStreamId, name, providerName, defaultAuthority, runningStatus, eitSchedule, eitPresentFollowing, freeCaMode
+FROM services
+ORDER BY serviceId
diff --git a/p2pvr/daemon/sql/SI_servicesSelectById.sql b/p2pvr/daemon/sql/SI_servicesSelectById.sql
new file mode 100644
index 0000000..f0bda72
--- /dev/null
+++ b/p2pvr/daemon/sql/SI_servicesSelectById.sql
@@ -0,0 +1,3 @@
+SELECT serviceId, transportStreamId, name, providerName, defaultAuthority, runningStatus, eitSchedule, eitPresentFollowing, freeCaMode
+FROM services
+WHERE serviceId = ?
diff --git a/p2pvr/daemon/sql/Schedules_GetCandidates.sql b/p2pvr/daemon/sql/Schedules_GetCandidates.sql
new file mode 100644
index 0000000..8e8b15e
--- /dev/null
+++ b/p2pvr/daemon/sql/Schedules_GetCandidates.sql
@@ -0,0 +1,25 @@
+select what, serviceid, eventid, transportstreamid,
+ starttime - early starttime, stoptime + late stoptime,
+ priority, scheduleid
+from (
+ select (e.title, e.subtitle, e.description)::text what, e.serviceid, e.eventid, sv.transportstreamid,
+ e.starttime, e.stoptime - interval '1 second' stoptime,
+ s.early, s.late, s.scheduleid, s.priority,
+ rank() over(partition by e.serviceid, e.eventid, sv.serviceid order by s.priority desc, s.scheduleid) schedulerank
+ from services sv, events e, schedules s
+ where (s.serviceid is null or s.serviceid = e.serviceid)
+ and (s.title is null or lower(s.title) = lower(e.title))
+ and (s.eventid is null or s.eventid = e.eventid)
+ and (s.search is null or event_tsvector(e) @@ plainto_tsquery(s.search))
+ and sv.serviceid = e.serviceid
+ and e.stoptime > now()
+ and not exists (
+ select 1
+ from recorded r
+ where lower(e.title) = lower(r.title)
+ and coalesce(lower(e.subtitle), '') = coalesce(lower(r.subtitle), '')
+ and ts_rank(to_tsvector(e.description), plainto_tsquery(r.description)) +
+ ts_rank(to_tsvector(r.description), plainto_tsquery(e.description)) > 1)) e
+where e.schedulerank = 1
+order by e.priority desc, e.what, e.transportstreamid, e.starttime
+
diff --git a/p2pvr/daemon/sql/Schedules_delete.sql b/p2pvr/daemon/sql/Schedules_delete.sql
new file mode 100644
index 0000000..e14edc3
--- /dev/null
+++ b/p2pvr/daemon/sql/Schedules_delete.sql
@@ -0,0 +1 @@
+DELETE FROM schedules WHERE scheduleId = ?
diff --git a/p2pvr/daemon/sql/Schedules_insert.sql b/p2pvr/daemon/sql/Schedules_insert.sql
new file mode 100644
index 0000000..100e78b
--- /dev/null
+++ b/p2pvr/daemon/sql/Schedules_insert.sql
@@ -0,0 +1,2 @@
+INSERT INTO schedules(serviceId, eventId, title, search, priority, early, late, repeats)
+VALUES(?, ?, ?, ?, ?, ?, ?, ?)
diff --git a/p2pvr/daemon/sql/Schedules_insertNewId.sql b/p2pvr/daemon/sql/Schedules_insertNewId.sql
new file mode 100644
index 0000000..f66acd5
--- /dev/null
+++ b/p2pvr/daemon/sql/Schedules_insertNewId.sql
@@ -0,0 +1 @@
+SELECT currval('schedules_scheduleid_seq');
diff --git a/p2pvr/daemon/sql/Schedules_pendingRecord.sql b/p2pvr/daemon/sql/Schedules_pendingRecord.sql
new file mode 100644
index 0000000..ba4d7d8
--- /dev/null
+++ b/p2pvr/daemon/sql/Schedules_pendingRecord.sql
@@ -0,0 +1,13 @@
+select *
+from record r, events e, schedules s
+where r.serviceid = e.serviceid
+and r.eventid = e.eventid
+and /*r.scheduleid*/ 161 = s.scheduleid
+and r.recordstatus = 1
+and r.recordingstatus = 0
+and e.starttime - s.early - interval '3minutes' < now()
+order by e.starttime - s.early
+
+;
+select *
+from schedules \ No newline at end of file
diff --git a/p2pvr/daemon/sql/Schedules_scheduledToRecord.sql b/p2pvr/daemon/sql/Schedules_scheduledToRecord.sql
new file mode 100644
index 0000000..3874c37
--- /dev/null
+++ b/p2pvr/daemon/sql/Schedules_scheduledToRecord.sql
@@ -0,0 +1,8 @@
+SELECT r.serviceid, r.eventid, r.scheduleid
+FROM record r, events e, schedules s
+WHERE recordstatus = 0
+AND r.serviceid = e.serviceid
+AND r.eventid = e.eventid
+AND r.scheduleid = s.scheduleid
+AND e.stoptime + s.late > NOW()
+ORDER BY e.starttime, e.serviceid
diff --git a/p2pvr/daemon/sql/Schedules_selectAll.sql b/p2pvr/daemon/sql/Schedules_selectAll.sql
new file mode 100644
index 0000000..e9e500e
--- /dev/null
+++ b/p2pvr/daemon/sql/Schedules_selectAll.sql
@@ -0,0 +1,3 @@
+SELECT scheduleid, serviceid, eventid, title, search, priority, early, late, repeats
+FROM schedules
+ORDER BY scheduleId
diff --git a/p2pvr/daemon/sql/Schedules_selectById.sql b/p2pvr/daemon/sql/Schedules_selectById.sql
new file mode 100644
index 0000000..4990418
--- /dev/null
+++ b/p2pvr/daemon/sql/Schedules_selectById.sql
@@ -0,0 +1,5 @@
+SELECT scheduleid, serviceid, eventid, title, search, priority, early, late, repeats
+FROM schedules
+WHERE scheduleid = ?
+ORDER BY scheduleId
+
diff --git a/p2pvr/daemon/sql/Schedules_update.sql b/p2pvr/daemon/sql/Schedules_update.sql
new file mode 100644
index 0000000..56c9531
--- /dev/null
+++ b/p2pvr/daemon/sql/Schedules_update.sql
@@ -0,0 +1,10 @@
+UPDATE schedules SET
+ serviceId = ?,
+ eventId = ?,
+ title = ?,
+ search = ?,
+ priority = ?,
+ early = ?,
+ late = ?,
+ repeats = ?
+WHERE scheduleId = ?
diff --git a/p2pvr/daemon/sqlContainerCreator.h b/p2pvr/daemon/sqlContainerCreator.h
new file mode 100644
index 0000000..5c8d8f8
--- /dev/null
+++ b/p2pvr/daemon/sqlContainerCreator.h
@@ -0,0 +1,28 @@
+#ifndef SQLCONTAINERCREATOR_H
+#define SQLCONTAINERCREATOR_H
+
+#include "containerCreator.h"
+#include <boost/shared_ptr.hpp>
+#include <selectcommand.h>
+#include <sqlHandleAsVariableType.h>
+
+template <typename T, typename V, typename P = IceInternal::Handle<V>>
+class SqlContainerCreator : public ContainerCreator<T, V, P> {
+ public:
+ SqlContainerCreator(T & c) : ContainerCreator<T, V, P>(c) { }
+
+ void populate(boost::shared_ptr<DB::SelectCommand> sel)
+ {
+ sel->execute();
+ ContainerCreator<T, V, P>::populate(boost::bind(&DB::SelectCommand::fetch, sel), [sel](unsigned int c) {
+ HandleAsVariableType h;
+ const DB::Column & col = (*sel)[c];
+ col.apply(h);
+ return h.variable;
+ }, sel->columnCount());
+ }
+};
+
+
+#endif
+
diff --git a/p2pvr/daemon/storage.cpp b/p2pvr/daemon/storage.cpp
new file mode 100644
index 0000000..9d9345d
--- /dev/null
+++ b/p2pvr/daemon/storage.cpp
@@ -0,0 +1,275 @@
+#include <pch.hpp>
+#include "storage.h"
+#include "fileSink.h"
+#include <commonHelpers.h>
+#include <fcntl.h>
+#include <logger.h>
+#include <boost/filesystem/operations.hpp>
+#include <boost/uuid/uuid_generators.hpp>
+#include <boost/uuid/uuid_io.hpp>
+#include <boost/lexical_cast.hpp>
+
+namespace fs = boost::filesystem;
+
+fs::path Storage::root;
+fs::path Storage::byAll;
+fs::path Storage::byTitle;
+fs::path Storage::byDate;
+fs::path Storage::byService;
+fs::path Storage::bySchedule;
+
+DECLARE_OPTIONS(Storage, "P2PVR Storage options")
+("p2pvr.storage.root", Options::value(&root, "recordings"),
+ "Root folder in which to store recordings")
+("p2pvr.storage.all", Options::value(&byAll, "all"),
+ "Sub folder in which to store all recordings")
+("p2pvr.storage.bytitle", Options::value(&byTitle, "title"),
+ "Sub folder in which to store by title")
+("p2pvr.storage.bydate", Options::value(&byDate, "date"),
+ "Sub folder in which to store by date")
+("p2pvr.storage.byservice", Options::value(&byService, "service"),
+ "Sub folder in which to store by service")
+("p2pvr.storage.byschedule", Options::value(&bySchedule, "schedule"),
+ "Sub folder in which to store by schedule")
+END_OPTIONS(Storage);
+
+inline static
+fs::path createPath(const fs::path & start)
+{
+ return start;
+}
+
+template <typename Arg>
+inline static
+fs::path
+operator/(const fs::path & lhs, const IceUtil::Optional<Arg> & rhs)
+{
+ if (rhs) {
+ return lhs / *rhs;
+ }
+ return lhs;
+}
+
+template <typename Arg, typename ... Args>
+inline static
+fs::path createPath(const fs::path & start, const Arg & arg, const Args & ... args)
+{
+ auto path = start / arg;
+ return createPath(path, args...);
+}
+
+template <typename Arg, typename ... Args>
+inline static
+std::string firstNotNull(const Arg & arg, const Args & ...)
+{
+ return boost::lexical_cast<std::string>(arg);
+}
+
+template <typename Arg, typename ... Args>
+inline static
+std::string firstNotNull(const IceUtil::Optional<Arg> & arg, const Args & ... args)
+{
+ return arg ? firstNotNull(*arg, args...) : firstNotNull(args...);
+}
+
+template <typename ... Args>
+inline static
+std::string firstNotNull(const std::string & arg, const Args & ...)
+{
+ return arg;
+}
+
+inline static
+std::string
+formatIf(const boost::format & f)
+{
+ return f.str();
+}
+
+template <typename Arg, typename ... Args>
+inline static
+IceUtil::Optional<std::string>
+formatIf(boost::format & f, const IceUtil::Optional<Arg> & arg, const Args & ... args)
+{
+ if (arg) {
+ f % *arg;
+ return formatIf(f, args...);
+ }
+ else {
+ return IceUtil::Optional<std::string>();
+ }
+}
+
+template <typename Arg, typename ... Args>
+inline static
+IceUtil::Optional<std::string>
+formatIf(boost::format & f, const Arg & arg, const Args & ... args)
+{
+ f % arg;
+ return formatIf(f, args...);
+}
+
+template <typename... Args>
+inline static
+IceUtil::Optional<std::string>
+formatIf(const std::string & msgfmt, const Args & ... args)
+{
+ boost::format fmt(msgfmt);
+ return formatIf(fmt, args...);
+}
+
+std::string
+Storage::CreateForEventRecording(const std::string & ext, const P2PVR::SchedulePtr & schedule, const DVBSI::ServicePtr & service, const DVBSI::EventPtr & event, const Ice::Current &)
+{
+ fs::create_directories(root / byAll);
+ auto id = boost::lexical_cast<std::string>(boost::uuids::random_generator()());
+ fs::path path = root / byAll / id;
+ path.replace_extension(ext);
+ auto fd = open(path.string().c_str(), O_WRONLY | O_CREAT | O_EXCL, 0664);
+ if (fd < 0) {
+ Logger()->messagebf(LOG_ERR, "%s: Failed to open file for writing at %s (%d:%s)", __PRETTY_FUNCTION__,
+ path, errno, strerror(errno));
+ throw P2PVR::StorageException(path.string(), strerror(errno));
+ }
+ createSymlinks(ext, id, schedule, service, event);
+ Logger()->messagebf(LOG_INFO, "%s: Created new recording %s", __PRETTY_FUNCTION__, path);
+
+ close(fd);
+ return id;
+}
+
+void
+Storage::createSymlinks(const std::string & ext, const std::string & target, const P2PVR::SchedulePtr & schedule, const DVBSI::ServicePtr & service, const DVBSI::EventPtr & event)
+{
+ // By title, with optional season and episode information
+ createSymlink(ext, target, createPath(root, byTitle,
+ event->Title,
+ formatIf("Season %02d", event->Season),
+ firstNotNull(
+ formatIf("Part %02d of %02d - %s", event->Episode, event->Episodes, event->Subtitle),
+ formatIf("Episode %02d - %s", event->Episode, event->Subtitle),
+ formatIf("Part %02d of %02d - %s", event->Episode, event->Episodes, event->Description),
+ formatIf("Part %02d of %02d", event->Episode, event->Episodes),
+ formatIf("Episode %02d - %s", event->Episode, event->Description),
+ event->Subtitle,
+ event->Description,
+ formatIf("Episode %02d", event->Episode),
+ event->StartTime)));
+ // By date
+ createSymlink(ext, target, createPath(root, byDate,
+ formatIf("%04d-%02d-%02d", event->StartTime.Year, event->StartTime.Month, event->StartTime.Day),
+ firstNotNull(
+ formatIf("%s - %s", event->Title, event->Subtitle),
+ formatIf("%s - %s", event->Title, event->Description),
+ event->Title)));
+ // By service
+ createSymlink(ext, target, createPath(root, byService,
+ service->Name,
+ firstNotNull(
+ formatIf("%s - %s", event->Title, event->Subtitle),
+ formatIf("%s - %s", event->Title, event->Description),
+ event->Title)));
+ // By schedule title
+ createSymlink(ext, target, createPath(root, bySchedule,
+ formatIf("Title: %s", schedule->Title),
+ formatIf("%04d-%02d-%02d", event->StartTime.Year, event->StartTime.Month, event->StartTime.Day),
+ firstNotNull(
+ formatIf("%s - %s", event->Title, event->Subtitle),
+ formatIf("%s - %s", event->Title, event->Description),
+ event->Title)));
+ // By schedule search
+ createSymlink(ext, target, createPath(root, bySchedule,
+ formatIf("Search: %s", schedule->Search),
+ firstNotNull(
+ formatIf("%s - %s", event->Title, event->Subtitle),
+ formatIf("%s - %s", event->Title, event->Description),
+ event->Title)));
+}
+void
+Storage::createSymlink(const std::string & ext, const std::string & target, const fs::path & link)
+{
+ fs::path path = link;
+ path.replace_extension(ext);
+ Logger()->messagebf(LOG_DEBUG, "%s: link(%s) -> target(%s)", __PRETTY_FUNCTION__, path, target);
+ if (fs::exists(path)) {
+ Logger()->messagebf(LOG_WARNING, "%s: symlink already exists %s", __PRETTY_FUNCTION__, path);
+ return;
+ }
+ fs::create_directories(path.parent_path());
+ fs::path relativeTarget;
+ for (fs::path tmp = path.parent_path(); tmp != root; tmp = tmp.parent_path()) {
+ relativeTarget /= "..";
+ }
+ relativeTarget /= byAll;
+ relativeTarget /= target;
+ relativeTarget.replace_extension(ext);
+ fs::create_symlink(relativeTarget, path);
+}
+
+std::string
+Storage::FindExtension(const std::string & id)
+{
+ fs::directory_iterator end;
+ for (fs::directory_iterator itr(root / byAll); itr != end; ++itr) {
+ if (itr->path().stem() == id) {
+ return itr->path().extension().string();
+ }
+ }
+ return "";
+}
+
+P2PVR::RawDataClientPrx
+Storage::OpenForWrite(const std::string & id, const Ice::Current & ice)
+{
+ fs::path path = root / byAll / id;
+ path.replace_extension(FindExtension(id));
+ auto fd = open(path.string().c_str(), O_WRONLY | O_APPEND | O_LARGEFILE);
+ if (fd < 0) {
+ Logger()->messagebf(LOG_ERR, "%s: Failed to open file for reading at %s (%d:%s)", __PRETTY_FUNCTION__,
+ path, errno, strerror(errno));
+ throw P2PVR::StorageException(path.string(), strerror(errno));
+ }
+ auto openFile = OpenFilePtr(new OpenFile(ice.adapter, new FileSink(fd)));
+ openFiles.insert(openFile);
+ return *openFile;
+}
+
+void
+Storage::Close(const P2PVR::RawDataClientPrx & file, const Ice::Current &)
+{
+ openFiles.erase(std::find_if(openFiles.begin(), openFiles.end(), [&file](const OpenFilePtr & of) { return *of == file; }));
+}
+
+void
+Storage::Delete(const std::string & id, const Ice::Current &)
+{
+ fs::path path = root / byAll / id;
+ path.replace_extension(FindExtension(id));
+ Logger()->messagebf(LOG_INFO, "%s: Deleting links to %s", __PRETTY_FUNCTION__, path);
+ DeleteFrom(path, fs::canonical(root));
+}
+
+void
+Storage::DeleteFrom(const fs::path & path, const fs::path & from)
+{
+ Logger()->messagebf(LOG_DEBUG, "%s: Deleting links to %s to %s", __PRETTY_FUNCTION__, path, from);
+ fs::directory_iterator end;
+ for (fs::directory_iterator itr(from); itr != end; ++itr) {
+ if (fs::is_directory(*itr)) {
+ DeleteFrom(path, *itr);
+ }
+ else {
+ boost::system::error_code err;
+ auto link = fs::canonical(*itr, err);
+ if (err || link == path) {
+ Logger()->messagebf(LOG_DEBUG, "%s: deleting %s", __PRETTY_FUNCTION__, *itr);
+ fs::remove(*itr);
+ }
+ }
+ }
+ if (from != root && fs::is_empty(from)) {
+ Logger()->messagebf(LOG_DEBUG, "%s: deleting directory %s", __PRETTY_FUNCTION__, from);
+ fs::remove(from);
+ }
+}
+
diff --git a/p2pvr/daemon/storage.h b/p2pvr/daemon/storage.h
new file mode 100644
index 0000000..093e00d
--- /dev/null
+++ b/p2pvr/daemon/storage.h
@@ -0,0 +1,38 @@
+#ifndef STORAGE_H
+#define STORAGE_H
+
+#include <p2pvr.h>
+#include <options.h>
+#include <string>
+#include <boost/filesystem/path.hpp>
+#include "temporaryIceAdapterObject.h"
+
+class Storage : public P2PVR::Storage {
+ public:
+ std::string CreateForEventRecording(const std::string & ext, const P2PVR::SchedulePtr &, const DVBSI::ServicePtr &, const DVBSI::EventPtr &, const Ice::Current &);
+ P2PVR::RawDataClientPrx OpenForWrite(const std::string &, const Ice::Current &);
+ void Close(const P2PVR::RawDataClientPrx & file, const Ice::Current &);
+ void Delete(const std::string &, const Ice::Current &);
+
+ INITOPTIONS;
+
+ protected:
+ static void createSymlinks(const std::string & ext, const std::string &, const P2PVR::SchedulePtr &, const DVBSI::ServicePtr &, const DVBSI::EventPtr &);
+ static void createSymlink(const std::string & ext, const std::string & target, const boost::filesystem::path & link);
+ static void DeleteFrom(const boost::filesystem::path &, const boost::filesystem::path &);
+ static std::string FindExtension(const std::string & id);
+
+ typedef TemporarayIceAdapterObject<P2PVR::RawDataClient> OpenFile;
+ typedef boost::shared_ptr<OpenFile> OpenFilePtr;
+ typedef std::set<OpenFilePtr> OpenFiles;
+ OpenFiles openFiles;
+
+ static boost::filesystem::path root;
+ static boost::filesystem::path byAll;
+ static boost::filesystem::path byTitle;
+ static boost::filesystem::path byDate;
+ static boost::filesystem::path byService;
+ static boost::filesystem::path bySchedule;
+};
+
+#endif