From 800ab74ea27ca22aa62e953cbc012989c23c34ca Mon Sep 17 00:00:00 2001 From: randomdan Date: Thu, 9 Jan 2014 20:31:18 +0000 Subject: Basic implementation of a recorder --- p2pvr/daemon/daemon.cpp | 6 ++ p2pvr/ice/Jamfile.jam | 2 + p2pvr/ice/commonHelpers.cpp | 46 +++++++++++++ p2pvr/ice/commonHelpers.h | 3 + p2pvr/ice/p2pvr.ice | 15 ++++ p2pvr/lib/muxer.cpp | 79 +++++++++++++++------ p2pvr/lib/muxer.h | 7 +- p2pvr/lib/recorder.cpp | 99 +++++++++++++++++++++++++++ p2pvr/lib/recorder.h | 51 ++++++++++++++ p2pvr/lib/schedules.cpp | 42 ++++++++++++ p2pvr/lib/schedules.h | 2 + p2pvr/lib/serviceStreamer.cpp | 6 +- p2pvr/lib/serviceStreamer.h | 5 +- p2pvr/lib/si.cpp | 10 +++ p2pvr/lib/si.h | 1 + p2pvr/lib/siParsers/table.cpp | 7 ++ p2pvr/lib/siParsers/table.h | 4 ++ p2pvr/lib/sql/SI_eventById.sql | 28 ++++++++ p2pvr/lib/sql/Schedules_pendingRecord.sql | 13 ++++ p2pvr/lib/sql/Schedules_scheduledToRecord.sql | 8 +++ p2pvr/lib/sql/Schedules_selectById.sql | 5 ++ p2pvr/lib/temporaryIceAdapterObject.h | 9 ++- 22 files changed, 420 insertions(+), 28 deletions(-) create mode 100644 p2pvr/ice/commonHelpers.cpp create mode 100644 p2pvr/lib/recorder.cpp create mode 100644 p2pvr/lib/recorder.h create mode 100644 p2pvr/lib/sql/SI_eventById.sql create mode 100644 p2pvr/lib/sql/Schedules_pendingRecord.sql create mode 100644 p2pvr/lib/sql/Schedules_scheduledToRecord.sql create mode 100644 p2pvr/lib/sql/Schedules_selectById.sql diff --git a/p2pvr/daemon/daemon.cpp b/p2pvr/daemon/daemon.cpp index d4b59a0..a2739eb 100644 --- a/p2pvr/daemon/daemon.cpp +++ b/p2pvr/daemon/daemon.cpp @@ -4,6 +4,9 @@ #include "maintenance.h" #include "si.h" #include "schedules.h" +#include "storage.h" +#include "recorder.h" +#include "recordings.h" #include class P2PvrDaemon : public DaemonBase { @@ -20,6 +23,9 @@ class P2PvrDaemon : public DaemonBase { adapter->add(new Maintenance(adapter, timer), ic->stringToIdentity("Maintenance")); adapter->add(new SI(), ic->stringToIdentity("SI")); adapter->add(new Schedules(), ic->stringToIdentity("Schedules")); + adapter->add(new Storage(), ic->stringToIdentity("Storage")); + adapter->add(new Recorder(adapter, timer), ic->stringToIdentity("Recorder")); + adapter->add(new Recordings(), ic->stringToIdentity("Recordings")); } }; diff --git a/p2pvr/ice/Jamfile.jam b/p2pvr/ice/Jamfile.jam index 219c508..c5031bd 100644 --- a/p2pvr/ice/Jamfile.jam +++ b/p2pvr/ice/Jamfile.jam @@ -3,10 +3,12 @@ lib IceUtil ; lib pthread ; lib p2pvrice : + [ glob *.cpp ] [ glob *.ice ] : Ice IceUtil pthread + ..//p2lib : : . Ice diff --git a/p2pvr/ice/commonHelpers.cpp b/p2pvr/ice/commonHelpers.cpp new file mode 100644 index 0000000..64a1c61 --- /dev/null +++ b/p2pvr/ice/commonHelpers.cpp @@ -0,0 +1,46 @@ +#include "commonHelpers.h" +#include +#include + +namespace Common { + std::string operator-(const Common::DateTime & a, const Common::DateTime & b) + { + struct tm tma { + 0, a.Minute, a.Hour, + a.Day, a.Month - 1, a.Year - 1900, + 0, 0, 0, 0, 0}; + struct tm tmb { + 0, b.Minute, b.Hour, + b.Day, b.Month - 1, b.Year - 1900, + 0, 0, 0, 0, 0}; + auto secs = mktime(&tma) - mktime(&tmb); + return stringbf("%02d:%02d:%02d", secs / 3600, (secs / 60) % 60, secs % 60); + } + + time_t operator-(const Common::DateTime & cdt, const std::string & interval) + { + struct tm dt { + 0, cdt.Minute, cdt.Hour, + cdt.Day, cdt.Month - 1, cdt.Year - 1900, + 0, 0, 0, 0, 0}; + unsigned short hours, minutes, seconds; + if (sscanf(interval.c_str(), "%hu:%hu:%hu", &hours, &minutes, &seconds) < 3) { + throw std::runtime_error("Couldn't parse interval"); + } + return mktime(&dt) - (seconds + (60 * (minutes + (60 * hours)))); + } + + time_t operator+(const Common::DateTime & cdt, const std::string & interval) + { + struct tm dt { + 0, cdt.Minute, cdt.Hour, + cdt.Day, cdt.Month - 1, cdt.Year - 1900, + 0, 0, 0, 0, 0}; + unsigned short hours, minutes, seconds; + if (sscanf(interval.c_str(), "%hu:%hu:%hu", &hours, &minutes, &seconds) < 3) { + throw std::runtime_error("Couldn't parse interval"); + } + return mktime(&dt) + (seconds + (60 * (minutes + (60 * hours)))); + } +} + diff --git a/p2pvr/ice/commonHelpers.h b/p2pvr/ice/commonHelpers.h index bf93800..645ad1c 100644 --- a/p2pvr/ice/commonHelpers.h +++ b/p2pvr/ice/commonHelpers.h @@ -17,6 +17,9 @@ namespace Common { << ":" << std::setw(2) << std::setfill('0') << dt.Minute; return o; } + time_t operator-(const Common::DateTime & cdt, const std::string & interval); + time_t operator+(const Common::DateTime & cdt, const std::string & interval); + std::string operator-(const Common::DateTime & a, const Common::DateTime & b); } #endif diff --git a/p2pvr/ice/p2pvr.ice b/p2pvr/ice/p2pvr.ice index 9dba1f9..7b5f2fb 100644 --- a/p2pvr/ice/p2pvr.ice +++ b/p2pvr/ice/p2pvr.ice @@ -34,6 +34,14 @@ module P2PVR { }; sequence ScheduleList; + // Ids for something to record + class ScheduledToRecord { + int ScheduleId; + int ServiceId; + int EventId; + }; + sequence ScheduledToRecordList; + interface Maintenance { idempotent void UpdateAll(); idempotent void UpdateNetwork(short type); @@ -63,7 +71,9 @@ module P2PVR { interface Schedules { idempotent void DeleteSchedule(int scheduleId); + idempotent Schedule GetSchedule(int scheduleId); idempotent ScheduleList GetSchedules(); + idempotent ScheduledToRecordList GetScheduledToRecord(); idempotent int UpdateSchedule(Schedule newSchedule); idempotent void DoReschedule(); }; @@ -78,9 +88,14 @@ module P2PVR { idempotent DVBSI::ServiceList GetServices(); idempotent DVBSI::Service GetService(int id); // Get events + idempotent DVBSI::Event GetEvent(int serviceId, int eventId); idempotent DVBSI::Events EventsOnNow(); idempotent DVBSI::Events EventsInRange(Common::DateTime from, Common::DateTime to); }; + + interface Recorder { + idempotent void RefreshSchedules(); + }; }; #endif diff --git a/p2pvr/lib/muxer.cpp b/p2pvr/lib/muxer.cpp index ae2ee28..bd00a8c 100644 --- a/p2pvr/lib/muxer.cpp +++ b/p2pvr/lib/muxer.cpp @@ -1,31 +1,66 @@ #include #include "muxer.h" -#include #include #include +#include +#include -Muxer::Muxer(const P2PVR::RawDataClientPrx & t) : +pid_t +mpopenrw(const std::vector & params, int fds[2]) +{ + int rpipes[2], wpipes[2]; + if (pipe(rpipes)) { + throw std::runtime_error("Failed to create a pipe"); + } + if (pipe(wpipes)) { + throw std::runtime_error("Failed to create another pipe"); + } + pid_t child = fork(); + switch (child) { + case -1: // fail + throw std::runtime_error("Failed to fork"); + default: // parent + close(wpipes[0]); + close(rpipes[1]); + fds[0] = wpipes[1]; + fds[1] = rpipes[0]; + break; + case 0: // in child + close(wpipes[1]); + close(rpipes[0]); + dup2(wpipes[0], 0); + dup2(rpipes[1], 1); + for (int n = 3; n < 1024; n += 1) { + close(n); + } + char * buf[100]; + char ** w = &buf[0]; + BOOST_FOREACH(const auto & p, params) { + *w++ = strdup(p.c_str()); + } + *w = NULL; + execv(buf[0], buf); + abort(); + break; + } + return child; +} + +Muxer::Muxer(const P2PVR::RawDataClientPrx & t, const std::string & cmd) : target(t) { - std::vector cmd; - cmd.push_back("/usr/bin/ffmpeg"); - cmd.push_back("-f"); - cmd.push_back("mpegts"); - cmd.push_back("-i"); - cmd.push_back("-"); - cmd.push_back("-f"); - cmd.push_back("mpeg"); - cmd.push_back("-codec"); - cmd.push_back("copy"); - cmd.push_back("-"); - cmd.push_back(NULL); - popenrw(&cmd.front(), fds); + std::vector params; + boost::algorithm::split(params, cmd, isspace, boost::algorithm::token_compress_on); + muxerPid = mpopenrw(params, fds); } Muxer::~Muxer() { close(fds[0]); - ReadMuxerAndSend(); + int status; + while (waitpid(muxerPid, &status, WNOHANG) == 0) { + ReadMuxerAndSend(5); + } close(fds[1]); } @@ -33,19 +68,19 @@ bool Muxer::NewData(const P2PVR::Data & data, const Ice::Current &) { std::lock_guard g(lock); - ReadMuxerAndSend(); + ReadMuxerAndSend(0); if (write(fds[0], &data.front(), data.size()) < 1) { return true; } - return ReadMuxerAndSend(); + return ReadMuxerAndSend(0); } bool -Muxer::ReadMuxerAndSend() const +Muxer::ReadMuxerAndSend(int waitTime) const { pollfd fd = { fds[1], POLLIN, 0 }; while (true) { - auto p = poll(&fd, 1, 0); + auto p = poll(&fd, 1, waitTime); if (p < 0) { // error return true; @@ -57,6 +92,10 @@ Muxer::ReadMuxerAndSend() const else if (p > 0) { P2PVR::Data buf(BUFSIZ); auto len = read(fds[1], &buf.front(), buf.size()); + if (len == 0) { + // ok, proc exit + return true; + } if (len < 0) { // error return true; diff --git a/p2pvr/lib/muxer.h b/p2pvr/lib/muxer.h index 4b1187d..118b1da 100644 --- a/p2pvr/lib/muxer.h +++ b/p2pvr/lib/muxer.h @@ -6,15 +6,16 @@ class Muxer : public P2PVR::RawDataClient { public: - Muxer(const P2PVR::RawDataClientPrx & target); + Muxer(const P2PVR::RawDataClientPrx & target, const std::string & cmd); ~Muxer(); bool NewData(const P2PVR::Data &, const Ice::Current &); private: - bool ReadMuxerAndSend() const; - const P2PVR::RawDataClientPrx & target; + bool ReadMuxerAndSend(int wait) const; + const P2PVR::RawDataClientPrx target; int fds[2]; + pid_t muxerPid; std::mutex lock; }; diff --git a/p2pvr/lib/recorder.cpp b/p2pvr/lib/recorder.cpp new file mode 100644 index 0000000..89902a9 --- /dev/null +++ b/p2pvr/lib/recorder.cpp @@ -0,0 +1,99 @@ +#include "recorder.h" +#include "bindTimerTask.h" +#include +#include +#include +#include +#include "serviceStreamer.h" +#include "storage.h" +#include "muxer.h" + +std::string Recorder::extension; +std::string Recorder::muxerCommand; + +DECLARE_OPTIONS(Recorder, "P2PVR Recorder options") +("p2pvr.recorder.extension", Options::value(&extension, "avi"), + "File extension to save with (default: avi)") +("p2pvr.recorder.muxercommand", Options::value(&muxerCommand, "/usr/bin/ffmpeg -f mpegts -i - -f avi -codec copy -"), + "File extension to save with (default: '/usr/bin/ffmpeg -f mpegts -i - -f avi -codec copy -')") +END_OPTIONS(Recorder); + +Recorder::Recorder(Ice::ObjectAdapterPtr a, IceUtil::TimerPtr t) : + adapter(a), + timer(t) +{ + Logger()->message(LOG_DEBUG, __PRETTY_FUNCTION__); + RefreshSchedules(Ice::Current()); +} + +void +Recorder::RefreshSchedules(const Ice::Current &) +{ + std::lock_guard g(lock); + BOOST_FOREACH(auto & t, pendingRecordings) { + timer->cancel(t); + } + pendingRecordings.clear(); + auto schedules = P2PVR::SchedulesPrx::checkedCast(adapter->createProxy(adapter->getCommunicator()->stringToIdentity("Schedules"))); + auto si = P2PVR::SIPrx::checkedCast(adapter->createProxy(adapter->getCommunicator()->stringToIdentity("SI"))); + BOOST_FOREACH(const auto & s, schedules->GetScheduledToRecord()) { + if (std::find_if(currentRecordings.begin(), currentRecordings.end(), [&s](const CurrentPtr & c) { + return s->ScheduleId == c->schedule->ScheduleId && s->ServiceId == c->event->ServiceId && s->EventId == c->event->EventId;; + }) != currentRecordings.end()) { + continue; + } + auto schedule = schedules->GetSchedule(s->ScheduleId); + auto service = si->GetService(s->ServiceId); + auto event = si->GetEvent(s->ServiceId, s->EventId); + + auto startIn = std::max(event->StartTime - schedule->Early - time(NULL), 0); + IceUtil::TimerTaskPtr startTimer = new BindTimerTask(boost::bind(&Recorder::StartRecording, this, schedule, service, event)); + timer->schedule(startTimer, IceUtil::Time::seconds(startIn)); + pendingRecordings.push_back(startTimer); + Logger()->messagebf(LOG_DEBUG, "Recording %s scheduled for %s seconds", event->Title, startIn); + } +} + +void +Recorder::StartRecording(P2PVR::SchedulePtr schedule, DVBSI::ServicePtr service, DVBSI::EventPtr event) +{ + std::lock_guard g(lock); + auto storage = P2PVR::StoragePrx::checkedCast(adapter->createProxy(adapter->getCommunicator()->stringToIdentity("Storage"))); + auto recordings = P2PVR::RecordingsPrx::checkedCast(adapter->createProxy(adapter->getCommunicator()->stringToIdentity("Recordings"))); + + auto id = storage->CreateForEventRecording(extension, schedule, service, event); + auto store = storage->OpenForWrite(id); + ScopeObject _store(NULL, NULL, [this,&store,&storage,&id]() { storage->Close(store); storage->Delete(id); }); + auto muxer = P2PVR::RawDataClientPrx::checkedCast(adapter->addWithUUID(new Muxer(store, muxerCommand))); + ScopeObject _muxer(NULL, NULL, [this,&muxer]() { adapter->remove(muxer->ice_getIdentity()); }); + auto ss = ServiceStreamerPtr(new ServiceStreamer(service->ServiceId, muxer, adapter->getCommunicator(), adapter)); + + ss->Start(); + Logger()->messagebf(LOG_INFO, "Started recording %s (%s - %s) on %s (%d)", + event->Title, event->StartTime, event->StopTime, + service->Name ? *service->Name : "", service->ServiceId); + + recordings->NewRecording(new P2PVR::Recording(0, storage->ice_toString(), id, schedule->ScheduleId, event->Title, event->Subtitle, + event->Description, event->StartTime, event->StopTime - event->StartTime)); + auto newCurrent = CurrentPtr(new Current({muxer, store, ss, schedule, service, event, IceUtil::TimerTaskPtr()})); + currentRecordings.insert(newCurrent); + + auto stopIn = event->StopTime + schedule->Late - time(NULL); + newCurrent->stopTimer = new BindTimerTask(boost::bind(&Recorder::StopRecording, this, newCurrent)); + timer->schedule(newCurrent->stopTimer, IceUtil::Time::seconds(stopIn)); + Logger()->messagebf(LOG_DEBUG, "Recording %s scheduled stop in %s seconds", event->Title, stopIn); +} + +void +Recorder::StopRecording(CurrentPtr c) +{ + std::lock_guard g(lock); + Logger()->messagebf(LOG_DEBUG, "Stopping %s", c->event->Title); + c->stream->Stop(); + adapter->remove(c->muxer->ice_getIdentity()); + currentRecordings.erase(c); + auto storage = P2PVR::StoragePrx::checkedCast(adapter->createProxy(adapter->getCommunicator()->stringToIdentity("Storage"))); + storage->Close(c->store); + Logger()->messagebf(LOG_DEBUG, "Stopped %s", c->event->Title); +} + diff --git a/p2pvr/lib/recorder.h b/p2pvr/lib/recorder.h new file mode 100644 index 0000000..78c604b --- /dev/null +++ b/p2pvr/lib/recorder.h @@ -0,0 +1,51 @@ +#ifndef RECORDER_H +#define RECORDER_H + +#include +#include +#include +#include +#include +#include "serviceStreamer.h" + +class Recorder : public P2PVR::Recorder { + public: + typedef std::vector Pendings; + + class Current { + public: + P2PVR::RawDataClientPrx muxer; + P2PVR::RawDataClientPrx store; + ServiceStreamerPtr stream; + P2PVR::SchedulePtr schedule; + DVBSI::ServicePtr service; + DVBSI::EventPtr event; + IceUtil::TimerTaskPtr stopTimer; + }; + typedef boost::shared_ptr CurrentPtr; + typedef std::set Currents; + + Recorder(Ice::ObjectAdapterPtr a, IceUtil::TimerPtr); + + void RefreshSchedules(const Ice::Current &); + + INITOPTIONS; + + private: + void StartRecordings(); + void StartRecording(P2PVR::SchedulePtr schedule, DVBSI::ServicePtr service, DVBSI::EventPtr event); + void StopRecording(CurrentPtr); + + Ice::ObjectAdapterPtr adapter; + IceUtil::TimerPtr timer; + + Currents currentRecordings; + Pendings pendingRecordings; + std::mutex lock; + + static std::string extension; + static std::string muxerCommand; +}; + +#endif + diff --git a/p2pvr/lib/schedules.cpp b/p2pvr/lib/schedules.cpp index 8fb96ac..bca7ddd 100644 --- a/p2pvr/lib/schedules.cpp +++ b/p2pvr/lib/schedules.cpp @@ -18,6 +18,8 @@ ResourceString(Schedules_insertNewId, lib_sql_Schedules_insertNewId_sql); ResourceString(Schedules_update, lib_sql_Schedules_update_sql); ResourceString(Schedules_delete, lib_sql_Schedules_delete_sql); ResourceString(Schedules_selectAll, lib_sql_Schedules_selectAll_sql); +ResourceString(Schedules_selectById, lib_sql_Schedules_selectById_sql); +ResourceString(Schedules_scheduledToRecord, lib_sql_Schedules_scheduledToRecord_sql); std::string Schedules::SchedulerAlgorithm; @@ -65,6 +67,24 @@ class Record { typedef boost::shared_ptr RecordPtr; typedef std::vector Records; +template<> +void +CreateColumns(const ColumnCreator & cc) +{ + cc("serviceid", true); + cc("eventid", true); + cc("scheduleid", true); +} + +template<> +void +UnbindColumns(RowState & rs, P2PVR::ScheduledToRecordPtr const & s) +{ + rs.fields[0] >> s->ServiceId; + rs.fields[1] >> s->EventId; + rs.fields[2] >> s->ScheduleId; +} + template<> void CreateColumns(const ColumnCreator & cc) @@ -403,6 +423,10 @@ Schedules::DoReschedule(const Ice::Current & ice) mergeRecords.sources.insert(new ContainerIterator(&records)); mergeRecords.loadComplete(this); mergeRecords.execute(NULL); + tx.Commit(); + + auto recorder = P2PVR::RecorderPrx::checkedCast(ice.adapter->createProxy(ice.adapter->getCommunicator()->stringToIdentity("Recorder"))); + recorder->RefreshSchedules(); } void @@ -422,6 +446,24 @@ Schedules::GetSchedules(const Ice::Current &) return schedules; } +P2PVR::SchedulePtr +Schedules::GetSchedule(int id, const Ice::Current &) +{ + P2PVR::ScheduleList schedules; + SqlContainerCreator cct(schedules); + cct.populate(Select(Schedules_selectById, id).second); + return schedules.front(); +} + +P2PVR::ScheduledToRecordList +Schedules::GetScheduledToRecord(const Ice::Current &) +{ + P2PVR::ScheduledToRecordList scheduled; + SqlContainerCreator cct(scheduled); + cct.populate(Select(Schedules_scheduledToRecord).second); + return scheduled; +} + int Schedules::UpdateSchedule(const P2PVR::SchedulePtr & s, const Ice::Current & ice) { diff --git a/p2pvr/lib/schedules.h b/p2pvr/lib/schedules.h index ff725e7..f7075dd 100644 --- a/p2pvr/lib/schedules.h +++ b/p2pvr/lib/schedules.h @@ -65,7 +65,9 @@ class EpisodeGroup { class Schedules : public P2PVR::Schedules, public DatabaseClient { public: void DeleteSchedule(int id, const Ice::Current &); + P2PVR::SchedulePtr GetSchedule(int id, const Ice::Current &); P2PVR::ScheduleList GetSchedules(const Ice::Current &); + P2PVR::ScheduledToRecordList GetScheduledToRecord(const Ice::Current &); int UpdateSchedule(const P2PVR::SchedulePtr &, const Ice::Current &); void DoReschedule(const Ice::Current &); diff --git a/p2pvr/lib/serviceStreamer.cpp b/p2pvr/lib/serviceStreamer.cpp index a26d157..eacf9c3 100644 --- a/p2pvr/lib/serviceStreamer.cpp +++ b/p2pvr/lib/serviceStreamer.cpp @@ -30,7 +30,8 @@ ServiceStreamer::HandlePAT(ProgramAssociationMapPtr pam) stopHandle(pmtHandle); pmtHandle = tuner->StartSendingSection(pmtStream, pmtParser); } - return true; + target->NewData(patParser.Get()->CurrentRawData()); + return false; } bool @@ -48,7 +49,8 @@ ServiceStreamer::HandlePMT(DVBSI::ProgramMapPtr pmp) stopHandle(serviceHandle); serviceHandle = tuner->StartSendingTS(P2PVR::PacketIds(streams.begin(), streams.end()), target); } - return true; + target->NewData(pmtParser.Get()->CurrentRawData()); + return false; } void diff --git a/p2pvr/lib/serviceStreamer.h b/p2pvr/lib/serviceStreamer.h index f140050..bdabe7a 100644 --- a/p2pvr/lib/serviceStreamer.h +++ b/p2pvr/lib/serviceStreamer.h @@ -26,8 +26,8 @@ class ServiceStreamer { P2PVR::SIPrx si; P2PVR::TunerPrx tuner; P2PVR::RawDataClientPrx target; - TemporarayIceAdapterObject patParser; - TemporarayIceAdapterObject pmtParser; + TemporarayIceAdapterObject patParser; + TemporarayIceAdapterObject pmtParser; int serviceId; int patHandle; @@ -37,6 +37,7 @@ class ServiceStreamer { Streams streams; int serviceHandle; }; +typedef boost::shared_ptr ServiceStreamerPtr; #endif diff --git a/p2pvr/lib/si.cpp b/p2pvr/lib/si.cpp index 3431b67..a61566b 100644 --- a/p2pvr/lib/si.cpp +++ b/p2pvr/lib/si.cpp @@ -8,6 +8,7 @@ ResourceString(SI_servicesSelectAll, lib_sql_SI_servicesSelectAll_sql); ResourceString(SI_servicesSelectById, lib_sql_SI_servicesSelectById_sql); +ResourceString(SI_eventById, lib_sql_SI_eventById_sql); ResourceString(SI_eventsOnNow, lib_sql_SI_eventsOnNow_sql); ResourceString(SI_eventsInRange, lib_sql_SI_eventsInRange_sql); @@ -85,6 +86,15 @@ SI::GetService(int id, const Ice::Current&) return rtn.front(); } +DVBSI::EventPtr +SI::GetEvent(int serviceId, int eventId, const Ice::Current &) +{ + DVBSI::Events rtn; + SqlContainerCreator cc(rtn); + cc.populate(Select(SI_eventById, serviceId, eventId).second); + return rtn.front(); +} + DVBSI::Events SI::EventsOnNow(const Ice::Current &) { diff --git a/p2pvr/lib/si.h b/p2pvr/lib/si.h index ca8e5b4..6e41d06 100644 --- a/p2pvr/lib/si.h +++ b/p2pvr/lib/si.h @@ -13,6 +13,7 @@ class SI : public P2PVR::SI, public DatabaseClient { DVBSI::ServiceList GetServices(const Ice::Current &); DVBSI::ServicePtr GetService(int id, const Ice::Current &); + DVBSI::EventPtr GetEvent(int serviceId, int eventId, const Ice::Current &); DVBSI::Events EventsOnNow(const Ice::Current &); DVBSI::Events EventsInRange(const Common::DateTime &, const Common::DateTime &, const Ice::Current &); }; diff --git a/p2pvr/lib/siParsers/table.cpp b/p2pvr/lib/siParsers/table.cpp index ea482b1..9844814 100644 --- a/p2pvr/lib/siParsers/table.cpp +++ b/p2pvr/lib/siParsers/table.cpp @@ -28,9 +28,16 @@ bool SiTableParserBase::NewData(const P2PVR::Data & bytes, const Ice::Current&) { //Logger()->messagebf(LOG_DEBUG, "%s: Got %d bytes", __PRETTY_FUNCTION__, bytes.size()); + currentRawData = &bytes; return ParseInfoTable(&bytes.front(), bytes.size()); } +const P2PVR::Data & +SiTableParserBase::CurrentRawData() const +{ + return *currentRawData; +} + SiTableParserBase::StrPtr SiTableParserBase::convert(const char * txt, size_t len) { diff --git a/p2pvr/lib/siParsers/table.h b/p2pvr/lib/siParsers/table.h index 9de08d6..c470159 100644 --- a/p2pvr/lib/siParsers/table.h +++ b/p2pvr/lib/siParsers/table.h @@ -20,6 +20,8 @@ typedef unsigned char u_char; #define BcdCharToInt(x) (10*((x & 0xF0)>>4) + (x & 0xF)) class SiTableParserBase : public P2PVR::RawDataClient { + public: + const P2PVR::Data & CurrentRawData() const; protected: SiTableParserBase(); virtual ~SiTableParserBase() = 0; @@ -38,6 +40,8 @@ class SiTableParserBase : public P2PVR::RawDataClient { time_t startTime; unsigned int incomplete; std::mutex lock; + private: + const P2PVR::Data * currentRawData; }; struct SiTableHeaderBase { diff --git a/p2pvr/lib/sql/SI_eventById.sql b/p2pvr/lib/sql/SI_eventById.sql new file mode 100644 index 0000000..f0e028d --- /dev/null +++ b/p2pvr/lib/sql/SI_eventById.sql @@ -0,0 +1,28 @@ +select e.serviceid, + e.eventid, + e.title, + e.titlelang, + e.subtitle, + e.description, + e.descriptionlang, + e.videoaspect, + e.videoframerate, + e.videohd, + e.audiochannels, + e.audiolanguage, + e.subtitlelanguage, + e.category, + e.subcategory, + e.usercategory, + e.dvbrating, + e.starttime, + e.stoptime, + e.episode, + e.episodes, + e.year, + e.flags, + e.season +from events e +where serviceid = ? +and eventid = ? + diff --git a/p2pvr/lib/sql/Schedules_pendingRecord.sql b/p2pvr/lib/sql/Schedules_pendingRecord.sql new file mode 100644 index 0000000..ba4d7d8 --- /dev/null +++ b/p2pvr/lib/sql/Schedules_pendingRecord.sql @@ -0,0 +1,13 @@ +select * +from record r, events e, schedules s +where r.serviceid = e.serviceid +and r.eventid = e.eventid +and /*r.scheduleid*/ 161 = s.scheduleid +and r.recordstatus = 1 +and r.recordingstatus = 0 +and e.starttime - s.early - interval '3minutes' < now() +order by e.starttime - s.early + +; +select * +from schedules \ No newline at end of file diff --git a/p2pvr/lib/sql/Schedules_scheduledToRecord.sql b/p2pvr/lib/sql/Schedules_scheduledToRecord.sql new file mode 100644 index 0000000..3874c37 --- /dev/null +++ b/p2pvr/lib/sql/Schedules_scheduledToRecord.sql @@ -0,0 +1,8 @@ +SELECT r.serviceid, r.eventid, r.scheduleid +FROM record r, events e, schedules s +WHERE recordstatus = 0 +AND r.serviceid = e.serviceid +AND r.eventid = e.eventid +AND r.scheduleid = s.scheduleid +AND e.stoptime + s.late > NOW() +ORDER BY e.starttime, e.serviceid diff --git a/p2pvr/lib/sql/Schedules_selectById.sql b/p2pvr/lib/sql/Schedules_selectById.sql new file mode 100644 index 0000000..2eb7b0b --- /dev/null +++ b/p2pvr/lib/sql/Schedules_selectById.sql @@ -0,0 +1,5 @@ +SELECT scheduleid, serviceid, eventid, title, search, priority, early::text early, late::text late, repeats +FROM schedules +WHERE scheduleid = ? +ORDER BY scheduleId + diff --git a/p2pvr/lib/temporaryIceAdapterObject.h b/p2pvr/lib/temporaryIceAdapterObject.h index 0b6d65e..bc1dfcf 100644 --- a/p2pvr/lib/temporaryIceAdapterObject.h +++ b/p2pvr/lib/temporaryIceAdapterObject.h @@ -6,8 +6,9 @@ template class TemporarayIceAdapterObject { public: - TemporarayIceAdapterObject(Ice::ObjectAdapterPtr a, Object * object) : + TemporarayIceAdapterObject(Ice::ObjectAdapterPtr a, Object * o) : adapter(a), + object(o), proxy(Object::ProxyType::checkedCast(adapter->addWithUUID(object))) { if (!proxy) { @@ -27,6 +28,11 @@ class TemporarayIceAdapterObject { return proxy; } + Object * Get() const + { + return object; + } + typename Object::ProxyType operator->() const { return proxy; @@ -39,6 +45,7 @@ class TemporarayIceAdapterObject { private: Ice::ObjectAdapterPtr adapter; + Object * object; typename Object::ProxyType proxy; }; -- cgit v1.2.3