diff options
author | randomdan <randomdan@localhost> | 2014-03-13 19:42:07 +0000 |
---|---|---|
committer | randomdan <randomdan@localhost> | 2014-03-13 19:42:07 +0000 |
commit | ab1eee942e75874739ce5f0b4ba289aac5cc3faf (patch) | |
tree | 6e43828794fe0c0c5c9921ec1911695b67357c50 /p2pvr/daemon | |
parent | Expose more of the interface (diff) | |
download | p2pvr-ab1eee942e75874739ce5f0b4ba289aac5cc3faf.tar.bz2 p2pvr-ab1eee942e75874739ce5f0b4ba289aac5cc3faf.tar.xz p2pvr-ab1eee942e75874739ce5f0b4ba289aac5cc3faf.zip |
Restructure into more sensibly arranged libs
Diffstat (limited to 'p2pvr/daemon')
45 files changed, 2470 insertions, 3 deletions
diff --git a/p2pvr/daemon/Jamfile.jam b/p2pvr/daemon/Jamfile.jam index 8a6ee8c..338f65e 100644 --- a/p2pvr/daemon/Jamfile.jam +++ b/p2pvr/daemon/Jamfile.jam @@ -1,9 +1,20 @@ -lib Ice ; -lib IceUtil ; +cpp-pch pch : pch.hpp : + <library>../ice//p2pvrice + <library>../lib//p2pvrlib + <library>../dvb//p2pvrdvb + <library>..//p2sql + <library>../devices//p2pvrdevices + <library>../daemonbase//p2pvrdaemonbase +; lib p2pvrdaemon : - [ glob *.cpp ] + pch + [ glob-tree *.cpp *.sql ] : <library>../ice//p2pvrice <library>../lib//p2pvrlib + <library>../dvb//p2pvrdvb + <library>..//p2sql + <library>../devices//p2pvrdevices + <library>../daemonbase//p2pvrdaemonbase ; diff --git a/p2pvr/daemon/dbClient.cpp b/p2pvr/daemon/dbClient.cpp new file mode 100644 index 0000000..8267584 --- /dev/null +++ b/p2pvr/daemon/dbClient.cpp @@ -0,0 +1,56 @@ +#include <pch.hpp> +#include "dbClient.h" +#include <sqlMergeTask.h> + +void +DatabaseClient::SqlMergeColumnsInserter(SqlMergeTask * merge, const std::string & name, bool key) +{ + merge->cols.insert(new SqlMergeTask::TargetColumn(name, key)); + if (key) { + merge->keys.insert(name); + } +} + +void +DatabaseClient::onAllDatasources(const DataSourceCall & call) const +{ + BOOST_FOREACH(const CommonObjects::DataSources::value_type & ds, CommonObjects::datasources) { + call(ds.second); + } +} + +DatabaseClient::TxHelper::TxHelper(const DatabaseClient * dbc) : + client(dbc), + so(NULL, + boost::bind(&DatabaseClient::onAllDatasources, dbc, DataSourceCall(boost::bind(&DataSource::commit, _1))), + boost::bind(&DatabaseClient::onAllDatasources, dbc, DataSourceCall(boost::bind(&DataSource::rollback, _1)))) +{ +} + +void +DatabaseClient::TxHelper::Commit() const +{ + client->onAllDatasources(boost::bind(&DataSource::commit, _1)); +} + +DatabaseClient::NoRowsFoundException::NoRowsFoundException() : + std::runtime_error("No rows found") +{ +} + +VariableType +operator/(const DatabaseClient::SelectPtr & cmd, unsigned int col) +{ + HandleAsVariableType vt; + (*cmd)[col].apply(vt); + return vt.variable; +} + +VariableType +operator/(const DatabaseClient::SelectPtr & cmd, const std::string & col) +{ + HandleAsVariableType vt; + (*cmd)[col].apply(vt); + return vt.variable; +} + diff --git a/p2pvr/daemon/dbClient.h b/p2pvr/daemon/dbClient.h new file mode 100644 index 0000000..84cf5e7 --- /dev/null +++ b/p2pvr/daemon/dbClient.h @@ -0,0 +1,103 @@ +#ifndef DBCLIENT_H +#define DBCLIENT_H + +#include <commonObjects.h> +#include <variableType.h> +#include <list> +#include <selectcommand.h> +#include <modifycommand.h> +#include <scopeObject.h> +#include <rdbmsDataSource.h> +#include <sqlVariableBinder.h> +#include <sqlHandleAsVariableType.h> +#include "p2Helpers.h" + +class SqlMergeTask; + +class DatabaseClient : public virtual CommonObjects { + public: + typedef boost::shared_ptr<DB::SelectCommand> SelectPtr; + typedef boost::shared_ptr<DB::ModifyCommand> ModifyPtr; + + static void SqlMergeColumnsInserter(SqlMergeTask * merge, const std::string & name, bool key); + + class TxHelper { + public: + TxHelper(const DatabaseClient *); + void Commit() const; + + private: + const DatabaseClient * client; + ScopeObject so; + }; + + template <typename... Args> + std::pair<RdbmsDataSource::ConnectionRef, ModifyPtr> Modify(const std::string & sql, const Args & ... args) const + { + auto db = dataSource<RdbmsDataSource>("postgres")->getWritable(); + auto cmd = ModifyPtr(db->newModifyCommand(sql)); + Bind(cmd.get(), 0, args...); + return {db, cmd}; + } + + template <typename... Args> + std::pair<RdbmsDataSource::ConnectionRef, SelectPtr> Select(const std::string & sql, const Args & ... args) const + { + auto db = dataSource<RdbmsDataSource>("postgres")->getReadonly(); + auto cmd = SelectPtr(db->newSelectCommand(sql)); + Bind(cmd.get(), 0, args...); + return {db, cmd}; + } + + class NoRowsFoundException : public std::runtime_error { + public: + NoRowsFoundException(); + }; + + template <typename Rtn, typename... Args> + Rtn SelectScalar(const std::string & sql, const Args & ... args) const + { + auto db = dataSource<RdbmsDataSource>("postgres"); + auto cmd = SelectPtr(db->getReadonly()->newSelectCommand(sql)); + Bind(cmd.get(), 0, args...); + while (cmd->fetch()) { + HandleAsVariableType h; + (*cmd)[0].apply(h); + Rtn r; + h.variable >> r; + return r; + } + throw NoRowsFoundException(); + } + + private: + static void Bind(DB::Command *, unsigned int) { } + + template <typename Arg> + static void Bind(DB::Command * cmd, unsigned int offset, const Arg & arg) + { + VariableType v; + v << arg; + boost::apply_visitor<const SqlVariableBinder, const VariableType>(SqlVariableBinder(cmd, offset), v); + } + + template <typename Arg, typename... Args> + static void Bind(DB::Command * cmd, unsigned int offset, const Arg & arg, const Args & ... args) + { + Bind(cmd, offset, arg); + Bind(cmd, offset + 1, args...); + } + + friend class TxHelper; + typedef boost::function<void(DataSourcePtr)> DataSourceCall; + void onAllDatasources(const DataSourceCall &) const; +}; + +VariableType +operator/(const DatabaseClient::SelectPtr & cmd, unsigned int col); + +VariableType +operator/(const DatabaseClient::SelectPtr & cmd, const std::string & col); + +#endif + diff --git a/p2pvr/daemon/globalDevices.cpp b/p2pvr/daemon/globalDevices.cpp new file mode 100644 index 0000000..4368cea --- /dev/null +++ b/p2pvr/daemon/globalDevices.cpp @@ -0,0 +1,86 @@ +#include <pch.hpp> +#include "globalDevices.h" +#include <Ice/Ice.h> + +std::vector<std::string> GlobalDevices::Devices; + +DECLARE_OPTIONS(GlobalDevices, "P2PVR Devices") +("p2pvr.globaldevices.carddaemon", + Options::functions( + [](const VariableType & df) { Devices.push_back(df); }, + []{ Devices.clear(); }), + "ICE address of remote device pools (<adapter>:<endpoint>)") +END_OPTIONS(GlobalDevices); + +P2PVR::TunerPrx +GlobalDevices::GetTunerSpecific(const DVBSI::DeliveryPtr & delivery, const Ice::Current & ice) +{ + auto ic = ice.adapter->getCommunicator(); + BOOST_FOREACH(const auto & pool, Devices) { + try { + auto poolprx = P2PVR::DevicesPrx::checkedCast(ic->stringToProxy(pool)); + return poolprx->GetTunerSpecific(delivery); + } + catch (...) { + } + } + throw P2PVR::NoSuitableDeviceAvailable(); +} + +P2PVR::TunerPrx +GlobalDevices::GetTunerAny(short type, const DVBSI::DeliveryPtr & delivery, const Ice::Current & ice) +{ + auto ic = ice.adapter->getCommunicator(); + BOOST_FOREACH(const auto & pool, Devices) { + try { + auto poolprx = P2PVR::DevicesPrx::checkedCast(ic->stringToProxy(pool)); + return poolprx->GetTunerAny(type, delivery); + } + catch (...) { + } + } + throw P2PVR::NoSuitableDeviceAvailable(); +} + +P2PVR::PrivateTunerPrx +GlobalDevices::GetPrivateTuner(short type, const Ice::Current & ice) +{ + auto ic = ice.adapter->getCommunicator(); + BOOST_FOREACH(const auto & pool, Devices) { + try { + auto poolprx = P2PVR::DevicesPrx::checkedCast(ic->stringToProxy(pool)); + return poolprx->GetPrivateTuner(type); + } + catch (...) { + } + } + throw P2PVR::NoSuitableDeviceAvailable(); +} + +void +GlobalDevices::ReleaseTuner(const P2PVR::TunerPrx & tuner, const Ice::Current & ice) +{ + auto ic = ice.adapter->getCommunicator(); + BOOST_FOREACH(const auto & pool, Devices) { + auto poolprx = P2PVR::DevicesPrx::checkedCast(ic->stringToProxy(pool)); + poolprx->ReleaseTuner(tuner); + } +} + +int +GlobalDevices::TunerCount(const Ice::Current & ice) +{ + int total = 0; + auto ic = ice.adapter->getCommunicator(); + BOOST_FOREACH(const auto & pool, Devices) { + try { + auto poolprx = P2PVR::DevicesPrx::checkedCast(ic->stringToProxy(pool)); + total += poolprx->TunerCount(); + } + catch (...) { + // Not available, don't count 'em + } + } + return total; +} + diff --git a/p2pvr/daemon/globalDevices.h b/p2pvr/daemon/globalDevices.h new file mode 100644 index 0000000..da27c95 --- /dev/null +++ b/p2pvr/daemon/globalDevices.h @@ -0,0 +1,25 @@ +#ifndef GLOBALDEVICES_H +#define GLOBALDEVICES_H + +// Global devices implements a device collection (P2PVR::Devices) for any devices known +// throughout the system through other Devices interfaces + +#include <dvb.h> +#include <options.h> + +class GlobalDevices : public P2PVR::Devices { + public: + P2PVR::TunerPrx GetTunerSpecific(const DVBSI::DeliveryPtr &, const Ice::Current &); + P2PVR::TunerPrx GetTunerAny(short type, const DVBSI::DeliveryPtr &, const Ice::Current &); + P2PVR::PrivateTunerPrx GetPrivateTuner(short type, const Ice::Current &); + void ReleaseTuner(const P2PVR::TunerPrx &, const Ice::Current &); + int TunerCount(const Ice::Current &); + + INITOPTIONS; + private: + static std::vector<std::string> Devices; +}; + +#endif + + diff --git a/p2pvr/daemon/maintenance.cpp b/p2pvr/daemon/maintenance.cpp new file mode 100644 index 0000000..3475544 --- /dev/null +++ b/p2pvr/daemon/maintenance.cpp @@ -0,0 +1,97 @@ +#include <pch.hpp> +#include <logger.h> +#include <thread> +#include "maintenance.h" +#include <Ice/Ice.h> +#include "bindTimerTask.h" +#include <linux/dvb/frontend.h> +#include <cxxabi.h> + +time_t Maintenance::periodUpdateNetwork; +time_t Maintenance::periodUpdateServices; +time_t Maintenance::periodUpdateEvents; + +DECLARE_OPTIONS(Maintenance, "P2PVR Maintenance options") +("p2pvr.maintenance.periodUpdateNetwork", Options::value(&periodUpdateNetwork, 86400 * 7), + "Period between automated updates of DVB network (1 week)") +("p2pvr.maintenance.periodUpdateServices", Options::value(&periodUpdateServices, 86400 * 7), + "Period between automated updates of DVB services (1 week)") +("p2pvr.maintenance.periodUpdateEvents", Options::value(&periodUpdateEvents, 3600 * 12), + "Period between automated updates of DVB events (12 hours)") +END_OPTIONS(Maintenance); + +Maintenance::Maintenance(Ice::ObjectAdapterPtr a, IceUtil::TimerPtr t) : + adapter(a), + timer(t), + clientCheck(new BindTimerTask(boost::bind(&Maintenance::ScheduledUpdate, this))), + lastUpdateNetwork(0), + lastUpdateServices(0), + lastUpdateEvents(0), + updateRunning(false) +{ + timer->scheduleRepeated(clientCheck, IceUtil::Time::seconds(5 * 60)); +#ifdef NDEBUG + ScheduledUpdate(); +#endif +} + +void +Maintenance::UpdateAll(const Ice::Current & ice) +{ + UpdateAll(FE_OFDM, ice); + UpdateAll(FE_QPSK, ice); + UpdateAll(FE_ATSC, ice); + UpdateAll(FE_QAM, ice); +} + +void +Maintenance::UpdateAll(short type, const Ice::Current & ice) +{ + UpdateNetwork(type, ice); + UpdateServices(type, ice); + UpdateProgramAssociations(type, ice); + UpdateProgramMaps(type, ice); + UpdateEvents(type, ice); +} + +void +Maintenance::ScheduledUpdate() +{ + Logger()->messagebf(LOG_DEBUG, "%s: triggered", __PRETTY_FUNCTION__); + if (!updateRunning) { + std::thread update([this] { + try { + ScopeObject notRunning([this]{ updateRunning = false; }); + updateRunning = true; + auto si = P2PVR::MaintenancePrx::checkedCast(adapter->createProxy(adapter->getCommunicator()->stringToIdentity("Maintenance"))); + time_t now = time(NULL); + if (lastUpdateNetwork < now - periodUpdateNetwork) { + Logger()->messagebf(LOG_INFO, "%s: updating network", __PRETTY_FUNCTION__); + si->UpdateNetwork(FE_OFDM); + time(&lastUpdateNetwork); + } + if (lastUpdateServices < now - periodUpdateServices) { + Logger()->messagebf(LOG_INFO, "%s: updating services", __PRETTY_FUNCTION__); + si->UpdateServices(FE_OFDM); + time(&lastUpdateServices); + } + if (lastUpdateEvents < now - periodUpdateEvents) { + Logger()->messagebf(LOG_INFO, "%s: updating events", __PRETTY_FUNCTION__); + si->UpdateEvents(FE_OFDM); + time(&lastUpdateEvents); + } + Logger()->messagebf(LOG_DEBUG, "%s: completed", __PRETTY_FUNCTION__); + } + catch (const std::exception & ex) { + char * buf = __cxxabiv1::__cxa_demangle(typeid(ex).name(), NULL, NULL, NULL); + Logger()->messagebf(LOG_ERR, "%s: failed %s: %s", __PRETTY_FUNCTION__, buf, ex.what()); + free(buf); + } + catch (...) { + Logger()->messagebf(LOG_ERR, "%s: failed (unknown exception)", __PRETTY_FUNCTION__); + } + }); + update.detach(); + } +} + diff --git a/p2pvr/daemon/maintenance.h b/p2pvr/daemon/maintenance.h new file mode 100644 index 0000000..563455a --- /dev/null +++ b/p2pvr/daemon/maintenance.h @@ -0,0 +1,39 @@ +#ifndef P2PVR_MAINTENANCE_H +#define P2PVR_MAINTENANCE_H + +#include <p2pvr.h> +#include "dbClient.h" + +class Maintenance : public P2PVR::Maintenance, public DatabaseClient { + public: + Maintenance(Ice::ObjectAdapterPtr, IceUtil::TimerPtr); + + void UpdateAll(const Ice::Current &); + void UpdateAll(short type, const Ice::Current &); + void UpdateNetwork(short type, const Ice::Current &); + void UpdateServices(short type, const Ice::Current &); + void UpdateProgramAssociations(short type, const Ice::Current &); + void UpdateProgramMaps(short type, const Ice::Current &); + void UpdateEvents(short type, const Ice::Current &); + + INITOPTIONS; + + private: + void ScheduledUpdate(); + + Ice::ObjectAdapterPtr adapter; + IceUtil::TimerPtr timer; + IceUtil::TimerTaskPtr clientCheck; + + time_t lastUpdateNetwork; + time_t lastUpdateServices; + time_t lastUpdateEvents; + bool updateRunning; + + static time_t periodUpdateNetwork; + static time_t periodUpdateServices; + static time_t periodUpdateEvents; +}; + +#endif + diff --git a/p2pvr/daemon/maintenance/events.cpp b/p2pvr/daemon/maintenance/events.cpp new file mode 100644 index 0000000..b0c8b74 --- /dev/null +++ b/p2pvr/daemon/maintenance/events.cpp @@ -0,0 +1,92 @@ +#include <pch.hpp> +#include "../maintenance.h" +#include <siParsers/event.h> +#include <sqlMergeTask.h> +#include <Ice/Communicator.h> +#include <p2Helpers.h> +#include <dvbsiHelpers.h> +#include <containerIterator.h> +#include <singleIterator.h> +#include <temporaryIceAdapterObject.h> +#include <commonHelpers.h> + +class SiEventsHandler : public SiEpgParser { + public: + SiEventsHandler(const RowProcessorCallback & cb) : + callBack(cb) {} + + void HandleTable(DVBSI::EventPtr e) + { + Logger()->messagebf(LOG_DEBUG, "Service Id: %d Program Id: %d Title: %s Time: %s - %s", + e->ServiceId, e->EventId, e->Title, e->StartTime, e->StopTime); + BindColumns<DVBSI::EventPtr>(rowState, e); + rowState.process(callBack); + } + + private: + ObjectRowState<DVBSI::EventPtr> rowState; + const RowProcessorCallback callBack; +}; + +class SiEventsMerger : public IHaveSubTasks { + public: + SiEventsMerger(short t, const Ice::Current & i) : + SourceObject(__PRETTY_FUNCTION__), + IHaveSubTasks(NULL), + type(t), + ice(i) { } + + void execute(ExecContext * ec) const + { + auto ic = ice.adapter->getCommunicator(); + auto devs = P2PVR::DevicesPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("GlobalDevices"))); + auto si = P2PVR::SIPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("SI"))); + + if (!devs || !si) { + throw std::runtime_error("bad proxy(s)"); + } + + TemporarayIceAdapterObject<P2PVR::RawDataClient> parser(ice.adapter, + new SiEventsHandler(boost::bind(&SiEventsMerger::executeChildren, this, ec))); + + auto delivery = si->GetDeliveryForSi(); + if (!delivery) { + throw std::runtime_error("no delivery methods"); + } + + Logger()->messagebf(LOG_DEBUG, "%s: Getting a tuner", __PRETTY_FUNCTION__); + auto tuner = devs->GetTunerAny(type, delivery); + Logger()->messagebf(LOG_DEBUG, "%s: Fetching events", __PRETTY_FUNCTION__); + tuner->SendEventInformation(parser); + devs->ReleaseTuner(tuner); + } + + private: + const short type; + const Ice::Current & ice; + + void executeChildren(ExecContext * ec) const + { + BOOST_FOREACH(const Tasks::value_type & sq, normal) { + sq->execute(ec); + } + } +}; + +void +Maintenance::UpdateEvents(short type, const Ice::Current & ice) +{ + TxHelper tx(this); + SqlMergeTask mergeEvents("postgres", "events"); + CreateColumns<DVBSI::EventPtr>(boost::bind(SqlMergeColumnsInserter, &mergeEvents, _1, _2)); + mergeEvents.sources.insert(new SiEventsMerger(type, ice)); + mergeEvents.loadComplete(this); + mergeEvents.execute(NULL); + tx.Commit(); + Logger()->messagebf(LOG_INFO, "%s: Updated events", __PRETTY_FUNCTION__); + + auto ic = ice.adapter->getCommunicator(); + auto sch = P2PVR::SchedulesPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("Schedules"))); + sch->DoReschedule(); +} + diff --git a/p2pvr/daemon/maintenance/network.cpp b/p2pvr/daemon/maintenance/network.cpp new file mode 100644 index 0000000..d9ce0ec --- /dev/null +++ b/p2pvr/daemon/maintenance/network.cpp @@ -0,0 +1,107 @@ +#include <pch.hpp> +#include "../maintenance.h" +#include <siParsers/network.h> +#include <sqlMergeTask.h> +#include <Ice/Communicator.h> +#include <p2Helpers.h> +#include <dvbsiHelpers.h> +#include <containerIterator.h> +#include <singleIterator.h> +#include <temporaryIceAdapterObject.h> + +class SiNetworkInformationMerger : public SiNetworkInformationParser { + public: + SiNetworkInformationMerger(DatabaseClient * co) : commonObjects(co) { } + + bool HandleTable(DVBSI::NetworkPtr n) + { + Logger()->messagebf(LOG_DEBUG, "Network Id: %d Name: %s", n->NetworkId, *n->Name); + BOOST_FOREACH(const auto & ts, n->TransportStreams) { + Logger()->messagebf(LOG_DEBUG, "\tTransport Stream Id: %d Original Network Id: %d", ts->TransportStreamId, ts->OriginalNetworkId); + BOOST_FOREACH(const auto & s, ts->Services) { + Logger()->messagebf(LOG_DEBUG, "\t\tService Id: %d Service Type: %d", s.ServiceId, s.ServiceType); + } + if (ts->Terrestrial) { + Logger()->messagebf(LOG_DEBUG, "\t\tDVB-T: Frequency: %d", ts->Terrestrial->Frequency); + } + } + + DatabaseClient::TxHelper tx(commonObjects); + SqlMergeTask mergeNetwork("postgres", "networks"); + CreateColumns<DVBSI::NetworkPtr>(boost::bind(&DatabaseClient::SqlMergeColumnsInserter, &mergeNetwork, _1, _2)); + std::vector<DVBSI::NetworkPtr> networks = { n }; + mergeNetwork.sources.insert(new ContainerIterator<std::vector<DVBSI::NetworkPtr>>(&networks)); + mergeNetwork.loadComplete(commonObjects); + mergeNetwork.execute(NULL); + + SqlMergeTask mergeTransports("postgres", "transportstreams"); + CreateColumns<DVBSI::NetworkTransportStreamPtr>(boost::bind(&DatabaseClient::SqlMergeColumnsInserter, &mergeTransports, _1, _2)); + mergeTransports.sources.insert(new ContainerIterator<DVBSI::NetworkTransportStreams>(&n->TransportStreams)); + mergeTransports.loadComplete(commonObjects); + mergeTransports.execute(NULL); + + SqlMergeTask mergeDvbt("postgres", "delivery_dvbt"); + CreateColumns<DVBSI::TerrestrialDeliveryPtr>(boost::bind(&DatabaseClient::SqlMergeColumnsInserter, &mergeDvbt, _1, _2)); + BOOST_FOREACH(const auto & s, n->TransportStreams) { + if (s->Terrestrial) { + mergeDvbt.sources.insert(new SingleIterator<DVBSI::TerrestrialDeliveryPtr>(&s->Terrestrial)); + } + } + mergeDvbt.loadComplete(commonObjects); + mergeDvbt.execute(NULL); + + SqlMergeTask mergeServices("postgres", "services"); + CreateColumns<DVBSI::NetworkService>(boost::bind(&DatabaseClient::SqlMergeColumnsInserter, &mergeServices, _1, _2)); + BOOST_FOREACH(const auto & s, n->TransportStreams) { + mergeServices.sources.insert(new ContainerIterator<DVBSI::NetworkServiceList>(&s->Services)); + } + mergeServices.loadComplete(commonObjects); + mergeServices.execute(NULL); + return false; + } + private: + DatabaseClient * commonObjects; +}; + +void +Maintenance::UpdateNetwork(short type, const Ice::Current & ice) +{ + auto ic = ice.adapter->getCommunicator(); + auto devs = P2PVR::DevicesPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("GlobalDevices"))); + auto si = P2PVR::SIPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("SI"))); + auto siparser = new SiNetworkInformationMerger(this); + TemporarayIceAdapterObject<P2PVR::RawDataClient> parser(ice.adapter, siparser); + + if (!devs) { + throw std::runtime_error("bad proxy(s)"); + } + auto transport = si->GetDeliveryForSi(); + if (transport) { + P2PVR::TunerPrx tuner; + try { + tuner = devs->GetTunerAny(type, transport); + tuner->SendNetworkInformation(parser); + devs->ReleaseTuner(tuner); + return; + } + catch (const P2PVR::NoSuitableDeviceAvailable &) { + Logger()->messagebf(LOG_WARNING, "%s: Failed to get a suitable tuner", __PRETTY_FUNCTION__); + throw; + } + catch (const std::exception & ex) { + Logger()->messagebf(LOG_WARNING, "%s: Failed to fetch network information: %s", __PRETTY_FUNCTION__, ex.what()); + devs->ReleaseTuner(tuner); + throw; + } + catch (...) { + Logger()->messagebf(LOG_WARNING, "%s: Failed to fetch network information", __PRETTY_FUNCTION__); + devs->ReleaseTuner(tuner); + throw; + } + } + // If we can't do that, do a complete scan + auto tuner = devs->GetPrivateTuner(type); + tuner->ScanAndSendNetworkInformation(parser); + devs->ReleaseTuner(tuner); +} + diff --git a/p2pvr/daemon/maintenance/programAssociations.cpp b/p2pvr/daemon/maintenance/programAssociations.cpp new file mode 100644 index 0000000..ad6438c --- /dev/null +++ b/p2pvr/daemon/maintenance/programAssociations.cpp @@ -0,0 +1,80 @@ +#include <pch.hpp> +#include "../maintenance.h" +#include <siParsers/programAssociation.h> +#include <sqlMergeTask.h> +#include <Ice/Communicator.h> +#include <p2Helpers.h> +#include <dvbsiHelpers.h> +#include <mapIterator.h> +#include <temporaryIceAdapterObject.h> + +class SiProgramAssociationHandler : public SiProgramAssociationParser { + public: + bool HandleTable(ProgramAssociationMapPtr pam) + { + Logger()->messagebf(LOG_DEBUG, "Program association table"); + BOOST_FOREACH(const auto & pa, *pam) { + Logger()->messagebf(LOG_DEBUG, " %d -> %d", pa.first, pa.second); + } + BOOST_FOREACH(const auto & pa, *pam) { + map[pa.first] = pa.second; + } + return false; + } + + ProgramAssociationMap map; +}; + +static +void +CreatePATColumns(const ColumnCreator & cc) +{ + cc("serviceId", true); + cc("programId", false); +} + +void +Maintenance::UpdateProgramAssociations(short type, const Ice::Current & ice) +{ + auto ic = ice.adapter->getCommunicator(); + auto devs = P2PVR::DevicesPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("GlobalDevices"))); + auto si = P2PVR::SIPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("SI"))); + + if (!devs || !si) { + throw std::runtime_error("bad proxy(s)"); + } + + auto siparser = new SiProgramAssociationHandler(); + TemporarayIceAdapterObject<P2PVR::RawDataClient> parser(ice.adapter, siparser); + + const auto deliveries = si->GetAllDeliveries(type); + if (deliveries.empty()) { + throw std::runtime_error("no delivery methods"); + } + + BOOST_FOREACH(const auto & transport, deliveries) { + try { + Logger()->messagebf(LOG_DEBUG, "%s: Getting a tuner", __PRETTY_FUNCTION__); + auto tuner = devs->GetTunerSpecific(transport); + Logger()->messagebf(LOG_DEBUG, "%s: Fetching associations", __PRETTY_FUNCTION__); + tuner->SendProgramAssociationTable(parser); + Logger()->messagebf(LOG_INFO, "%s: Updated associations", __PRETTY_FUNCTION__); + devs->ReleaseTuner(tuner); + } + catch (...) { + // Tuning can fail + } + } + + TxHelper tx(this); + SqlMergeTask mergeServices("postgres", "services"); + CreatePATColumns(boost::bind(SqlMergeColumnsInserter, &mergeServices, _1, _2)); + // Don't change the list of services available from the network + mergeServices.doDelete = VariableType(false); + mergeServices.doInsert = VariableType(false); + Columns cols; + mergeServices.sources.insert(new MapIterator<ProgramAssociationMap>(CreatePATColumns, &siparser->map)); + mergeServices.loadComplete(this); + mergeServices.execute(NULL); +} + diff --git a/p2pvr/daemon/maintenance/programMap.cpp b/p2pvr/daemon/maintenance/programMap.cpp new file mode 100644 index 0000000..94d7752 --- /dev/null +++ b/p2pvr/daemon/maintenance/programMap.cpp @@ -0,0 +1,133 @@ +#include <pch.hpp> +#include "../maintenance.h" +#include <siParsers/programMap.h> +#include <sqlMergeTask.h> +#include <Ice/Communicator.h> +#include <p2Helpers.h> +#include <dvbsiHelpers.h> +#include <containerIterator.h> +#include <singleIterator.h> +#include <temporaryIceAdapterObject.h> +#include <rdbmsDataSource.h> +#include <column.h> +#include <selectcommand.h> +#include <sqlHandleAsVariableType.h> + +class SiProgramMapHandler : public SiProgramMapParser { + public: + SiProgramMapHandler(const RowProcessorCallback & cb) : + callBack(cb) {} + + bool HandleTable(DVBSI::ProgramMapPtr pmp) + { + Logger()->messagebf(LOG_DEBUG, "Program map: serviceId = %d", pmp->ServiceId); + BOOST_FOREACH(const auto & s, pmp->Streams) { + Logger()->messagef(LOG_DEBUG, "type: %02x id: %d", s->Type, s->Id); + } + BOOST_FOREACH(const auto & s, pmp->Streams) { + BindColumns<DVBSI::StreamPtr>(rowState, s); + rowState.process(callBack); + } + return false; + } + + private: + ObjectRowState<DVBSI::StreamPtr> rowState; + const RowProcessorCallback callBack; +}; + +typedef boost::shared_ptr<DB::SelectCommand> SelectPtr; + +template<typename T> +void +operator<<(T & val, const DB::Column & col) +{ + HandleAsVariableType havt; + col.apply(havt); + val = havt.variable; +} + +class SiProgramMapMerger : public IHaveSubTasks { + public: + SiProgramMapMerger(short t, CommonObjects * co, const Ice::Current & i) : + SourceObject(__PRETTY_FUNCTION__), + IHaveSubTasks(NULL), + commonObjects(co), + type(t), + ice(i) { } + + void execute(ExecContext * ec) const + { + auto ic = ice.adapter->getCommunicator(); + auto devs = P2PVR::DevicesPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("GlobalDevices"))); + auto si = P2PVR::SIPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("SI"))); + + if (!devs || !si) { + throw std::runtime_error("bad proxy(s)"); + } + + TemporarayIceAdapterObject<P2PVR::RawDataClient> parser(ice.adapter, + new SiProgramMapHandler(boost::bind(&SiProgramMapMerger::executeChildren, this, ec))); + + const auto deliveries = si->GetAllDeliveries(type); + if (deliveries.empty()) { + throw std::runtime_error("no delivery methods"); + } + + auto db = commonObjects->dataSource<RdbmsDataSource>("postgres")->getReadonly(); + SelectPtr sel = SelectPtr(db->newSelectCommand("select d.frequency, s.programid \ + from delivery_dvbt d, services s \ + where d.transportstreamid = s.transportstreamid \ + and s.programid is not null \ + order by s.transportstreamid, s.serviceid")); + int64_t curFreq = 0; + P2PVR::TunerPrx tuner; + while (sel->fetch()) { + int64_t freq, pid; + freq << (*sel)[0]; + pid << (*sel)[1]; + + if (freq != curFreq) { + if (tuner) { + devs->ReleaseTuner(tuner); + } + Logger()->messagebf(LOG_DEBUG, "%s: Getting a tuner", __PRETTY_FUNCTION__); + const auto transport = *std::find_if(deliveries.begin(), deliveries.end(), + [freq](const DVBSI::DeliveryPtr & del) { return del->Frequency == freq; }); + tuner = devs->GetTunerSpecific(transport); + curFreq = freq; + } + + Logger()->messagebf(LOG_DEBUG, "%s: Fetching associations", __PRETTY_FUNCTION__); + tuner->SendProgramMap(pid, parser); + Logger()->messagebf(LOG_INFO, "%s: Updated associations", __PRETTY_FUNCTION__); + } + if (tuner) { + devs->ReleaseTuner(tuner); + } + } + + private: + CommonObjects * commonObjects; + const short type; + const Ice::Current & ice; + + void executeChildren(ExecContext * ec) const + { + BOOST_FOREACH(const Tasks::value_type & sq, normal) { + sq->execute(ec); + } + } +}; + +void +Maintenance::UpdateProgramMaps(short type, const Ice::Current & ice) +{ + TxHelper tx(this); + SqlMergeTask mergeServiceStreams("postgres", "servicestreams"); + CreateColumns<DVBSI::StreamPtr>(boost::bind(SqlMergeColumnsInserter, &mergeServiceStreams, _1, _2)); + mergeServiceStreams.sources.insert(new SiProgramMapMerger(type, this, ice)); + mergeServiceStreams.loadComplete(this); + mergeServiceStreams.execute(NULL); +} + diff --git a/p2pvr/daemon/maintenance/services.cpp b/p2pvr/daemon/maintenance/services.cpp new file mode 100644 index 0000000..55409c2 --- /dev/null +++ b/p2pvr/daemon/maintenance/services.cpp @@ -0,0 +1,68 @@ +#include <pch.hpp> +#include "../maintenance.h" +#include <siParsers/service.h> +#include <sqlMergeTask.h> +#include <Ice/Communicator.h> +#include <p2Helpers.h> +#include <dvbsiHelpers.h> +#include <containerIterator.h> +#include <singleIterator.h> +#include <temporaryIceAdapterObject.h> + +class SiServicesMerger : public SiServicesParser { + public: + SiServicesMerger(DatabaseClient * co) : commonObjects(co) { } + + bool HandleTable(DVBSI::TransportStreamPtr ts) + { + Logger()->messagebf(LOG_DEBUG, "Transport Stream Id: %d Original Network Id: %s", ts->TransportStreamId, ts->OriginalNetworkId); + BOOST_FOREACH(const auto & s, ts->Services) { + Logger()->messagebf(LOG_DEBUG, "\tService Id: %d Name: %s Type: %d, Provider: %s, DefaultAuthority: %s, RunningStatus %d FreeCaMode %d", + s->ServiceId, (s->Name ? *s->Name : "?"), (s->Type ? *s->Type : -1), + (s->ProviderName ? *s->ProviderName : "?"), (s->DefaultAuthority ? *s->DefaultAuthority : "?"), + s->RunningStatus, s->FreeCaMode); + } + + DatabaseClient::TxHelper tx(commonObjects); + SqlMergeTask mergeServices("postgres", "services"); + CreateColumns<DVBSI::ServicePtr>(boost::bind(&DatabaseClient::SqlMergeColumnsInserter, &mergeServices, _1, _2)); + // Don't change the list of services available from the network + mergeServices.doDelete = VariableType(false); + mergeServices.doInsert = VariableType(false); + mergeServices.sources.insert(new ContainerIterator<DVBSI::ServiceList>(&ts->Services)); + mergeServices.loadComplete(commonObjects); + mergeServices.execute(NULL); + return false; + } + + private: + DatabaseClient * commonObjects; +}; + +void +Maintenance::UpdateServices(short type, const Ice::Current & ice) +{ + auto ic = ice.adapter->getCommunicator(); + auto devs = P2PVR::DevicesPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("GlobalDevices"))); + auto si = P2PVR::SIPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("SI"))); + + if (!devs || !si) { + throw std::runtime_error("bad proxy(s)"); + } + + auto siparser = new SiServicesMerger(this); + TemporarayIceAdapterObject<P2PVR::RawDataClient> parser(ice.adapter, siparser); + + auto delivery = si->GetDeliveryForSi(); + if (!delivery) { + throw std::runtime_error("no delivery methods"); + } + + Logger()->messagebf(LOG_DEBUG, "%s: Getting a tuner", __PRETTY_FUNCTION__); + auto tuner = devs->GetTunerAny(type, delivery); + Logger()->messagebf(LOG_DEBUG, "%s: Fetching service list", __PRETTY_FUNCTION__); + tuner->SendServiceDescriptions(parser); + Logger()->messagebf(LOG_INFO, "%s: Updated service list", __PRETTY_FUNCTION__); + devs->ReleaseTuner(tuner); +} + diff --git a/p2pvr/daemon/pch.hpp b/p2pvr/daemon/pch.hpp new file mode 100644 index 0000000..bf16cef --- /dev/null +++ b/p2pvr/daemon/pch.hpp @@ -0,0 +1,24 @@ +#ifdef BOOST_BUILD_PCH_ENABLED +#ifndef P2PVRLIB_PCH +#define P2PVRLIB_PCH + +#include <Ice/Ice.h> +#include <glibmm.h> +#include <boost/bind.hpp> +#include <boost/foreach.hpp> +#include <boost/function.hpp> +#include <boost/intrusive_ptr.hpp> +#include <boost/shared_ptr.hpp> + +#include <list> +#include <map> +#include <set> +#include <string> +#include <vector> + +#include <variableType.h> +#include <rdbmsDataSource.h> + +#endif +#endif + diff --git a/p2pvr/daemon/recorder.cpp b/p2pvr/daemon/recorder.cpp new file mode 100644 index 0000000..bdd18c6 --- /dev/null +++ b/p2pvr/daemon/recorder.cpp @@ -0,0 +1,100 @@ +#include <pch.hpp> +#include "recorder.h" +#include "bindTimerTask.h" +#include <boost/bind.hpp> +#include <logger.h> +#include <commonHelpers.h> +#include <scopeObject.h> +#include "serviceStreamer.h" +#include "storage.h" +#include "muxer.h" + +std::string Recorder::extension; +std::string Recorder::muxerCommand; + +DECLARE_OPTIONS(Recorder, "P2PVR Recorder options") +("p2pvr.recorder.extension", Options::value(&extension, "mpg"), + "File extension to save with (default: avi)") +("p2pvr.recorder.muxercommand", Options::value(&muxerCommand, "/usr/bin/ffmpeg -f mpegts -i - -f dvd -codec copy -"), + "File extension to save with (default: '/usr/bin/ffmpeg -f mpegts -i - -f dvd -codec copy -')") +END_OPTIONS(Recorder); + +Recorder::Recorder(Ice::ObjectAdapterPtr a, IceUtil::TimerPtr t) : + adapter(a), + timer(t) +{ + Logger()->message(LOG_DEBUG, __PRETTY_FUNCTION__); + RefreshSchedules(Ice::Current()); +} + +void +Recorder::RefreshSchedules(const Ice::Current &) +{ + std::lock_guard<std::mutex> g(lock); + BOOST_FOREACH(auto & t, pendingRecordings) { + timer->cancel(t); + } + pendingRecordings.clear(); + auto schedules = P2PVR::SchedulesPrx::checkedCast(adapter->createProxy(adapter->getCommunicator()->stringToIdentity("Schedules"))); + auto si = P2PVR::SIPrx::checkedCast(adapter->createProxy(adapter->getCommunicator()->stringToIdentity("SI"))); + BOOST_FOREACH(const auto & s, schedules->GetScheduledToRecord()) { + if (std::find_if(currentRecordings.begin(), currentRecordings.end(), [&s](const CurrentPtr & c) { + return s->ScheduleId == c->schedule->ScheduleId && s->ServiceId == c->event->ServiceId && s->EventId == c->event->EventId;; + }) != currentRecordings.end()) { + continue; + } + auto schedule = schedules->GetSchedule(s->ScheduleId); + auto service = si->GetService(s->ServiceId); + auto event = si->GetEvent(s->ServiceId, s->EventId); + + auto startIn = std::max<time_t>((*event->StartTime - *schedule->Early - boost::posix_time::second_clock::universal_time()).total_seconds(), 0); + IceUtil::TimerTaskPtr startTimer = new BindTimerTask(boost::bind(&Recorder::StartRecording, this, schedule, service, event)); + timer->schedule(startTimer, IceUtil::Time::seconds(startIn)); + pendingRecordings.push_back(startTimer); + Logger()->messagebf(LOG_DEBUG, "Recording %s scheduled for %s seconds", event->Title, startIn); + } +} + +void +Recorder::StartRecording(P2PVR::SchedulePtr schedule, DVBSI::ServicePtr service, DVBSI::EventPtr event) +{ + std::lock_guard<std::mutex> g(lock); + auto storage = P2PVR::StoragePrx::checkedCast(adapter->createProxy(adapter->getCommunicator()->stringToIdentity("Storage"))); + auto recordings = P2PVR::RecordingsPrx::checkedCast(adapter->createProxy(adapter->getCommunicator()->stringToIdentity("Recordings"))); + + auto id = storage->CreateForEventRecording(extension, schedule, service, event); + auto store = storage->OpenForWrite(id); + ScopeObject _store(NULL, NULL, [this,&store,&storage,&id]() { storage->Close(store); storage->Delete(id); }); + auto muxer = P2PVR::RawDataClientPrx::checkedCast(adapter->addWithUUID(new Muxer(store, muxerCommand))); + ScopeObject _muxer(NULL, NULL, [this,&muxer]() { adapter->remove(muxer->ice_getIdentity()); }); + auto ss = ServiceStreamerPtr(new ServiceStreamer(service->ServiceId, muxer, adapter->getCommunicator(), adapter)); + + ss->Start(); + Logger()->messagebf(LOG_INFO, "Started recording %s (%s - %s) on %s (%d)", + event->Title, event->StartTime, event->StopTime, + service->Name ? *service->Name : "<no name>", service->ServiceId); + + recordings->NewRecording(new P2PVR::Recording(0, storage->ice_toString(), id, schedule->ScheduleId, event->Title, event->Subtitle, + event->Description, event->StartTime, *(*event->StopTime - *event->StartTime))); + auto newCurrent = CurrentPtr(new Current({muxer, store, ss, schedule, service, event, IceUtil::TimerTaskPtr()})); + currentRecordings.insert(newCurrent); + + auto stopIn = (*event->StopTime + *schedule->Late - boost::posix_time::second_clock::universal_time()).total_seconds(); + newCurrent->stopTimer = new BindTimerTask(boost::bind(&Recorder::StopRecording, this, newCurrent)); + timer->schedule(newCurrent->stopTimer, IceUtil::Time::seconds(stopIn)); + Logger()->messagebf(LOG_DEBUG, "Recording %s scheduled stop in %s seconds", event->Title, stopIn); +} + +void +Recorder::StopRecording(CurrentPtr c) +{ + std::lock_guard<std::mutex> g(lock); + Logger()->messagebf(LOG_DEBUG, "Stopping %s", c->event->Title); + c->stream->Stop(); + adapter->remove(c->muxer->ice_getIdentity()); + currentRecordings.erase(c); + auto storage = P2PVR::StoragePrx::checkedCast(adapter->createProxy(adapter->getCommunicator()->stringToIdentity("Storage"))); + storage->Close(c->store); + Logger()->messagebf(LOG_DEBUG, "Stopped %s", c->event->Title); +} + diff --git a/p2pvr/daemon/recorder.h b/p2pvr/daemon/recorder.h new file mode 100644 index 0000000..78c604b --- /dev/null +++ b/p2pvr/daemon/recorder.h @@ -0,0 +1,51 @@ +#ifndef RECORDER_H +#define RECORDER_H + +#include <IceUtil/Timer.h> +#include <Ice/ObjectAdapter.h> +#include <options.h> +#include <p2pvr.h> +#include <mutex> +#include "serviceStreamer.h" + +class Recorder : public P2PVR::Recorder { + public: + typedef std::vector<IceUtil::TimerTaskPtr> Pendings; + + class Current { + public: + P2PVR::RawDataClientPrx muxer; + P2PVR::RawDataClientPrx store; + ServiceStreamerPtr stream; + P2PVR::SchedulePtr schedule; + DVBSI::ServicePtr service; + DVBSI::EventPtr event; + IceUtil::TimerTaskPtr stopTimer; + }; + typedef boost::shared_ptr<Current> CurrentPtr; + typedef std::set<CurrentPtr> Currents; + + Recorder(Ice::ObjectAdapterPtr a, IceUtil::TimerPtr); + + void RefreshSchedules(const Ice::Current &); + + INITOPTIONS; + + private: + void StartRecordings(); + void StartRecording(P2PVR::SchedulePtr schedule, DVBSI::ServicePtr service, DVBSI::EventPtr event); + void StopRecording(CurrentPtr); + + Ice::ObjectAdapterPtr adapter; + IceUtil::TimerPtr timer; + + Currents currentRecordings; + Pendings pendingRecordings; + std::mutex lock; + + static std::string extension; + static std::string muxerCommand; +}; + +#endif + diff --git a/p2pvr/daemon/recordings.cpp b/p2pvr/daemon/recordings.cpp new file mode 100644 index 0000000..bad8032 --- /dev/null +++ b/p2pvr/daemon/recordings.cpp @@ -0,0 +1,82 @@ +#include <pch.hpp> +#include "recordings.h" +#include "resources.h" +#include <Ice/Ice.h> +#include <logger.h> +#include "sqlContainerCreator.h" + +ResourceString(Recording_Insert, daemon_sql_Recordings_insert_sql); +ResourceString(Recording_InsertNewId, daemon_sql_Recordings_insertNewId_sql); +ResourceString(Recording_Delete, daemon_sql_Recordings_delete_sql); +ResourceString(Recording_GetStorage, daemon_sql_Recordings_getStorage_sql); +ResourceString(Recording_GetAll, daemon_sql_Recordings_getAll_sql); + +template<> +void +CreateColumns<P2PVR::RecordingPtr>(const ColumnCreator & cc) +{ + cc("recordingid", true); + cc("storageaddress", false); + cc("guid", false); + cc("scheduleid", false); + cc("title", false); + cc("subtitle", false); + cc("description", false); + cc("starttime", false); + cc("duration", false); +} + +template<> +void +UnbindColumns(RowState & rs, const P2PVR::RecordingPtr & r) +{ + rs.fields[0] >> r->RecordingId; + rs.fields[1] >> r->StorageAddress; + rs.fields[2] >> r->Guid; + rs.fields[3] >> r->ScheduleId; + rs.fields[4] >> r->Title; + rs.fields[5] >> r->Subtitle; + rs.fields[6] >> r->Description; + rs.fields[7] >> r->StartTime; + rs.fields[8] >> r->Duration; +} + +int +Recordings::NewRecording(const P2PVR::RecordingPtr & r, const Ice::Current &) +{ + Logger()->messagebf(LOG_INFO, "%s: Creating new recording %s at %s", __PRETTY_FUNCTION__, r->Guid, r->StorageAddress); + TxHelper tx(this); + auto insert = Modify(Recording_Insert, + r->StorageAddress, r->Guid, r->ScheduleId, r->Title, r->Subtitle, r->Description, r->StartTime, r->Duration); + insert.second->execute(); + r->RecordingId = SelectScalar<int>(Recording_InsertNewId); + Logger()->messagebf(LOG_INFO, "%s: Created recording Id: %d", __PRETTY_FUNCTION__, r->RecordingId); + return r->RecordingId; +} + +void +Recordings::DeleteRecording(int id, const Ice::Current & ice) +{ + Logger()->messagebf(LOG_INFO, "%s: Deleting recording Id: %d", __PRETTY_FUNCTION__, id); + auto ic = ice.adapter->getCommunicator(); + TxHelper tx(this); + auto recordingStorages = Select(Recording_GetStorage, id); + while (recordingStorages.second->fetch()) { + std::string addr = recordingStorages.second / "storageaddress"; + std::string guid = recordingStorages.second / "guid"; + auto storage = P2PVR::StoragePrx::checkedCast(ic->stringToProxy(addr)); + storage->Delete(guid); + Logger()->messagebf(LOG_DEBUG, "%s: Delete %s from StorageAddress %s", __PRETTY_FUNCTION__, guid, addr); + } + Modify(Recording_Delete, id).second->execute(); +} + +P2PVR::RecordingList +Recordings::GetRecordings(const Ice::Current &) +{ + P2PVR::RecordingList rtn; + SqlContainerCreator<P2PVR::RecordingList, P2PVR::Recording> cc(rtn); + cc.populate(Select(Recording_GetAll).second); + return rtn; +} + diff --git a/p2pvr/daemon/recordings.h b/p2pvr/daemon/recordings.h new file mode 100644 index 0000000..0f18a36 --- /dev/null +++ b/p2pvr/daemon/recordings.h @@ -0,0 +1,16 @@ +#ifndef RECORDINGS_H +#define RECORDINGS_H + +#include <p2pvr.h> +#include <string> +#include "dbClient.h" + +class Recordings : public DatabaseClient, public P2PVR::Recordings { + public: + int NewRecording(const P2PVR::RecordingPtr & rec, const Ice::Current &); + void DeleteRecording(int recordingId, const Ice::Current &); + P2PVR::RecordingList GetRecordings(const Ice::Current &); +}; + +#endif + diff --git a/p2pvr/daemon/schedulers/bitDumbScheduler.cpp b/p2pvr/daemon/schedulers/bitDumbScheduler.cpp new file mode 100644 index 0000000..f3cf5fb --- /dev/null +++ b/p2pvr/daemon/schedulers/bitDumbScheduler.cpp @@ -0,0 +1,55 @@ +#include <pch.hpp> +#include "../schedules.h" + +class TheBitDumbScheduler : public EpisodeGroup { + public: + TheBitDumbScheduler(const Episodes & eps) : + episodes(eps) + { + } + + protected: + void SelectShowings() + { + SelectShowings(episodes.begin(), 1); + } + + private: + void SelectShowings(Episodes::const_iterator e, uint64_t complexityLevel) + { + if (e != episodes.end()) { + complexityLevel *= (*e)->showings.size(); + if (complexityLevel > 2 << 16) { + auto current = showings.size(); + for (EpisodesIter ne = e; ne != episodes.end(); ne++) { + BOOST_FOREACH(const auto & s, (*ne)->showings) { + showings.push_back(s); + if (SuggestWithFeedback(showings) & 0x1) { + break; + } + showings.pop_back(); + } + } + showings.resize(current); + } + else { + EpisodesIter ne = e; + ne++; + BOOST_FOREACH(const auto & s, (*e)->showings) { + showings.push_back(s); + SelectShowings(ne, complexityLevel); + showings.pop_back(); + } + } + } + else { + Suggest(showings); + } + } + + mutable Showings showings; // working set + const Episodes episodes; +}; + +DECLARE_GENERIC_LOADER("BitDumb", EpisodeGroupLoader, TheBitDumbScheduler); + diff --git a/p2pvr/daemon/schedules.cpp b/p2pvr/daemon/schedules.cpp new file mode 100644 index 0000000..90df59c --- /dev/null +++ b/p2pvr/daemon/schedules.cpp @@ -0,0 +1,484 @@ +#include <pch.hpp> +#include "schedules.h" +#include "sqlContainerCreator.h" +#include <rdbmsDataSource.h> +#include <logger.h> +#include <Ice/Ice.h> +#include <sqlVariableBinder.h> +#include <sqlMergeTask.h> +#include "p2Helpers.h" +#include "containerIterator.h" +#include "resources.h" +#include <boost/date_time/posix_time/posix_time.hpp> +#include <instanceStore.impl.h> + +ResourceString(Schedules_GetCandidates, daemon_sql_Schedules_GetCandidates_sql); +ResourceString(Schedules_insert, daemon_sql_Schedules_insert_sql); +ResourceString(Schedules_insertNewId, daemon_sql_Schedules_insertNewId_sql); +ResourceString(Schedules_update, daemon_sql_Schedules_update_sql); +ResourceString(Schedules_delete, daemon_sql_Schedules_delete_sql); +ResourceString(Schedules_selectAll, daemon_sql_Schedules_selectAll_sql); +ResourceString(Schedules_selectById, daemon_sql_Schedules_selectById_sql); +ResourceString(Schedules_scheduledToRecord, daemon_sql_Schedules_scheduledToRecord_sql); + +std::string Schedules::SchedulerAlgorithm; + +DECLARE_OPTIONS(Schedules, "P2PVR Scheduler options") +("p2pvr.scheduler.algorithm", Options::value(&SchedulerAlgorithm, "BitDumb"), + "Implementation of episode group scheduler problem solver") +END_OPTIONS() + +class ScheduleCandidate { + public: + std::string What; + int ServiceId; + int EventId; + int TransportStreamId; + datetime StartTime; + datetime StopTime; + int Priority; + int ScheduleId; +}; +typedef boost::shared_ptr<ScheduleCandidate> ScheduleCandidatePtr; +typedef std::vector<ScheduleCandidatePtr> ScheduleCandidates; + +enum RecordStatuses { + Record_WillRecordThisShowing = 0, + Record_WillRecordOtherShowing = 1, + Record_CannotRecordAnyShowing = 2 +}; + +class Record { + public: + Record() { }; + Record(int s, int e, RecordStatuses rs, int sc) : + ServiceId(s), + EventId(e), + RecordStatus(rs), + ScheduleId(sc) + { + } + + int ServiceId; + int EventId; + RecordStatuses RecordStatus; + int ScheduleId; +}; +typedef boost::shared_ptr<Record> RecordPtr; +typedef std::vector<RecordPtr> Records; + +template<> +void +CreateColumns<P2PVR::ScheduledToRecordPtr>(const ColumnCreator & cc) +{ + cc("serviceid", true); + cc("eventid", true); + cc("scheduleid", true); +} + +template<> +void +UnbindColumns(RowState & rs, P2PVR::ScheduledToRecordPtr const & s) +{ + rs.fields[0] >> s->ServiceId; + rs.fields[1] >> s->EventId; + rs.fields[2] >> s->ScheduleId; +} + +template<> +void +CreateColumns<ScheduleCandidatePtr>(const ColumnCreator & cc) +{ + cc("what", true); + cc("serviceid", false); + cc("eventid", false); + cc("transportstreamid", false); + cc("starttime", false); + cc("stoptime", false); + cc("priority", false); + cc("scheduleid", false); +} + +template<> +void +UnbindColumns(RowState & rs, ScheduleCandidatePtr const & s) +{ + rs.fields[0] >> s->What; + rs.fields[1] >> s->ServiceId; + rs.fields[2] >> s->EventId; + rs.fields[3] >> s->TransportStreamId; + rs.fields[4] >> s->StartTime; + rs.fields[5] >> s->StopTime; + rs.fields[6] >> s->Priority; + rs.fields[7] >> s->ScheduleId; +} + +template<> +void +CreateColumns<P2PVR::SchedulePtr>(const ColumnCreator & cc) +{ + cc("scheduleid", true); + cc("serviceid", false); + cc("eventid", false); + cc("title", false); + cc("search", false); + cc("priority", false); + cc("early", false); + cc("late", false); + cc("repeats", false); +} + +template<> +void +UnbindColumns(RowState & rs, P2PVR::SchedulePtr const & s) +{ + rs.fields[0] >> s->ScheduleId; + rs.fields[1] >> s->ServiceId; + rs.fields[2] >> s->EventId; + rs.fields[3] >> s->Title; + rs.fields[4] >> s->Search; + rs.fields[5] >> s->Priority; + rs.fields[6] >> s->Early; + rs.fields[7] >> s->Late; + rs.fields[8] >> s->Repeats; +} + +template<> +void +CreateColumns<RecordPtr>(const ColumnCreator & cc) +{ + cc("serviceid", true); + cc("eventid", true); + cc("recordstatus", false); + cc("scheduleid", false); +} + +template<> +void +BindColumns(RowState & rs, RecordPtr const & s) +{ + rs.fields[0] << s->ServiceId; + rs.fields[1] << s->EventId; + rs.fields[2] << (int)s->RecordStatus; + rs.fields[3] << s->ScheduleId; +} + +Showing::Showing(unsigned int s, unsigned int e, unsigned int t, unsigned int sc, datetime start, datetime stop, int p, const Episode * ep) : + episode(ep), + serviceId(s), + eventId(e), + priority(p), + scheduleId(sc), + transportStreamId(t), + startTime(start), + stopTime(stop), + period(start, stop) +{ +} + +Episode::Episode(const std::string & w) : + priority(0), + what(w) +{ +} + +EpisodeGroup::EpisodeGroup() : + tuners(1), + sumTimeToStart(0), + score(0) +{ +} + +bool +EpisodeGroup::IsShowingListValid(const Showings & showings) const +{ + struct TransOffset { + unsigned int trans; + char offset; + }; + typedef std::multimap<datetime, TransOffset> Periods; + typedef std::map<int, unsigned char> Usage; + + Periods periods; + Usage usage; + + BOOST_FOREACH(const auto & s, showings) { + if (s) { + periods.insert(Periods::value_type(s->startTime, {s->transportStreamId, 1})); + periods.insert(Periods::value_type(s->stopTime, {s->transportStreamId, -1})); + } + } + bool result = true; + BOOST_FOREACH(const auto & p, periods) { + auto & u = usage[p.second.trans]; + u += p.second.offset; + if (std::count_if(usage.begin(), usage.end(), [](const Usage::value_type & uv) { return uv.second > 0;}) > tuners) { + result = false; + break; + } + } + periods.clear(); + usage.clear(); + return result; +} + +template<typename T> +inline +bool NotNull(const T & p) +{ + return p != NULL; +} + +class SumTimeToStart { + public: + SumTimeToStart(time_t & t) : + total(t), + now(boost::posix_time::second_clock::universal_time()) + { + total = 0; + } + inline void operator()(const ShowingPtr & s) const + { + if (s) { + total += std::min<time_t>((now - s->startTime).seconds(), 0); + } + } + public: + time_t & total; + datetime now; +}; + +const Showings & +EpisodeGroup::Solve() +{ + SelectShowings(); + return selected; +} + +void +EpisodeGroup::Suggest(const Showings & showings) +{ + unsigned int c = 0; + std::for_each(showings.begin(), showings.end(), [&c](const ShowingPtr & s) { if (s) c+= s->episode->priority; }); + if (c >= score) { + time_t stt; + std::for_each(showings.begin(), showings.end(), SumTimeToStart(stt)); + if (stt < sumTimeToStart || (stt == sumTimeToStart && c > score)) { + if (IsShowingListValid(showings)) { + selected = showings; + score = c; + sumTimeToStart = stt; + } + } + } +} + +EpisodeGroup::SuggestionResult +EpisodeGroup::SuggestWithFeedback(const Showings & showings) +{ + if (IsShowingListValid(showings)) { + unsigned int c = 0; + std::for_each(showings.begin(), showings.end(), [&c](const ShowingPtr & s) { if (s) c+= s->episode->priority; }); + if (c >= score) { + time_t stt; + std::for_each(showings.begin(), showings.end(), SumTimeToStart(stt)); + if (stt < sumTimeToStart) { + selected = showings; + score = c; + sumTimeToStart = stt; + return SuggestionValidAndAccepted; + } + } + return SuggestionValid; + } + else { + return SuggestionInvalid; + } +} + +void +Schedules::GetEpisodeIntersects(Episodes & all, Episodes & grouped) +{ + for (Episodes::iterator aei = all.begin(); aei != all.end(); aei++) { + const auto & ae = *aei; + BOOST_FOREACH(const auto & ge, grouped) { + BOOST_FOREACH(const auto & gs, ge->showings) { + BOOST_FOREACH(const auto & as, ae->showings) { + if (gs->period.intersects(as->period)) { + Logger()->messagebf(LOG_DEBUG, " added %s", ae->what); + grouped.push_back(ae); + all.erase(aei); + GetEpisodeIntersects(all, grouped); + return; + } + } + } + } + } +} + +void +Schedules::DoReschedule(const Ice::Current & ice) +{ + auto ic = ice.adapter->getCommunicator(); + auto devs = P2PVR::DevicesPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("GlobalDevices"))); + unsigned int tunerCount = devs->TunerCount(); + + // Load list from database + ScheduleCandidates episodes; + SqlContainerCreator<ScheduleCandidates, ScheduleCandidate, ScheduleCandidatePtr> cct(episodes); + cct.populate(Select(Schedules_GetCandidates).second); + + Episodes scheduleList; + Showings allShowings; + EpisodePtr cur; + int minPriority = 0; + BOOST_FOREACH(const auto & c, episodes) { + if (!cur || cur->what != c->What) { + cur = new Episode(c->What); + scheduleList.push_back(cur); + } + ShowingPtr s = new Showing(c->ServiceId, c->EventId, c->TransportStreamId, c->ScheduleId, + c->StartTime, c->StopTime, c->Priority, cur.get()); + minPriority = std::min(minPriority, s->priority); + cur->showings.push_back(s); + allShowings.push_back(s); + } + Logger()->messagebf(LOG_DEBUG, "%d episodes created, %s showings", scheduleList.size(), allShowings.size()); + BOOST_FOREACH(const auto & e, scheduleList) { + Logger()->messagebf(LOG_DEBUG, " %s", e->what); + BOOST_FOREACH(const auto & s, e->showings) { + s->priority += 1 - minPriority; + e->priority += s->priority; + } + e->priority /= e->showings.size(); + } + + Records records; + // Solve + while (!scheduleList.empty()) { + auto work = scheduleList.begin(); + Logger()->messagebf(LOG_DEBUG, "start %s", (*work)->what); + Episodes group; + group.push_back(*work); + scheduleList.erase(work); + GetEpisodeIntersects(scheduleList, group); + std::sort(group.begin(), group.end(), [](const EpisodePtr & a, const EpisodePtr & b) { + if (a->priority > b->priority) return true; + if (a->priority < b->priority) return false; + return a->what < b->what; + }); + + Logger()->messagebf(LOG_DEBUG, "group created with %d episodes", group.size()); + double total = 1; + // Measure and add the optional to not record + BOOST_FOREACH(const auto & e, group) { + Logger()->messagebf(LOG_DEBUG, " %d * %d:%s", e->showings.size(), e->priority, e->what); + e->showings.push_back(NULL); + total *= e->showings.size(); + } + Logger()->messagebf(LOG_DEBUG, "group complexity of %d options", total); + + EpisodeGroupPtr sched = EpisodeGroupPtr(EpisodeGroupLoader::createNew(SchedulerAlgorithm, group)); + sched->tuners = tunerCount; + std::set<ShowingPtr> selected; + BOOST_FOREACH(const auto & s, sched->Solve()) { + if (s) selected.insert(s); + } + + BOOST_FOREACH(const auto & c, group) { + Logger()->messagebf(LOG_DEBUG, "Episode %s, %d options", c->what, c->showings.size()); + BOOST_FOREACH(const auto & i, c->showings) { + if (selected.find(i) != selected.end()) { + Logger()->messagebf(LOG_DEBUG, " %s - %s (%d) <-", i->startTime, i->stopTime, i->transportStreamId); + } + else if (i) { + Logger()->messagebf(LOG_DEBUG, " %s - %s (%d)", i->startTime, i->stopTime, i->transportStreamId); + } + } + } + Logger()->message(LOG_DEBUG, "----------"); + BOOST_FOREACH(const auto & c, group) { + bool found = false; + BOOST_FOREACH(const auto & i, c->showings) { + if (i && selected.find(i) != selected.end()) { + found = true; + break; + } + } + BOOST_FOREACH(const auto & i, c->showings) { + if (i) { + records.push_back(RecordPtr(new Record(i->serviceId, i->eventId, + found ? + selected.find(i) != selected.end() ? Record_WillRecordThisShowing : Record_WillRecordOtherShowing : + Record_CannotRecordAnyShowing, i->scheduleId))); + } + } + } + } + + TxHelper tx(this); + SqlMergeTask mergeRecords("postgres", "record"); + CreateColumns<RecordPtr>(boost::bind(&DatabaseClient::SqlMergeColumnsInserter, &mergeRecords, _1, _2)); + mergeRecords.sources.insert(new ContainerIterator<Records>(&records)); + mergeRecords.loadComplete(this); + mergeRecords.execute(NULL); + tx.Commit(); + + auto recorder = P2PVR::RecorderPrx::checkedCast(ice.adapter->createProxy(ice.adapter->getCommunicator()->stringToIdentity("Recorder"))); + recorder->RefreshSchedules(); +} + +void +Schedules::DeleteSchedule(int id, const Ice::Current & ice) +{ + TxHelper tx(this); + Modify(Schedules_delete, id).second->execute(); + DoReschedule(ice); +} + +P2PVR::ScheduleList +Schedules::GetSchedules(const Ice::Current &) +{ + P2PVR::ScheduleList schedules; + SqlContainerCreator<P2PVR::ScheduleList, P2PVR::Schedule> cct(schedules); + cct.populate(Select(Schedules_selectAll).second); + return schedules; +} + +P2PVR::SchedulePtr +Schedules::GetSchedule(int id, const Ice::Current &) +{ + P2PVR::ScheduleList schedules; + SqlContainerCreator<P2PVR::ScheduleList, P2PVR::Schedule> cct(schedules); + cct.populate(Select(Schedules_selectById, id).second); + if (schedules.empty()) throw P2PVR::NotFound(); + return schedules.front(); +} + +P2PVR::ScheduledToRecordList +Schedules::GetScheduledToRecord(const Ice::Current &) +{ + P2PVR::ScheduledToRecordList scheduled; + SqlContainerCreator<P2PVR::ScheduledToRecordList, P2PVR::ScheduledToRecord> cct(scheduled); + cct.populate(Select(Schedules_scheduledToRecord).second); + return scheduled; +} + +int +Schedules::UpdateSchedule(const P2PVR::SchedulePtr & s, const Ice::Current & ice) +{ + TxHelper tx(this); + if (s->ScheduleId == 0) { + Modify(Schedules_insert, s->ServiceId, s->EventId, s->Title, s->Search, s->Priority, s->Early, s->Late, s->Repeats).second->execute(); + s->ScheduleId = SelectScalar<int>(Schedules_insertNewId); + } + else { + Modify(Schedules_update, s->ServiceId, s->EventId, s->Title, s->Search, s->Priority, s->Early, s->Late, s->Repeats, s->ScheduleId).second->execute(); + } + DoReschedule(ice); + return s->ScheduleId; +} + +INSTANTIATESTORE(std::string, EpisodeGroupLoader); + diff --git a/p2pvr/daemon/schedules.h b/p2pvr/daemon/schedules.h new file mode 100644 index 0000000..f7075dd --- /dev/null +++ b/p2pvr/daemon/schedules.h @@ -0,0 +1,85 @@ +#ifndef SCHEDULER_H +#define SCHEDULER_H + +#include <p2pvr.h> +#include <options.h> +#include "dbClient.h" +#include <genLoader.h> + +typedef boost::posix_time::ptime datetime; +class Episode; + +class Showing : public IntrusivePtrBase { + public: + Showing(unsigned int s, unsigned int e, unsigned int t, unsigned int sc, datetime start, datetime stop, int p, const Episode * ep); + // Record what? + const Episode * episode; + const unsigned int serviceId; + const unsigned int eventId; + int priority; + const unsigned int scheduleId; + // Requires + const unsigned int transportStreamId; + const datetime startTime; + const datetime stopTime; + const boost::posix_time::time_period period; +}; +typedef boost::intrusive_ptr<Showing> ShowingPtr; +typedef std::vector<ShowingPtr> Showings; +typedef Showings::const_iterator ShowingsIter; + +class Episode : public IntrusivePtrBase { + public: + Episode(const std::string & w); + int priority; + const std::string what; + Showings showings; +}; +typedef boost::intrusive_ptr<Episode> EpisodePtr; +typedef std::vector<EpisodePtr> Episodes; +typedef Episodes::const_iterator EpisodesIter; + +class EpisodeGroup { + public: + EpisodeGroup(); + + const Showings & Solve(); + unsigned int tuners; + + protected: + enum SuggestionResult { + SuggestionInvalid = 0, SuggestionValid = 1, SuggestionValidAndAccepted = 3 + }; + SuggestionResult SuggestWithFeedback(const Showings &); + void Suggest(const Showings &); + virtual void SelectShowings() = 0; + + private: + bool IsShowingListValid(const Showings & showings) const; + // chosen set + time_t sumTimeToStart; + unsigned int score; + Showings selected; +}; + +class Schedules : public P2PVR::Schedules, public DatabaseClient { + public: + void DeleteSchedule(int id, const Ice::Current &); + P2PVR::SchedulePtr GetSchedule(int id, const Ice::Current &); + P2PVR::ScheduleList GetSchedules(const Ice::Current &); + P2PVR::ScheduledToRecordList GetScheduledToRecord(const Ice::Current &); + int UpdateSchedule(const P2PVR::SchedulePtr &, const Ice::Current &); + void DoReschedule(const Ice::Current &); + + INITOPTIONS; + protected: + static void GetEpisodeIntersects(Episodes &, Episodes &); + private: + static std::string SchedulerAlgorithm; +}; + +typedef GenLoader<EpisodeGroup, std::string, const Episodes &> EpisodeGroupLoader; +typedef boost::shared_ptr<EpisodeGroup> EpisodeGroupPtr; + +#endif + diff --git a/p2pvr/daemon/si.cpp b/p2pvr/daemon/si.cpp new file mode 100644 index 0000000..557003c --- /dev/null +++ b/p2pvr/daemon/si.cpp @@ -0,0 +1,127 @@ +#include <pch.hpp> +#include "si.h" +#include "resources.h" +#include "dvbsiHelpers.h" +#include "sqlContainerCreator.h" +#include <linux/dvb/frontend.h> +#include <logger.h> + +ResourceString(SI_serviceNextUsed, daemon_sql_SI_serviceNextUsed_sql); +ResourceString(SI_servicesSelectAll, daemon_sql_SI_servicesSelectAll_sql); +ResourceString(SI_servicesSelectById, daemon_sql_SI_servicesSelectById_sql); +ResourceString(SI_eventById, daemon_sql_SI_eventById_sql); +ResourceString(SI_eventsOnNow, daemon_sql_SI_eventsOnNow_sql); +ResourceString(SI_eventsInRange, daemon_sql_SI_eventsInRange_sql); + +P2PVR::Deliveries +SI::GetAllDeliveries(short type, const Ice::Current &) +{ + Logger()->messagebf(LOG_DEBUG, "%s(type %d)", __PRETTY_FUNCTION__, type); + P2PVR::Deliveries rtn; + SelectPtr sel; + switch (type) { + case FE_OFDM: + { + SqlContainerCreator<P2PVR::Deliveries, DVBSI::TerrestrialDelivery> cc(rtn); + cc.populate(Select("SELECT * FROM delivery_dvbt ORDER BY transportStreamId").second); + break; + } + case FE_QAM: + { + SqlContainerCreator<P2PVR::Deliveries, DVBSI::CableDelivery> cc(rtn); + cc.populate(Select("SELECT * FROM delivery_dvbc ORDER BY transportStreamId").second); + break; + } + case FE_QPSK: + { + SqlContainerCreator<P2PVR::Deliveries, DVBSI::SatelliteDelivery> cc(rtn); + cc.populate(Select("SELECT * FROM delivery_dvbs ORDER BY transportStreamId").second); + break; + } + } + Logger()->messagebf(LOG_DEBUG, "%s: Found %d delivery methods", __PRETTY_FUNCTION__, rtn.size()); + return rtn; +} + +DVBSI::DeliveryPtr +SI::GetDeliveryForTransport(int id, const Ice::Current&) +{ + P2PVR::Deliveries rtn; + SqlContainerCreator<P2PVR::Deliveries, DVBSI::TerrestrialDelivery> cct(rtn); + cct.populate(Select("SELECT * FROM delivery_dvbt WHERE transportStreamId = ?", id).second); + SqlContainerCreator<P2PVR::Deliveries, DVBSI::CableDelivery> ccc(rtn); + ccc.populate(Select("SELECT * FROM delivery_dvbc WHERE transportStreamId = ?", id).second); + SqlContainerCreator<P2PVR::Deliveries, DVBSI::SatelliteDelivery> ccs(rtn); + ccs.populate(Select("SELECT * FROM delivery_dvbs WHERE transportStreamId = ?", id).second); + return rtn.front(); +} + +DVBSI::DeliveryPtr +SI::GetDeliveryForSi(const Ice::Current&) +{ + P2PVR::Deliveries rtn; + SqlContainerCreator<P2PVR::Deliveries, DVBSI::TerrestrialDelivery> cct(rtn); + cct.populate(Select(SI_serviceNextUsed).second); + return rtn.front(); +} + +DVBSI::DeliveryPtr +SI::GetDeliveryForService(int id, const Ice::Current&) +{ + P2PVR::Deliveries rtn; + SqlContainerCreator<P2PVR::Deliveries, DVBSI::TerrestrialDelivery> cct(rtn); + cct.populate(Select("SELECT d.* FROM services s, delivery_dvbt d WHERE serviceid = ? AND s.transportstreamid = d.transportstreamid", id).second); + SqlContainerCreator<P2PVR::Deliveries, DVBSI::CableDelivery> ccc(rtn); + ccc.populate(Select("SELECT d.* FROM services s, delivery_dvbc d WHERE serviceid = ? AND s.transportstreamid = d.transportstreamid", id).second); + SqlContainerCreator<P2PVR::Deliveries, DVBSI::SatelliteDelivery> ccs(rtn); + ccs.populate(Select("SELECT d.* FROM services s, delivery_dvbs d WHERE serviceid = ? AND s.transportstreamid = d.transportstreamid", id).second); + return rtn.front(); +} + +DVBSI::ServiceList +SI::GetServices(const Ice::Current&) +{ + DVBSI::ServiceList rtn; + SqlContainerCreator<DVBSI::ServiceList, DVBSI::Service> cc(rtn); + cc.populate(Select(SI_servicesSelectAll).second); + return rtn; +} + +DVBSI::ServicePtr +SI::GetService(int id, const Ice::Current&) +{ + DVBSI::ServiceList rtn; + SqlContainerCreator<DVBSI::ServiceList, DVBSI::Service> cc(rtn); + cc.populate(Select(SI_servicesSelectById, id).second); + if (rtn.empty()) throw P2PVR::NotFound(); + return rtn.front(); +} + +DVBSI::EventPtr +SI::GetEvent(int serviceId, int eventId, const Ice::Current &) +{ + DVBSI::Events rtn; + SqlContainerCreator<DVBSI::Events, DVBSI::Event> cc(rtn); + cc.populate(Select(SI_eventById, serviceId, eventId).second); + if (rtn.empty()) throw P2PVR::NotFound(); + return rtn.front(); +} + +DVBSI::Events +SI::EventsOnNow(const Ice::Current &) +{ + DVBSI::Events rtn; + SqlContainerCreator<DVBSI::Events, DVBSI::Event> cc(rtn); + cc.populate(Select(SI_eventsOnNow).second); + return rtn; +} + +DVBSI::Events +SI::EventsInRange(const Common::DateTime & from, const Common::DateTime & to, const Ice::Current &) +{ + DVBSI::Events rtn; + SqlContainerCreator<DVBSI::Events, DVBSI::Event> cc(rtn); + cc.populate(Select(SI_eventsInRange, from, to).second); + return rtn; +} + diff --git a/p2pvr/daemon/si.h b/p2pvr/daemon/si.h new file mode 100644 index 0000000..6b720dc --- /dev/null +++ b/p2pvr/daemon/si.h @@ -0,0 +1,23 @@ +#ifndef P2PVR_SI_H +#define P2PVR_SI_H + +#include <p2pvr.h> +#include "dbClient.h" + +class SI : public P2PVR::SI, public DatabaseClient { + public: + P2PVR::Deliveries GetAllDeliveries(short type, const Ice::Current &); + DVBSI::DeliveryPtr GetDeliveryForService(int id, const Ice::Current &); + DVBSI::DeliveryPtr GetDeliveryForTransport(int id, const Ice::Current &); + DVBSI::DeliveryPtr GetDeliveryForSi(const Ice::Current &); + + DVBSI::ServiceList GetServices(const Ice::Current &); + DVBSI::ServicePtr GetService(int id, const Ice::Current &); + + DVBSI::EventPtr GetEvent(int serviceId, int eventId, const Ice::Current &); + DVBSI::Events EventsOnNow(const Ice::Current &); + DVBSI::Events EventsInRange(const Common::DateTime &, const Common::DateTime &, const Ice::Current &); +}; + +#endif + diff --git a/p2pvr/daemon/sql/Recordings_delete.sql b/p2pvr/daemon/sql/Recordings_delete.sql new file mode 100644 index 0000000..3395376 --- /dev/null +++ b/p2pvr/daemon/sql/Recordings_delete.sql @@ -0,0 +1,2 @@ +DELETE FROM recordings +WHERE recordingId = ? diff --git a/p2pvr/daemon/sql/Recordings_getAll.sql b/p2pvr/daemon/sql/Recordings_getAll.sql new file mode 100644 index 0000000..c0a096c --- /dev/null +++ b/p2pvr/daemon/sql/Recordings_getAll.sql @@ -0,0 +1,3 @@ +SELECT recordingId, storageAddress, guid, scheduleId, title, subtitle, description, startTime, duration +FROM recordings r +ORDER BY startTime, title, subtitle diff --git a/p2pvr/daemon/sql/Recordings_getStorage.sql b/p2pvr/daemon/sql/Recordings_getStorage.sql new file mode 100644 index 0000000..fdaf58d --- /dev/null +++ b/p2pvr/daemon/sql/Recordings_getStorage.sql @@ -0,0 +1,4 @@ +SELECT storageAddress, guid +FROM recordings +WHERE recordingId = ? + diff --git a/p2pvr/daemon/sql/Recordings_insert.sql b/p2pvr/daemon/sql/Recordings_insert.sql new file mode 100644 index 0000000..dfe2ccc --- /dev/null +++ b/p2pvr/daemon/sql/Recordings_insert.sql @@ -0,0 +1,2 @@ +INSERT INTO recordings(storageAddress, guid, scheduleId, title, subtitle, description, startTime, duration) +VALUES(?, ?, ?, ?, ?, ?, ?, ?) diff --git a/p2pvr/daemon/sql/Recordings_insertNewId.sql b/p2pvr/daemon/sql/Recordings_insertNewId.sql new file mode 100644 index 0000000..0583b49 --- /dev/null +++ b/p2pvr/daemon/sql/Recordings_insertNewId.sql @@ -0,0 +1,2 @@ +SELECT currval('recordings_recordingid_seq'); + diff --git a/p2pvr/daemon/sql/SI_eventById.sql b/p2pvr/daemon/sql/SI_eventById.sql new file mode 100644 index 0000000..f0e028d --- /dev/null +++ b/p2pvr/daemon/sql/SI_eventById.sql @@ -0,0 +1,28 @@ +select e.serviceid, + e.eventid, + e.title, + e.titlelang, + e.subtitle, + e.description, + e.descriptionlang, + e.videoaspect, + e.videoframerate, + e.videohd, + e.audiochannels, + e.audiolanguage, + e.subtitlelanguage, + e.category, + e.subcategory, + e.usercategory, + e.dvbrating, + e.starttime, + e.stoptime, + e.episode, + e.episodes, + e.year, + e.flags, + e.season +from events e +where serviceid = ? +and eventid = ? + diff --git a/p2pvr/daemon/sql/SI_eventsInRange.sql b/p2pvr/daemon/sql/SI_eventsInRange.sql new file mode 100644 index 0000000..0a2d6de --- /dev/null +++ b/p2pvr/daemon/sql/SI_eventsInRange.sql @@ -0,0 +1,28 @@ +select e.serviceid, + e.eventid, + e.title, + e.titlelang, + e.subtitle, + e.description, + e.descriptionlang, + e.videoaspect, + e.videoframerate, + e.videohd, + e.audiochannels, + e.audiolanguage, + e.subtitlelanguage, + e.category, + e.subcategory, + e.usercategory, + e.dvbrating, + e.starttime, + e.stoptime, + e.episode, + e.episodes, + e.year, + e.flags, + e.season +from events e +where tsrange(?, ?, '[)') && tsrange(e.starttime, e.stoptime) +order by e.serviceid, e.starttime + diff --git a/p2pvr/daemon/sql/SI_eventsOnNow.sql b/p2pvr/daemon/sql/SI_eventsOnNow.sql new file mode 100644 index 0000000..36760b4 --- /dev/null +++ b/p2pvr/daemon/sql/SI_eventsOnNow.sql @@ -0,0 +1,27 @@ +select e.serviceid, + e.eventid, + e.title, + e.titlelang, + e.subtitle, + e.description, + e.descriptionlang, + e.videoaspect, + e.videoframerate, + e.videohd, + e.audiochannels, + e.audiolanguage, + e.subtitlelanguage, + e.category, + e.subcategory, + e.usercategory, + e.dvbrating, + e.starttime, + e.stoptime, + e.episode, + e.episodes, + e.year, + e.flags, + e.season +from events e +where now()::timestamp without time zone <@ tsrange(e.starttime, e.stoptime) +order by e.serviceid diff --git a/p2pvr/daemon/sql/SI_serviceNextUsed.sql b/p2pvr/daemon/sql/SI_serviceNextUsed.sql new file mode 100644 index 0000000..8e906ad --- /dev/null +++ b/p2pvr/daemon/sql/SI_serviceNextUsed.sql @@ -0,0 +1,12 @@ +SELECT dd.* +FROM delivery_dvbt dd, services s + LEFT OUTER JOIN record r + ON r.serviceid = s.serviceid + AND r.recordstatus = 0 + LEFT OUTER JOIN events e + ON r.eventid = e.eventid + AND r.serviceid = e.serviceid + AND e.starttime > NOW() +WHERE dd.transportstreamid = s.transportstreamid +ORDER BY e.starttime, s.serviceid +LIMIT 1 diff --git a/p2pvr/daemon/sql/SI_servicesSelectAll.sql b/p2pvr/daemon/sql/SI_servicesSelectAll.sql new file mode 100644 index 0000000..513a076 --- /dev/null +++ b/p2pvr/daemon/sql/SI_servicesSelectAll.sql @@ -0,0 +1,3 @@ +SELECT serviceId, transportStreamId, name, providerName, defaultAuthority, runningStatus, eitSchedule, eitPresentFollowing, freeCaMode +FROM services +ORDER BY serviceId diff --git a/p2pvr/daemon/sql/SI_servicesSelectById.sql b/p2pvr/daemon/sql/SI_servicesSelectById.sql new file mode 100644 index 0000000..f0bda72 --- /dev/null +++ b/p2pvr/daemon/sql/SI_servicesSelectById.sql @@ -0,0 +1,3 @@ +SELECT serviceId, transportStreamId, name, providerName, defaultAuthority, runningStatus, eitSchedule, eitPresentFollowing, freeCaMode +FROM services +WHERE serviceId = ? diff --git a/p2pvr/daemon/sql/Schedules_GetCandidates.sql b/p2pvr/daemon/sql/Schedules_GetCandidates.sql new file mode 100644 index 0000000..8e8b15e --- /dev/null +++ b/p2pvr/daemon/sql/Schedules_GetCandidates.sql @@ -0,0 +1,25 @@ +select what, serviceid, eventid, transportstreamid, + starttime - early starttime, stoptime + late stoptime, + priority, scheduleid +from ( + select (e.title, e.subtitle, e.description)::text what, e.serviceid, e.eventid, sv.transportstreamid, + e.starttime, e.stoptime - interval '1 second' stoptime, + s.early, s.late, s.scheduleid, s.priority, + rank() over(partition by e.serviceid, e.eventid, sv.serviceid order by s.priority desc, s.scheduleid) schedulerank + from services sv, events e, schedules s + where (s.serviceid is null or s.serviceid = e.serviceid) + and (s.title is null or lower(s.title) = lower(e.title)) + and (s.eventid is null or s.eventid = e.eventid) + and (s.search is null or event_tsvector(e) @@ plainto_tsquery(s.search)) + and sv.serviceid = e.serviceid + and e.stoptime > now() + and not exists ( + select 1 + from recorded r + where lower(e.title) = lower(r.title) + and coalesce(lower(e.subtitle), '') = coalesce(lower(r.subtitle), '') + and ts_rank(to_tsvector(e.description), plainto_tsquery(r.description)) + + ts_rank(to_tsvector(r.description), plainto_tsquery(e.description)) > 1)) e +where e.schedulerank = 1 +order by e.priority desc, e.what, e.transportstreamid, e.starttime + diff --git a/p2pvr/daemon/sql/Schedules_delete.sql b/p2pvr/daemon/sql/Schedules_delete.sql new file mode 100644 index 0000000..e14edc3 --- /dev/null +++ b/p2pvr/daemon/sql/Schedules_delete.sql @@ -0,0 +1 @@ +DELETE FROM schedules WHERE scheduleId = ? diff --git a/p2pvr/daemon/sql/Schedules_insert.sql b/p2pvr/daemon/sql/Schedules_insert.sql new file mode 100644 index 0000000..100e78b --- /dev/null +++ b/p2pvr/daemon/sql/Schedules_insert.sql @@ -0,0 +1,2 @@ +INSERT INTO schedules(serviceId, eventId, title, search, priority, early, late, repeats) +VALUES(?, ?, ?, ?, ?, ?, ?, ?) diff --git a/p2pvr/daemon/sql/Schedules_insertNewId.sql b/p2pvr/daemon/sql/Schedules_insertNewId.sql new file mode 100644 index 0000000..f66acd5 --- /dev/null +++ b/p2pvr/daemon/sql/Schedules_insertNewId.sql @@ -0,0 +1 @@ +SELECT currval('schedules_scheduleid_seq'); diff --git a/p2pvr/daemon/sql/Schedules_pendingRecord.sql b/p2pvr/daemon/sql/Schedules_pendingRecord.sql new file mode 100644 index 0000000..ba4d7d8 --- /dev/null +++ b/p2pvr/daemon/sql/Schedules_pendingRecord.sql @@ -0,0 +1,13 @@ +select * +from record r, events e, schedules s +where r.serviceid = e.serviceid +and r.eventid = e.eventid +and /*r.scheduleid*/ 161 = s.scheduleid +and r.recordstatus = 1 +and r.recordingstatus = 0 +and e.starttime - s.early - interval '3minutes' < now() +order by e.starttime - s.early + +; +select * +from schedules
\ No newline at end of file diff --git a/p2pvr/daemon/sql/Schedules_scheduledToRecord.sql b/p2pvr/daemon/sql/Schedules_scheduledToRecord.sql new file mode 100644 index 0000000..3874c37 --- /dev/null +++ b/p2pvr/daemon/sql/Schedules_scheduledToRecord.sql @@ -0,0 +1,8 @@ +SELECT r.serviceid, r.eventid, r.scheduleid +FROM record r, events e, schedules s +WHERE recordstatus = 0 +AND r.serviceid = e.serviceid +AND r.eventid = e.eventid +AND r.scheduleid = s.scheduleid +AND e.stoptime + s.late > NOW() +ORDER BY e.starttime, e.serviceid diff --git a/p2pvr/daemon/sql/Schedules_selectAll.sql b/p2pvr/daemon/sql/Schedules_selectAll.sql new file mode 100644 index 0000000..e9e500e --- /dev/null +++ b/p2pvr/daemon/sql/Schedules_selectAll.sql @@ -0,0 +1,3 @@ +SELECT scheduleid, serviceid, eventid, title, search, priority, early, late, repeats +FROM schedules +ORDER BY scheduleId diff --git a/p2pvr/daemon/sql/Schedules_selectById.sql b/p2pvr/daemon/sql/Schedules_selectById.sql new file mode 100644 index 0000000..4990418 --- /dev/null +++ b/p2pvr/daemon/sql/Schedules_selectById.sql @@ -0,0 +1,5 @@ +SELECT scheduleid, serviceid, eventid, title, search, priority, early, late, repeats +FROM schedules +WHERE scheduleid = ? +ORDER BY scheduleId + diff --git a/p2pvr/daemon/sql/Schedules_update.sql b/p2pvr/daemon/sql/Schedules_update.sql new file mode 100644 index 0000000..56c9531 --- /dev/null +++ b/p2pvr/daemon/sql/Schedules_update.sql @@ -0,0 +1,10 @@ +UPDATE schedules SET + serviceId = ?, + eventId = ?, + title = ?, + search = ?, + priority = ?, + early = ?, + late = ?, + repeats = ? +WHERE scheduleId = ? diff --git a/p2pvr/daemon/sqlContainerCreator.h b/p2pvr/daemon/sqlContainerCreator.h new file mode 100644 index 0000000..5c8d8f8 --- /dev/null +++ b/p2pvr/daemon/sqlContainerCreator.h @@ -0,0 +1,28 @@ +#ifndef SQLCONTAINERCREATOR_H +#define SQLCONTAINERCREATOR_H + +#include "containerCreator.h" +#include <boost/shared_ptr.hpp> +#include <selectcommand.h> +#include <sqlHandleAsVariableType.h> + +template <typename T, typename V, typename P = IceInternal::Handle<V>> +class SqlContainerCreator : public ContainerCreator<T, V, P> { + public: + SqlContainerCreator(T & c) : ContainerCreator<T, V, P>(c) { } + + void populate(boost::shared_ptr<DB::SelectCommand> sel) + { + sel->execute(); + ContainerCreator<T, V, P>::populate(boost::bind(&DB::SelectCommand::fetch, sel), [sel](unsigned int c) { + HandleAsVariableType h; + const DB::Column & col = (*sel)[c]; + col.apply(h); + return h.variable; + }, sel->columnCount()); + } +}; + + +#endif + diff --git a/p2pvr/daemon/storage.cpp b/p2pvr/daemon/storage.cpp new file mode 100644 index 0000000..9d9345d --- /dev/null +++ b/p2pvr/daemon/storage.cpp @@ -0,0 +1,275 @@ +#include <pch.hpp> +#include "storage.h" +#include "fileSink.h" +#include <commonHelpers.h> +#include <fcntl.h> +#include <logger.h> +#include <boost/filesystem/operations.hpp> +#include <boost/uuid/uuid_generators.hpp> +#include <boost/uuid/uuid_io.hpp> +#include <boost/lexical_cast.hpp> + +namespace fs = boost::filesystem; + +fs::path Storage::root; +fs::path Storage::byAll; +fs::path Storage::byTitle; +fs::path Storage::byDate; +fs::path Storage::byService; +fs::path Storage::bySchedule; + +DECLARE_OPTIONS(Storage, "P2PVR Storage options") +("p2pvr.storage.root", Options::value(&root, "recordings"), + "Root folder in which to store recordings") +("p2pvr.storage.all", Options::value(&byAll, "all"), + "Sub folder in which to store all recordings") +("p2pvr.storage.bytitle", Options::value(&byTitle, "title"), + "Sub folder in which to store by title") +("p2pvr.storage.bydate", Options::value(&byDate, "date"), + "Sub folder in which to store by date") +("p2pvr.storage.byservice", Options::value(&byService, "service"), + "Sub folder in which to store by service") +("p2pvr.storage.byschedule", Options::value(&bySchedule, "schedule"), + "Sub folder in which to store by schedule") +END_OPTIONS(Storage); + +inline static +fs::path createPath(const fs::path & start) +{ + return start; +} + +template <typename Arg> +inline static +fs::path +operator/(const fs::path & lhs, const IceUtil::Optional<Arg> & rhs) +{ + if (rhs) { + return lhs / *rhs; + } + return lhs; +} + +template <typename Arg, typename ... Args> +inline static +fs::path createPath(const fs::path & start, const Arg & arg, const Args & ... args) +{ + auto path = start / arg; + return createPath(path, args...); +} + +template <typename Arg, typename ... Args> +inline static +std::string firstNotNull(const Arg & arg, const Args & ...) +{ + return boost::lexical_cast<std::string>(arg); +} + +template <typename Arg, typename ... Args> +inline static +std::string firstNotNull(const IceUtil::Optional<Arg> & arg, const Args & ... args) +{ + return arg ? firstNotNull(*arg, args...) : firstNotNull(args...); +} + +template <typename ... Args> +inline static +std::string firstNotNull(const std::string & arg, const Args & ...) +{ + return arg; +} + +inline static +std::string +formatIf(const boost::format & f) +{ + return f.str(); +} + +template <typename Arg, typename ... Args> +inline static +IceUtil::Optional<std::string> +formatIf(boost::format & f, const IceUtil::Optional<Arg> & arg, const Args & ... args) +{ + if (arg) { + f % *arg; + return formatIf(f, args...); + } + else { + return IceUtil::Optional<std::string>(); + } +} + +template <typename Arg, typename ... Args> +inline static +IceUtil::Optional<std::string> +formatIf(boost::format & f, const Arg & arg, const Args & ... args) +{ + f % arg; + return formatIf(f, args...); +} + +template <typename... Args> +inline static +IceUtil::Optional<std::string> +formatIf(const std::string & msgfmt, const Args & ... args) +{ + boost::format fmt(msgfmt); + return formatIf(fmt, args...); +} + +std::string +Storage::CreateForEventRecording(const std::string & ext, const P2PVR::SchedulePtr & schedule, const DVBSI::ServicePtr & service, const DVBSI::EventPtr & event, const Ice::Current &) +{ + fs::create_directories(root / byAll); + auto id = boost::lexical_cast<std::string>(boost::uuids::random_generator()()); + fs::path path = root / byAll / id; + path.replace_extension(ext); + auto fd = open(path.string().c_str(), O_WRONLY | O_CREAT | O_EXCL, 0664); + if (fd < 0) { + Logger()->messagebf(LOG_ERR, "%s: Failed to open file for writing at %s (%d:%s)", __PRETTY_FUNCTION__, + path, errno, strerror(errno)); + throw P2PVR::StorageException(path.string(), strerror(errno)); + } + createSymlinks(ext, id, schedule, service, event); + Logger()->messagebf(LOG_INFO, "%s: Created new recording %s", __PRETTY_FUNCTION__, path); + + close(fd); + return id; +} + +void +Storage::createSymlinks(const std::string & ext, const std::string & target, const P2PVR::SchedulePtr & schedule, const DVBSI::ServicePtr & service, const DVBSI::EventPtr & event) +{ + // By title, with optional season and episode information + createSymlink(ext, target, createPath(root, byTitle, + event->Title, + formatIf("Season %02d", event->Season), + firstNotNull( + formatIf("Part %02d of %02d - %s", event->Episode, event->Episodes, event->Subtitle), + formatIf("Episode %02d - %s", event->Episode, event->Subtitle), + formatIf("Part %02d of %02d - %s", event->Episode, event->Episodes, event->Description), + formatIf("Part %02d of %02d", event->Episode, event->Episodes), + formatIf("Episode %02d - %s", event->Episode, event->Description), + event->Subtitle, + event->Description, + formatIf("Episode %02d", event->Episode), + event->StartTime))); + // By date + createSymlink(ext, target, createPath(root, byDate, + formatIf("%04d-%02d-%02d", event->StartTime.Year, event->StartTime.Month, event->StartTime.Day), + firstNotNull( + formatIf("%s - %s", event->Title, event->Subtitle), + formatIf("%s - %s", event->Title, event->Description), + event->Title))); + // By service + createSymlink(ext, target, createPath(root, byService, + service->Name, + firstNotNull( + formatIf("%s - %s", event->Title, event->Subtitle), + formatIf("%s - %s", event->Title, event->Description), + event->Title))); + // By schedule title + createSymlink(ext, target, createPath(root, bySchedule, + formatIf("Title: %s", schedule->Title), + formatIf("%04d-%02d-%02d", event->StartTime.Year, event->StartTime.Month, event->StartTime.Day), + firstNotNull( + formatIf("%s - %s", event->Title, event->Subtitle), + formatIf("%s - %s", event->Title, event->Description), + event->Title))); + // By schedule search + createSymlink(ext, target, createPath(root, bySchedule, + formatIf("Search: %s", schedule->Search), + firstNotNull( + formatIf("%s - %s", event->Title, event->Subtitle), + formatIf("%s - %s", event->Title, event->Description), + event->Title))); +} +void +Storage::createSymlink(const std::string & ext, const std::string & target, const fs::path & link) +{ + fs::path path = link; + path.replace_extension(ext); + Logger()->messagebf(LOG_DEBUG, "%s: link(%s) -> target(%s)", __PRETTY_FUNCTION__, path, target); + if (fs::exists(path)) { + Logger()->messagebf(LOG_WARNING, "%s: symlink already exists %s", __PRETTY_FUNCTION__, path); + return; + } + fs::create_directories(path.parent_path()); + fs::path relativeTarget; + for (fs::path tmp = path.parent_path(); tmp != root; tmp = tmp.parent_path()) { + relativeTarget /= ".."; + } + relativeTarget /= byAll; + relativeTarget /= target; + relativeTarget.replace_extension(ext); + fs::create_symlink(relativeTarget, path); +} + +std::string +Storage::FindExtension(const std::string & id) +{ + fs::directory_iterator end; + for (fs::directory_iterator itr(root / byAll); itr != end; ++itr) { + if (itr->path().stem() == id) { + return itr->path().extension().string(); + } + } + return ""; +} + +P2PVR::RawDataClientPrx +Storage::OpenForWrite(const std::string & id, const Ice::Current & ice) +{ + fs::path path = root / byAll / id; + path.replace_extension(FindExtension(id)); + auto fd = open(path.string().c_str(), O_WRONLY | O_APPEND | O_LARGEFILE); + if (fd < 0) { + Logger()->messagebf(LOG_ERR, "%s: Failed to open file for reading at %s (%d:%s)", __PRETTY_FUNCTION__, + path, errno, strerror(errno)); + throw P2PVR::StorageException(path.string(), strerror(errno)); + } + auto openFile = OpenFilePtr(new OpenFile(ice.adapter, new FileSink(fd))); + openFiles.insert(openFile); + return *openFile; +} + +void +Storage::Close(const P2PVR::RawDataClientPrx & file, const Ice::Current &) +{ + openFiles.erase(std::find_if(openFiles.begin(), openFiles.end(), [&file](const OpenFilePtr & of) { return *of == file; })); +} + +void +Storage::Delete(const std::string & id, const Ice::Current &) +{ + fs::path path = root / byAll / id; + path.replace_extension(FindExtension(id)); + Logger()->messagebf(LOG_INFO, "%s: Deleting links to %s", __PRETTY_FUNCTION__, path); + DeleteFrom(path, fs::canonical(root)); +} + +void +Storage::DeleteFrom(const fs::path & path, const fs::path & from) +{ + Logger()->messagebf(LOG_DEBUG, "%s: Deleting links to %s to %s", __PRETTY_FUNCTION__, path, from); + fs::directory_iterator end; + for (fs::directory_iterator itr(from); itr != end; ++itr) { + if (fs::is_directory(*itr)) { + DeleteFrom(path, *itr); + } + else { + boost::system::error_code err; + auto link = fs::canonical(*itr, err); + if (err || link == path) { + Logger()->messagebf(LOG_DEBUG, "%s: deleting %s", __PRETTY_FUNCTION__, *itr); + fs::remove(*itr); + } + } + } + if (from != root && fs::is_empty(from)) { + Logger()->messagebf(LOG_DEBUG, "%s: deleting directory %s", __PRETTY_FUNCTION__, from); + fs::remove(from); + } +} + diff --git a/p2pvr/daemon/storage.h b/p2pvr/daemon/storage.h new file mode 100644 index 0000000..093e00d --- /dev/null +++ b/p2pvr/daemon/storage.h @@ -0,0 +1,38 @@ +#ifndef STORAGE_H +#define STORAGE_H + +#include <p2pvr.h> +#include <options.h> +#include <string> +#include <boost/filesystem/path.hpp> +#include "temporaryIceAdapterObject.h" + +class Storage : public P2PVR::Storage { + public: + std::string CreateForEventRecording(const std::string & ext, const P2PVR::SchedulePtr &, const DVBSI::ServicePtr &, const DVBSI::EventPtr &, const Ice::Current &); + P2PVR::RawDataClientPrx OpenForWrite(const std::string &, const Ice::Current &); + void Close(const P2PVR::RawDataClientPrx & file, const Ice::Current &); + void Delete(const std::string &, const Ice::Current &); + + INITOPTIONS; + + protected: + static void createSymlinks(const std::string & ext, const std::string &, const P2PVR::SchedulePtr &, const DVBSI::ServicePtr &, const DVBSI::EventPtr &); + static void createSymlink(const std::string & ext, const std::string & target, const boost::filesystem::path & link); + static void DeleteFrom(const boost::filesystem::path &, const boost::filesystem::path &); + static std::string FindExtension(const std::string & id); + + typedef TemporarayIceAdapterObject<P2PVR::RawDataClient> OpenFile; + typedef boost::shared_ptr<OpenFile> OpenFilePtr; + typedef std::set<OpenFilePtr> OpenFiles; + OpenFiles openFiles; + + static boost::filesystem::path root; + static boost::filesystem::path byAll; + static boost::filesystem::path byTitle; + static boost::filesystem::path byDate; + static boost::filesystem::path byService; + static boost::filesystem::path bySchedule; +}; + +#endif |