summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorrandomdan <randomdan@localhost>2014-01-09 20:31:18 +0000
committerrandomdan <randomdan@localhost>2014-01-09 20:31:18 +0000
commit800ab74ea27ca22aa62e953cbc012989c23c34ca (patch)
tree67161266c9deb51cf8bb57d91401652c26f6ebc2
parentMakefile scripts for updating schema/database (diff)
downloadp2pvr-800ab74ea27ca22aa62e953cbc012989c23c34ca.tar.bz2
p2pvr-800ab74ea27ca22aa62e953cbc012989c23c34ca.tar.xz
p2pvr-800ab74ea27ca22aa62e953cbc012989c23c34ca.zip
Basic implementation of a recorder
-rw-r--r--p2pvr/daemon/daemon.cpp6
-rw-r--r--p2pvr/ice/Jamfile.jam2
-rw-r--r--p2pvr/ice/commonHelpers.cpp46
-rw-r--r--p2pvr/ice/commonHelpers.h3
-rw-r--r--p2pvr/ice/p2pvr.ice15
-rw-r--r--p2pvr/lib/muxer.cpp79
-rw-r--r--p2pvr/lib/muxer.h7
-rw-r--r--p2pvr/lib/recorder.cpp99
-rw-r--r--p2pvr/lib/recorder.h51
-rw-r--r--p2pvr/lib/schedules.cpp42
-rw-r--r--p2pvr/lib/schedules.h2
-rw-r--r--p2pvr/lib/serviceStreamer.cpp6
-rw-r--r--p2pvr/lib/serviceStreamer.h5
-rw-r--r--p2pvr/lib/si.cpp10
-rw-r--r--p2pvr/lib/si.h1
-rw-r--r--p2pvr/lib/siParsers/table.cpp7
-rw-r--r--p2pvr/lib/siParsers/table.h4
-rw-r--r--p2pvr/lib/sql/SI_eventById.sql28
-rw-r--r--p2pvr/lib/sql/Schedules_pendingRecord.sql13
-rw-r--r--p2pvr/lib/sql/Schedules_scheduledToRecord.sql8
-rw-r--r--p2pvr/lib/sql/Schedules_selectById.sql5
-rw-r--r--p2pvr/lib/temporaryIceAdapterObject.h9
22 files changed, 420 insertions, 28 deletions
diff --git a/p2pvr/daemon/daemon.cpp b/p2pvr/daemon/daemon.cpp
index d4b59a0..a2739eb 100644
--- a/p2pvr/daemon/daemon.cpp
+++ b/p2pvr/daemon/daemon.cpp
@@ -4,6 +4,9 @@
#include "maintenance.h"
#include "si.h"
#include "schedules.h"
+#include "storage.h"
+#include "recorder.h"
+#include "recordings.h"
#include <Ice/ObjectAdapter.h>
class P2PvrDaemon : public DaemonBase {
@@ -20,6 +23,9 @@ class P2PvrDaemon : public DaemonBase {
adapter->add(new Maintenance(adapter, timer), ic->stringToIdentity("Maintenance"));
adapter->add(new SI(), ic->stringToIdentity("SI"));
adapter->add(new Schedules(), ic->stringToIdentity("Schedules"));
+ adapter->add(new Storage(), ic->stringToIdentity("Storage"));
+ adapter->add(new Recorder(adapter, timer), ic->stringToIdentity("Recorder"));
+ adapter->add(new Recordings(), ic->stringToIdentity("Recordings"));
}
};
diff --git a/p2pvr/ice/Jamfile.jam b/p2pvr/ice/Jamfile.jam
index 219c508..c5031bd 100644
--- a/p2pvr/ice/Jamfile.jam
+++ b/p2pvr/ice/Jamfile.jam
@@ -3,10 +3,12 @@ lib IceUtil ;
lib pthread ;
lib p2pvrice :
+ [ glob *.cpp ]
[ glob *.ice ] :
<library>Ice
<library>IceUtil
<library>pthread
+ <library>..//p2lib
: :
<include>.
<library>Ice
diff --git a/p2pvr/ice/commonHelpers.cpp b/p2pvr/ice/commonHelpers.cpp
new file mode 100644
index 0000000..64a1c61
--- /dev/null
+++ b/p2pvr/ice/commonHelpers.cpp
@@ -0,0 +1,46 @@
+#include "commonHelpers.h"
+#include <misc.h>
+#include <boost/format.hpp>
+
+namespace Common {
+ std::string operator-(const Common::DateTime & a, const Common::DateTime & b)
+ {
+ struct tm tma {
+ 0, a.Minute, a.Hour,
+ a.Day, a.Month - 1, a.Year - 1900,
+ 0, 0, 0, 0, 0};
+ struct tm tmb {
+ 0, b.Minute, b.Hour,
+ b.Day, b.Month - 1, b.Year - 1900,
+ 0, 0, 0, 0, 0};
+ auto secs = mktime(&tma) - mktime(&tmb);
+ return stringbf("%02d:%02d:%02d", secs / 3600, (secs / 60) % 60, secs % 60);
+ }
+
+ time_t operator-(const Common::DateTime & cdt, const std::string & interval)
+ {
+ struct tm dt {
+ 0, cdt.Minute, cdt.Hour,
+ cdt.Day, cdt.Month - 1, cdt.Year - 1900,
+ 0, 0, 0, 0, 0};
+ unsigned short hours, minutes, seconds;
+ if (sscanf(interval.c_str(), "%hu:%hu:%hu", &hours, &minutes, &seconds) < 3) {
+ throw std::runtime_error("Couldn't parse interval");
+ }
+ return mktime(&dt) - (seconds + (60 * (minutes + (60 * hours))));
+ }
+
+ time_t operator+(const Common::DateTime & cdt, const std::string & interval)
+ {
+ struct tm dt {
+ 0, cdt.Minute, cdt.Hour,
+ cdt.Day, cdt.Month - 1, cdt.Year - 1900,
+ 0, 0, 0, 0, 0};
+ unsigned short hours, minutes, seconds;
+ if (sscanf(interval.c_str(), "%hu:%hu:%hu", &hours, &minutes, &seconds) < 3) {
+ throw std::runtime_error("Couldn't parse interval");
+ }
+ return mktime(&dt) + (seconds + (60 * (minutes + (60 * hours))));
+ }
+}
+
diff --git a/p2pvr/ice/commonHelpers.h b/p2pvr/ice/commonHelpers.h
index bf93800..645ad1c 100644
--- a/p2pvr/ice/commonHelpers.h
+++ b/p2pvr/ice/commonHelpers.h
@@ -17,6 +17,9 @@ namespace Common {
<< ":" << std::setw(2) << std::setfill('0') << dt.Minute;
return o;
}
+ time_t operator-(const Common::DateTime & cdt, const std::string & interval);
+ time_t operator+(const Common::DateTime & cdt, const std::string & interval);
+ std::string operator-(const Common::DateTime & a, const Common::DateTime & b);
}
#endif
diff --git a/p2pvr/ice/p2pvr.ice b/p2pvr/ice/p2pvr.ice
index 9dba1f9..7b5f2fb 100644
--- a/p2pvr/ice/p2pvr.ice
+++ b/p2pvr/ice/p2pvr.ice
@@ -34,6 +34,14 @@ module P2PVR {
};
sequence<Schedule> ScheduleList;
+ // Ids for something to record
+ class ScheduledToRecord {
+ int ScheduleId;
+ int ServiceId;
+ int EventId;
+ };
+ sequence<ScheduledToRecord> ScheduledToRecordList;
+
interface Maintenance {
idempotent void UpdateAll();
idempotent void UpdateNetwork(short type);
@@ -63,7 +71,9 @@ module P2PVR {
interface Schedules {
idempotent void DeleteSchedule(int scheduleId);
+ idempotent Schedule GetSchedule(int scheduleId);
idempotent ScheduleList GetSchedules();
+ idempotent ScheduledToRecordList GetScheduledToRecord();
idempotent int UpdateSchedule(Schedule newSchedule);
idempotent void DoReschedule();
};
@@ -78,9 +88,14 @@ module P2PVR {
idempotent DVBSI::ServiceList GetServices();
idempotent DVBSI::Service GetService(int id);
// Get events
+ idempotent DVBSI::Event GetEvent(int serviceId, int eventId);
idempotent DVBSI::Events EventsOnNow();
idempotent DVBSI::Events EventsInRange(Common::DateTime from, Common::DateTime to);
};
+
+ interface Recorder {
+ idempotent void RefreshSchedules();
+ };
};
#endif
diff --git a/p2pvr/lib/muxer.cpp b/p2pvr/lib/muxer.cpp
index ae2ee28..bd00a8c 100644
--- a/p2pvr/lib/muxer.cpp
+++ b/p2pvr/lib/muxer.cpp
@@ -1,31 +1,66 @@
#include <pch.hpp>
#include "muxer.h"
-#include <misc.h>
#include <logger.h>
#include <poll.h>
+#include <sys/wait.h>
+#include <boost/algorithm/string/split.hpp>
-Muxer::Muxer(const P2PVR::RawDataClientPrx & t) :
+pid_t
+mpopenrw(const std::vector<std::string> & params, int fds[2])
+{
+ int rpipes[2], wpipes[2];
+ if (pipe(rpipes)) {
+ throw std::runtime_error("Failed to create a pipe");
+ }
+ if (pipe(wpipes)) {
+ throw std::runtime_error("Failed to create another pipe");
+ }
+ pid_t child = fork();
+ switch (child) {
+ case -1: // fail
+ throw std::runtime_error("Failed to fork");
+ default: // parent
+ close(wpipes[0]);
+ close(rpipes[1]);
+ fds[0] = wpipes[1];
+ fds[1] = rpipes[0];
+ break;
+ case 0: // in child
+ close(wpipes[1]);
+ close(rpipes[0]);
+ dup2(wpipes[0], 0);
+ dup2(rpipes[1], 1);
+ for (int n = 3; n < 1024; n += 1) {
+ close(n);
+ }
+ char * buf[100];
+ char ** w = &buf[0];
+ BOOST_FOREACH(const auto & p, params) {
+ *w++ = strdup(p.c_str());
+ }
+ *w = NULL;
+ execv(buf[0], buf);
+ abort();
+ break;
+ }
+ return child;
+}
+
+Muxer::Muxer(const P2PVR::RawDataClientPrx & t, const std::string & cmd) :
target(t)
{
- std::vector<const char *> cmd;
- cmd.push_back("/usr/bin/ffmpeg");
- cmd.push_back("-f");
- cmd.push_back("mpegts");
- cmd.push_back("-i");
- cmd.push_back("-");
- cmd.push_back("-f");
- cmd.push_back("mpeg");
- cmd.push_back("-codec");
- cmd.push_back("copy");
- cmd.push_back("-");
- cmd.push_back(NULL);
- popenrw(&cmd.front(), fds);
+ std::vector<std::string> params;
+ boost::algorithm::split(params, cmd, isspace, boost::algorithm::token_compress_on);
+ muxerPid = mpopenrw(params, fds);
}
Muxer::~Muxer()
{
close(fds[0]);
- ReadMuxerAndSend();
+ int status;
+ while (waitpid(muxerPid, &status, WNOHANG) == 0) {
+ ReadMuxerAndSend(5);
+ }
close(fds[1]);
}
@@ -33,19 +68,19 @@ bool
Muxer::NewData(const P2PVR::Data & data, const Ice::Current &)
{
std::lock_guard<std::mutex> g(lock);
- ReadMuxerAndSend();
+ ReadMuxerAndSend(0);
if (write(fds[0], &data.front(), data.size()) < 1) {
return true;
}
- return ReadMuxerAndSend();
+ return ReadMuxerAndSend(0);
}
bool
-Muxer::ReadMuxerAndSend() const
+Muxer::ReadMuxerAndSend(int waitTime) const
{
pollfd fd = { fds[1], POLLIN, 0 };
while (true) {
- auto p = poll(&fd, 1, 0);
+ auto p = poll(&fd, 1, waitTime);
if (p < 0) {
// error
return true;
@@ -57,6 +92,10 @@ Muxer::ReadMuxerAndSend() const
else if (p > 0) {
P2PVR::Data buf(BUFSIZ);
auto len = read(fds[1], &buf.front(), buf.size());
+ if (len == 0) {
+ // ok, proc exit
+ return true;
+ }
if (len < 0) {
// error
return true;
diff --git a/p2pvr/lib/muxer.h b/p2pvr/lib/muxer.h
index 4b1187d..118b1da 100644
--- a/p2pvr/lib/muxer.h
+++ b/p2pvr/lib/muxer.h
@@ -6,15 +6,16 @@
class Muxer : public P2PVR::RawDataClient {
public:
- Muxer(const P2PVR::RawDataClientPrx & target);
+ Muxer(const P2PVR::RawDataClientPrx & target, const std::string & cmd);
~Muxer();
bool NewData(const P2PVR::Data &, const Ice::Current &);
private:
- bool ReadMuxerAndSend() const;
- const P2PVR::RawDataClientPrx & target;
+ bool ReadMuxerAndSend(int wait) const;
+ const P2PVR::RawDataClientPrx target;
int fds[2];
+ pid_t muxerPid;
std::mutex lock;
};
diff --git a/p2pvr/lib/recorder.cpp b/p2pvr/lib/recorder.cpp
new file mode 100644
index 0000000..89902a9
--- /dev/null
+++ b/p2pvr/lib/recorder.cpp
@@ -0,0 +1,99 @@
+#include "recorder.h"
+#include "bindTimerTask.h"
+#include <boost/bind.hpp>
+#include <logger.h>
+#include <commonHelpers.h>
+#include <scopeObject.h>
+#include "serviceStreamer.h"
+#include "storage.h"
+#include "muxer.h"
+
+std::string Recorder::extension;
+std::string Recorder::muxerCommand;
+
+DECLARE_OPTIONS(Recorder, "P2PVR Recorder options")
+("p2pvr.recorder.extension", Options::value(&extension, "avi"),
+ "File extension to save with (default: avi)")
+("p2pvr.recorder.muxercommand", Options::value(&muxerCommand, "/usr/bin/ffmpeg -f mpegts -i - -f avi -codec copy -"),
+ "File extension to save with (default: '/usr/bin/ffmpeg -f mpegts -i - -f avi -codec copy -')")
+END_OPTIONS(Recorder);
+
+Recorder::Recorder(Ice::ObjectAdapterPtr a, IceUtil::TimerPtr t) :
+ adapter(a),
+ timer(t)
+{
+ Logger()->message(LOG_DEBUG, __PRETTY_FUNCTION__);
+ RefreshSchedules(Ice::Current());
+}
+
+void
+Recorder::RefreshSchedules(const Ice::Current &)
+{
+ std::lock_guard<std::mutex> g(lock);
+ BOOST_FOREACH(auto & t, pendingRecordings) {
+ timer->cancel(t);
+ }
+ pendingRecordings.clear();
+ auto schedules = P2PVR::SchedulesPrx::checkedCast(adapter->createProxy(adapter->getCommunicator()->stringToIdentity("Schedules")));
+ auto si = P2PVR::SIPrx::checkedCast(adapter->createProxy(adapter->getCommunicator()->stringToIdentity("SI")));
+ BOOST_FOREACH(const auto & s, schedules->GetScheduledToRecord()) {
+ if (std::find_if(currentRecordings.begin(), currentRecordings.end(), [&s](const CurrentPtr & c) {
+ return s->ScheduleId == c->schedule->ScheduleId && s->ServiceId == c->event->ServiceId && s->EventId == c->event->EventId;;
+ }) != currentRecordings.end()) {
+ continue;
+ }
+ auto schedule = schedules->GetSchedule(s->ScheduleId);
+ auto service = si->GetService(s->ServiceId);
+ auto event = si->GetEvent(s->ServiceId, s->EventId);
+
+ auto startIn = std::max<time_t>(event->StartTime - schedule->Early - time(NULL), 0);
+ IceUtil::TimerTaskPtr startTimer = new BindTimerTask(boost::bind(&Recorder::StartRecording, this, schedule, service, event));
+ timer->schedule(startTimer, IceUtil::Time::seconds(startIn));
+ pendingRecordings.push_back(startTimer);
+ Logger()->messagebf(LOG_DEBUG, "Recording %s scheduled for %s seconds", event->Title, startIn);
+ }
+}
+
+void
+Recorder::StartRecording(P2PVR::SchedulePtr schedule, DVBSI::ServicePtr service, DVBSI::EventPtr event)
+{
+ std::lock_guard<std::mutex> g(lock);
+ auto storage = P2PVR::StoragePrx::checkedCast(adapter->createProxy(adapter->getCommunicator()->stringToIdentity("Storage")));
+ auto recordings = P2PVR::RecordingsPrx::checkedCast(adapter->createProxy(adapter->getCommunicator()->stringToIdentity("Recordings")));
+
+ auto id = storage->CreateForEventRecording(extension, schedule, service, event);
+ auto store = storage->OpenForWrite(id);
+ ScopeObject _store(NULL, NULL, [this,&store,&storage,&id]() { storage->Close(store); storage->Delete(id); });
+ auto muxer = P2PVR::RawDataClientPrx::checkedCast(adapter->addWithUUID(new Muxer(store, muxerCommand)));
+ ScopeObject _muxer(NULL, NULL, [this,&muxer]() { adapter->remove(muxer->ice_getIdentity()); });
+ auto ss = ServiceStreamerPtr(new ServiceStreamer(service->ServiceId, muxer, adapter->getCommunicator(), adapter));
+
+ ss->Start();
+ Logger()->messagebf(LOG_INFO, "Started recording %s (%s - %s) on %s (%d)",
+ event->Title, event->StartTime, event->StopTime,
+ service->Name ? *service->Name : "<no name>", service->ServiceId);
+
+ recordings->NewRecording(new P2PVR::Recording(0, storage->ice_toString(), id, schedule->ScheduleId, event->Title, event->Subtitle,
+ event->Description, event->StartTime, event->StopTime - event->StartTime));
+ auto newCurrent = CurrentPtr(new Current({muxer, store, ss, schedule, service, event, IceUtil::TimerTaskPtr()}));
+ currentRecordings.insert(newCurrent);
+
+ auto stopIn = event->StopTime + schedule->Late - time(NULL);
+ newCurrent->stopTimer = new BindTimerTask(boost::bind(&Recorder::StopRecording, this, newCurrent));
+ timer->schedule(newCurrent->stopTimer, IceUtil::Time::seconds(stopIn));
+ Logger()->messagebf(LOG_DEBUG, "Recording %s scheduled stop in %s seconds", event->Title, stopIn);
+}
+
+void
+Recorder::StopRecording(CurrentPtr c)
+{
+ std::lock_guard<std::mutex> g(lock);
+ Logger()->messagebf(LOG_DEBUG, "Stopping %s", c->event->Title);
+ c->stream->Stop();
+ adapter->remove(c->muxer->ice_getIdentity());
+ currentRecordings.erase(c);
+ auto storage = P2PVR::StoragePrx::checkedCast(adapter->createProxy(adapter->getCommunicator()->stringToIdentity("Storage")));
+ storage->Close(c->store);
+ Logger()->messagebf(LOG_DEBUG, "Stopped %s", c->event->Title);
+}
+
diff --git a/p2pvr/lib/recorder.h b/p2pvr/lib/recorder.h
new file mode 100644
index 0000000..78c604b
--- /dev/null
+++ b/p2pvr/lib/recorder.h
@@ -0,0 +1,51 @@
+#ifndef RECORDER_H
+#define RECORDER_H
+
+#include <IceUtil/Timer.h>
+#include <Ice/ObjectAdapter.h>
+#include <options.h>
+#include <p2pvr.h>
+#include <mutex>
+#include "serviceStreamer.h"
+
+class Recorder : public P2PVR::Recorder {
+ public:
+ typedef std::vector<IceUtil::TimerTaskPtr> Pendings;
+
+ class Current {
+ public:
+ P2PVR::RawDataClientPrx muxer;
+ P2PVR::RawDataClientPrx store;
+ ServiceStreamerPtr stream;
+ P2PVR::SchedulePtr schedule;
+ DVBSI::ServicePtr service;
+ DVBSI::EventPtr event;
+ IceUtil::TimerTaskPtr stopTimer;
+ };
+ typedef boost::shared_ptr<Current> CurrentPtr;
+ typedef std::set<CurrentPtr> Currents;
+
+ Recorder(Ice::ObjectAdapterPtr a, IceUtil::TimerPtr);
+
+ void RefreshSchedules(const Ice::Current &);
+
+ INITOPTIONS;
+
+ private:
+ void StartRecordings();
+ void StartRecording(P2PVR::SchedulePtr schedule, DVBSI::ServicePtr service, DVBSI::EventPtr event);
+ void StopRecording(CurrentPtr);
+
+ Ice::ObjectAdapterPtr adapter;
+ IceUtil::TimerPtr timer;
+
+ Currents currentRecordings;
+ Pendings pendingRecordings;
+ std::mutex lock;
+
+ static std::string extension;
+ static std::string muxerCommand;
+};
+
+#endif
+
diff --git a/p2pvr/lib/schedules.cpp b/p2pvr/lib/schedules.cpp
index 8fb96ac..bca7ddd 100644
--- a/p2pvr/lib/schedules.cpp
+++ b/p2pvr/lib/schedules.cpp
@@ -18,6 +18,8 @@ ResourceString(Schedules_insertNewId, lib_sql_Schedules_insertNewId_sql);
ResourceString(Schedules_update, lib_sql_Schedules_update_sql);
ResourceString(Schedules_delete, lib_sql_Schedules_delete_sql);
ResourceString(Schedules_selectAll, lib_sql_Schedules_selectAll_sql);
+ResourceString(Schedules_selectById, lib_sql_Schedules_selectById_sql);
+ResourceString(Schedules_scheduledToRecord, lib_sql_Schedules_scheduledToRecord_sql);
std::string Schedules::SchedulerAlgorithm;
@@ -67,6 +69,24 @@ typedef std::vector<RecordPtr> Records;
template<>
void
+CreateColumns<P2PVR::ScheduledToRecordPtr>(const ColumnCreator & cc)
+{
+ cc("serviceid", true);
+ cc("eventid", true);
+ cc("scheduleid", true);
+}
+
+template<>
+void
+UnbindColumns(RowState & rs, P2PVR::ScheduledToRecordPtr const & s)
+{
+ rs.fields[0] >> s->ServiceId;
+ rs.fields[1] >> s->EventId;
+ rs.fields[2] >> s->ScheduleId;
+}
+
+template<>
+void
CreateColumns<ScheduleCandidatePtr>(const ColumnCreator & cc)
{
cc("what", true);
@@ -403,6 +423,10 @@ Schedules::DoReschedule(const Ice::Current & ice)
mergeRecords.sources.insert(new ContainerIterator<Records>(&records));
mergeRecords.loadComplete(this);
mergeRecords.execute(NULL);
+ tx.Commit();
+
+ auto recorder = P2PVR::RecorderPrx::checkedCast(ice.adapter->createProxy(ice.adapter->getCommunicator()->stringToIdentity("Recorder")));
+ recorder->RefreshSchedules();
}
void
@@ -422,6 +446,24 @@ Schedules::GetSchedules(const Ice::Current &)
return schedules;
}
+P2PVR::SchedulePtr
+Schedules::GetSchedule(int id, const Ice::Current &)
+{
+ P2PVR::ScheduleList schedules;
+ SqlContainerCreator<P2PVR::ScheduleList, P2PVR::Schedule> cct(schedules);
+ cct.populate(Select(Schedules_selectById, id).second);
+ return schedules.front();
+}
+
+P2PVR::ScheduledToRecordList
+Schedules::GetScheduledToRecord(const Ice::Current &)
+{
+ P2PVR::ScheduledToRecordList scheduled;
+ SqlContainerCreator<P2PVR::ScheduledToRecordList, P2PVR::ScheduledToRecord> cct(scheduled);
+ cct.populate(Select(Schedules_scheduledToRecord).second);
+ return scheduled;
+}
+
int
Schedules::UpdateSchedule(const P2PVR::SchedulePtr & s, const Ice::Current & ice)
{
diff --git a/p2pvr/lib/schedules.h b/p2pvr/lib/schedules.h
index ff725e7..f7075dd 100644
--- a/p2pvr/lib/schedules.h
+++ b/p2pvr/lib/schedules.h
@@ -65,7 +65,9 @@ class EpisodeGroup {
class Schedules : public P2PVR::Schedules, public DatabaseClient {
public:
void DeleteSchedule(int id, const Ice::Current &);
+ P2PVR::SchedulePtr GetSchedule(int id, const Ice::Current &);
P2PVR::ScheduleList GetSchedules(const Ice::Current &);
+ P2PVR::ScheduledToRecordList GetScheduledToRecord(const Ice::Current &);
int UpdateSchedule(const P2PVR::SchedulePtr &, const Ice::Current &);
void DoReschedule(const Ice::Current &);
diff --git a/p2pvr/lib/serviceStreamer.cpp b/p2pvr/lib/serviceStreamer.cpp
index a26d157..eacf9c3 100644
--- a/p2pvr/lib/serviceStreamer.cpp
+++ b/p2pvr/lib/serviceStreamer.cpp
@@ -30,7 +30,8 @@ ServiceStreamer::HandlePAT(ProgramAssociationMapPtr pam)
stopHandle(pmtHandle);
pmtHandle = tuner->StartSendingSection(pmtStream, pmtParser);
}
- return true;
+ target->NewData(patParser.Get()->CurrentRawData());
+ return false;
}
bool
@@ -48,7 +49,8 @@ ServiceStreamer::HandlePMT(DVBSI::ProgramMapPtr pmp)
stopHandle(serviceHandle);
serviceHandle = tuner->StartSendingTS(P2PVR::PacketIds(streams.begin(), streams.end()), target);
}
- return true;
+ target->NewData(pmtParser.Get()->CurrentRawData());
+ return false;
}
void
diff --git a/p2pvr/lib/serviceStreamer.h b/p2pvr/lib/serviceStreamer.h
index f140050..bdabe7a 100644
--- a/p2pvr/lib/serviceStreamer.h
+++ b/p2pvr/lib/serviceStreamer.h
@@ -26,8 +26,8 @@ class ServiceStreamer {
P2PVR::SIPrx si;
P2PVR::TunerPrx tuner;
P2PVR::RawDataClientPrx target;
- TemporarayIceAdapterObject<P2PVR::RawDataClient> patParser;
- TemporarayIceAdapterObject<P2PVR::RawDataClient> pmtParser;
+ TemporarayIceAdapterObject<SiTableParserBase> patParser;
+ TemporarayIceAdapterObject<SiTableParserBase> pmtParser;
int serviceId;
int patHandle;
@@ -37,6 +37,7 @@ class ServiceStreamer {
Streams streams;
int serviceHandle;
};
+typedef boost::shared_ptr<ServiceStreamer> ServiceStreamerPtr;
#endif
diff --git a/p2pvr/lib/si.cpp b/p2pvr/lib/si.cpp
index 3431b67..a61566b 100644
--- a/p2pvr/lib/si.cpp
+++ b/p2pvr/lib/si.cpp
@@ -8,6 +8,7 @@
ResourceString(SI_servicesSelectAll, lib_sql_SI_servicesSelectAll_sql);
ResourceString(SI_servicesSelectById, lib_sql_SI_servicesSelectById_sql);
+ResourceString(SI_eventById, lib_sql_SI_eventById_sql);
ResourceString(SI_eventsOnNow, lib_sql_SI_eventsOnNow_sql);
ResourceString(SI_eventsInRange, lib_sql_SI_eventsInRange_sql);
@@ -85,6 +86,15 @@ SI::GetService(int id, const Ice::Current&)
return rtn.front();
}
+DVBSI::EventPtr
+SI::GetEvent(int serviceId, int eventId, const Ice::Current &)
+{
+ DVBSI::Events rtn;
+ SqlContainerCreator<DVBSI::Events, DVBSI::Event> cc(rtn);
+ cc.populate(Select(SI_eventById, serviceId, eventId).second);
+ return rtn.front();
+}
+
DVBSI::Events
SI::EventsOnNow(const Ice::Current &)
{
diff --git a/p2pvr/lib/si.h b/p2pvr/lib/si.h
index ca8e5b4..6e41d06 100644
--- a/p2pvr/lib/si.h
+++ b/p2pvr/lib/si.h
@@ -13,6 +13,7 @@ class SI : public P2PVR::SI, public DatabaseClient {
DVBSI::ServiceList GetServices(const Ice::Current &);
DVBSI::ServicePtr GetService(int id, const Ice::Current &);
+ DVBSI::EventPtr GetEvent(int serviceId, int eventId, const Ice::Current &);
DVBSI::Events EventsOnNow(const Ice::Current &);
DVBSI::Events EventsInRange(const Common::DateTime &, const Common::DateTime &, const Ice::Current &);
};
diff --git a/p2pvr/lib/siParsers/table.cpp b/p2pvr/lib/siParsers/table.cpp
index ea482b1..9844814 100644
--- a/p2pvr/lib/siParsers/table.cpp
+++ b/p2pvr/lib/siParsers/table.cpp
@@ -28,9 +28,16 @@ bool
SiTableParserBase::NewData(const P2PVR::Data & bytes, const Ice::Current&)
{
//Logger()->messagebf(LOG_DEBUG, "%s: Got %d bytes", __PRETTY_FUNCTION__, bytes.size());
+ currentRawData = &bytes;
return ParseInfoTable(&bytes.front(), bytes.size());
}
+const P2PVR::Data &
+SiTableParserBase::CurrentRawData() const
+{
+ return *currentRawData;
+}
+
SiTableParserBase::StrPtr
SiTableParserBase::convert(const char * txt, size_t len)
{
diff --git a/p2pvr/lib/siParsers/table.h b/p2pvr/lib/siParsers/table.h
index 9de08d6..c470159 100644
--- a/p2pvr/lib/siParsers/table.h
+++ b/p2pvr/lib/siParsers/table.h
@@ -20,6 +20,8 @@ typedef unsigned char u_char;
#define BcdCharToInt(x) (10*((x & 0xF0)>>4) + (x & 0xF))
class SiTableParserBase : public P2PVR::RawDataClient {
+ public:
+ const P2PVR::Data & CurrentRawData() const;
protected:
SiTableParserBase();
virtual ~SiTableParserBase() = 0;
@@ -38,6 +40,8 @@ class SiTableParserBase : public P2PVR::RawDataClient {
time_t startTime;
unsigned int incomplete;
std::mutex lock;
+ private:
+ const P2PVR::Data * currentRawData;
};
struct SiTableHeaderBase {
diff --git a/p2pvr/lib/sql/SI_eventById.sql b/p2pvr/lib/sql/SI_eventById.sql
new file mode 100644
index 0000000..f0e028d
--- /dev/null
+++ b/p2pvr/lib/sql/SI_eventById.sql
@@ -0,0 +1,28 @@
+select e.serviceid,
+ e.eventid,
+ e.title,
+ e.titlelang,
+ e.subtitle,
+ e.description,
+ e.descriptionlang,
+ e.videoaspect,
+ e.videoframerate,
+ e.videohd,
+ e.audiochannels,
+ e.audiolanguage,
+ e.subtitlelanguage,
+ e.category,
+ e.subcategory,
+ e.usercategory,
+ e.dvbrating,
+ e.starttime,
+ e.stoptime,
+ e.episode,
+ e.episodes,
+ e.year,
+ e.flags,
+ e.season
+from events e
+where serviceid = ?
+and eventid = ?
+
diff --git a/p2pvr/lib/sql/Schedules_pendingRecord.sql b/p2pvr/lib/sql/Schedules_pendingRecord.sql
new file mode 100644
index 0000000..ba4d7d8
--- /dev/null
+++ b/p2pvr/lib/sql/Schedules_pendingRecord.sql
@@ -0,0 +1,13 @@
+select *
+from record r, events e, schedules s
+where r.serviceid = e.serviceid
+and r.eventid = e.eventid
+and /*r.scheduleid*/ 161 = s.scheduleid
+and r.recordstatus = 1
+and r.recordingstatus = 0
+and e.starttime - s.early - interval '3minutes' < now()
+order by e.starttime - s.early
+
+;
+select *
+from schedules \ No newline at end of file
diff --git a/p2pvr/lib/sql/Schedules_scheduledToRecord.sql b/p2pvr/lib/sql/Schedules_scheduledToRecord.sql
new file mode 100644
index 0000000..3874c37
--- /dev/null
+++ b/p2pvr/lib/sql/Schedules_scheduledToRecord.sql
@@ -0,0 +1,8 @@
+SELECT r.serviceid, r.eventid, r.scheduleid
+FROM record r, events e, schedules s
+WHERE recordstatus = 0
+AND r.serviceid = e.serviceid
+AND r.eventid = e.eventid
+AND r.scheduleid = s.scheduleid
+AND e.stoptime + s.late > NOW()
+ORDER BY e.starttime, e.serviceid
diff --git a/p2pvr/lib/sql/Schedules_selectById.sql b/p2pvr/lib/sql/Schedules_selectById.sql
new file mode 100644
index 0000000..2eb7b0b
--- /dev/null
+++ b/p2pvr/lib/sql/Schedules_selectById.sql
@@ -0,0 +1,5 @@
+SELECT scheduleid, serviceid, eventid, title, search, priority, early::text early, late::text late, repeats
+FROM schedules
+WHERE scheduleid = ?
+ORDER BY scheduleId
+
diff --git a/p2pvr/lib/temporaryIceAdapterObject.h b/p2pvr/lib/temporaryIceAdapterObject.h
index 0b6d65e..bc1dfcf 100644
--- a/p2pvr/lib/temporaryIceAdapterObject.h
+++ b/p2pvr/lib/temporaryIceAdapterObject.h
@@ -6,8 +6,9 @@
template <typename Object>
class TemporarayIceAdapterObject {
public:
- TemporarayIceAdapterObject(Ice::ObjectAdapterPtr a, Object * object) :
+ TemporarayIceAdapterObject(Ice::ObjectAdapterPtr a, Object * o) :
adapter(a),
+ object(o),
proxy(Object::ProxyType::checkedCast(adapter->addWithUUID(object)))
{
if (!proxy) {
@@ -27,6 +28,11 @@ class TemporarayIceAdapterObject {
return proxy;
}
+ Object * Get() const
+ {
+ return object;
+ }
+
typename Object::ProxyType operator->() const
{
return proxy;
@@ -39,6 +45,7 @@ class TemporarayIceAdapterObject {
private:
Ice::ObjectAdapterPtr adapter;
+ Object * object;
typename Object::ProxyType proxy;
};