diff options
| -rw-r--r-- | p2pvr/daemon/daemon.cpp | 6 | ||||
| -rw-r--r-- | p2pvr/ice/Jamfile.jam | 2 | ||||
| -rw-r--r-- | p2pvr/ice/commonHelpers.cpp | 46 | ||||
| -rw-r--r-- | p2pvr/ice/commonHelpers.h | 3 | ||||
| -rw-r--r-- | p2pvr/ice/p2pvr.ice | 15 | ||||
| -rw-r--r-- | p2pvr/lib/muxer.cpp | 79 | ||||
| -rw-r--r-- | p2pvr/lib/muxer.h | 7 | ||||
| -rw-r--r-- | p2pvr/lib/recorder.cpp | 99 | ||||
| -rw-r--r-- | p2pvr/lib/recorder.h | 51 | ||||
| -rw-r--r-- | p2pvr/lib/schedules.cpp | 42 | ||||
| -rw-r--r-- | p2pvr/lib/schedules.h | 2 | ||||
| -rw-r--r-- | p2pvr/lib/serviceStreamer.cpp | 6 | ||||
| -rw-r--r-- | p2pvr/lib/serviceStreamer.h | 5 | ||||
| -rw-r--r-- | p2pvr/lib/si.cpp | 10 | ||||
| -rw-r--r-- | p2pvr/lib/si.h | 1 | ||||
| -rw-r--r-- | p2pvr/lib/siParsers/table.cpp | 7 | ||||
| -rw-r--r-- | p2pvr/lib/siParsers/table.h | 4 | ||||
| -rw-r--r-- | p2pvr/lib/sql/SI_eventById.sql | 28 | ||||
| -rw-r--r-- | p2pvr/lib/sql/Schedules_pendingRecord.sql | 13 | ||||
| -rw-r--r-- | p2pvr/lib/sql/Schedules_scheduledToRecord.sql | 8 | ||||
| -rw-r--r-- | p2pvr/lib/sql/Schedules_selectById.sql | 5 | ||||
| -rw-r--r-- | p2pvr/lib/temporaryIceAdapterObject.h | 9 | 
22 files changed, 420 insertions, 28 deletions
| diff --git a/p2pvr/daemon/daemon.cpp b/p2pvr/daemon/daemon.cpp index d4b59a0..a2739eb 100644 --- a/p2pvr/daemon/daemon.cpp +++ b/p2pvr/daemon/daemon.cpp @@ -4,6 +4,9 @@  #include "maintenance.h"  #include "si.h"  #include "schedules.h" +#include "storage.h" +#include "recorder.h" +#include "recordings.h"  #include <Ice/ObjectAdapter.h>  class P2PvrDaemon : public DaemonBase { @@ -20,6 +23,9 @@ class P2PvrDaemon : public DaemonBase {  			adapter->add(new Maintenance(adapter, timer), ic->stringToIdentity("Maintenance"));  			adapter->add(new SI(), ic->stringToIdentity("SI"));  			adapter->add(new Schedules(), ic->stringToIdentity("Schedules")); +			adapter->add(new Storage(), ic->stringToIdentity("Storage")); +			adapter->add(new Recorder(adapter, timer), ic->stringToIdentity("Recorder")); +			adapter->add(new Recordings(), ic->stringToIdentity("Recordings"));  		}  }; diff --git a/p2pvr/ice/Jamfile.jam b/p2pvr/ice/Jamfile.jam index 219c508..c5031bd 100644 --- a/p2pvr/ice/Jamfile.jam +++ b/p2pvr/ice/Jamfile.jam @@ -3,10 +3,12 @@ lib IceUtil ;  lib pthread ;  lib p2pvrice : +	[ glob *.cpp ]  	[ glob *.ice ] :  	<library>Ice  	<library>IceUtil  	<library>pthread +	<library>..//p2lib  	: :  	<include>.  	<library>Ice diff --git a/p2pvr/ice/commonHelpers.cpp b/p2pvr/ice/commonHelpers.cpp new file mode 100644 index 0000000..64a1c61 --- /dev/null +++ b/p2pvr/ice/commonHelpers.cpp @@ -0,0 +1,46 @@ +#include "commonHelpers.h" +#include <misc.h> +#include <boost/format.hpp> + +namespace Common { +	std::string operator-(const Common::DateTime & a, const Common::DateTime & b) +	{ +		struct tm tma { +			0, a.Minute, a.Hour, +				a.Day, a.Month - 1, a.Year - 1900, +				0, 0, 0, 0, 0}; +		struct tm tmb { +			0, b.Minute, b.Hour, +				b.Day, b.Month - 1, b.Year - 1900, +				0, 0, 0, 0, 0}; +		auto secs = mktime(&tma) - mktime(&tmb); +		return stringbf("%02d:%02d:%02d", secs / 3600, (secs / 60) % 60, secs % 60); +	} + +	time_t operator-(const Common::DateTime & cdt, const std::string & interval) +	{ +		struct tm dt { +			0, cdt.Minute, cdt.Hour, +				cdt.Day, cdt.Month - 1, cdt.Year - 1900, +				0, 0, 0, 0, 0}; +		unsigned short hours, minutes, seconds; +		if (sscanf(interval.c_str(), "%hu:%hu:%hu", &hours, &minutes, &seconds) < 3) { +			throw std::runtime_error("Couldn't parse interval"); +		} +		return mktime(&dt) - (seconds + (60 * (minutes + (60 * hours)))); +	} + +	time_t operator+(const Common::DateTime & cdt, const std::string & interval) +	{ +		struct tm dt { +			0, cdt.Minute, cdt.Hour, +				cdt.Day, cdt.Month - 1, cdt.Year - 1900, +				0, 0, 0, 0, 0}; +		unsigned short hours, minutes, seconds; +		if (sscanf(interval.c_str(), "%hu:%hu:%hu", &hours, &minutes, &seconds) < 3) { +			throw std::runtime_error("Couldn't parse interval"); +		} +		return mktime(&dt) + (seconds + (60 * (minutes + (60 * hours)))); +	} +} + diff --git a/p2pvr/ice/commonHelpers.h b/p2pvr/ice/commonHelpers.h index bf93800..645ad1c 100644 --- a/p2pvr/ice/commonHelpers.h +++ b/p2pvr/ice/commonHelpers.h @@ -17,6 +17,9 @@ namespace Common {  			<< ":" << std::setw(2) << std::setfill('0') << dt.Minute;  		return o;  	} +	time_t operator-(const Common::DateTime & cdt, const std::string & interval); +	time_t operator+(const Common::DateTime & cdt, const std::string & interval); +	std::string operator-(const Common::DateTime & a, const Common::DateTime & b);  }  #endif diff --git a/p2pvr/ice/p2pvr.ice b/p2pvr/ice/p2pvr.ice index 9dba1f9..7b5f2fb 100644 --- a/p2pvr/ice/p2pvr.ice +++ b/p2pvr/ice/p2pvr.ice @@ -34,6 +34,14 @@ module P2PVR {  	};  	sequence<Schedule> ScheduleList; +	// Ids for something to record +	class ScheduledToRecord { +		int ScheduleId; +		int ServiceId; +		int EventId; +	}; +	sequence<ScheduledToRecord> ScheduledToRecordList; +  	interface Maintenance {  		idempotent void UpdateAll();  		idempotent void UpdateNetwork(short type); @@ -63,7 +71,9 @@ module P2PVR {  	interface Schedules {  		idempotent void DeleteSchedule(int scheduleId); +		idempotent Schedule GetSchedule(int scheduleId);  		idempotent ScheduleList GetSchedules(); +		idempotent ScheduledToRecordList GetScheduledToRecord();  		idempotent int UpdateSchedule(Schedule newSchedule);  		idempotent void DoReschedule();  	}; @@ -78,9 +88,14 @@ module P2PVR {  		idempotent DVBSI::ServiceList GetServices();  		idempotent DVBSI::Service GetService(int id);  		// Get events +		idempotent DVBSI::Event GetEvent(int serviceId, int eventId);  		idempotent DVBSI::Events EventsOnNow();  		idempotent DVBSI::Events EventsInRange(Common::DateTime from, Common::DateTime to);  	}; + +	interface Recorder { +		idempotent void RefreshSchedules(); +	};  };  #endif diff --git a/p2pvr/lib/muxer.cpp b/p2pvr/lib/muxer.cpp index ae2ee28..bd00a8c 100644 --- a/p2pvr/lib/muxer.cpp +++ b/p2pvr/lib/muxer.cpp @@ -1,31 +1,66 @@  #include <pch.hpp>  #include "muxer.h" -#include <misc.h>  #include <logger.h>  #include <poll.h> +#include <sys/wait.h> +#include <boost/algorithm/string/split.hpp> -Muxer::Muxer(const P2PVR::RawDataClientPrx & t) : +pid_t +mpopenrw(const std::vector<std::string> & params, int fds[2]) +{ +	int rpipes[2], wpipes[2]; +	if (pipe(rpipes)) { +		throw std::runtime_error("Failed to create a pipe"); +	} +	if (pipe(wpipes)) { +		throw std::runtime_error("Failed to create another pipe"); +	} +	pid_t child = fork(); +	switch (child) { +		case -1: // fail +			throw std::runtime_error("Failed to fork"); +		default: // parent +			close(wpipes[0]); +			close(rpipes[1]); +			fds[0] = wpipes[1]; +			fds[1] = rpipes[0]; +			break; +		case 0: // in child +			close(wpipes[1]); +			close(rpipes[0]); +			dup2(wpipes[0], 0); +			dup2(rpipes[1], 1); +			for (int n = 3; n < 1024; n += 1) { +				close(n); +			} +			char * buf[100]; +			char ** w = &buf[0]; +			BOOST_FOREACH(const auto & p, params) { +				*w++ = strdup(p.c_str()); +			} +			*w = NULL; +			execv(buf[0], buf); +			abort(); +			break; +	} +	return child; +} + +Muxer::Muxer(const P2PVR::RawDataClientPrx & t, const std::string & cmd) :  	target(t)  { -	std::vector<const char *> cmd; -	cmd.push_back("/usr/bin/ffmpeg"); -	cmd.push_back("-f"); -	cmd.push_back("mpegts"); -	cmd.push_back("-i"); -	cmd.push_back("-"); -	cmd.push_back("-f"); -	cmd.push_back("mpeg"); -	cmd.push_back("-codec"); -	cmd.push_back("copy"); -	cmd.push_back("-"); -	cmd.push_back(NULL); -	popenrw(&cmd.front(), fds); +	std::vector<std::string> params; +	boost::algorithm::split(params, cmd, isspace, boost::algorithm::token_compress_on); +	muxerPid = mpopenrw(params, fds);  }  Muxer::~Muxer()  {  	close(fds[0]); -	ReadMuxerAndSend(); +	int status; +	while (waitpid(muxerPid, &status, WNOHANG) == 0) { +		ReadMuxerAndSend(5); +	}  	close(fds[1]);  } @@ -33,19 +68,19 @@ bool  Muxer::NewData(const P2PVR::Data & data, const Ice::Current &)  {  	std::lock_guard<std::mutex> g(lock); -	ReadMuxerAndSend(); +	ReadMuxerAndSend(0);  	if (write(fds[0], &data.front(), data.size()) < 1) {  		return true;  	} -	return ReadMuxerAndSend(); +	return ReadMuxerAndSend(0);  }  bool -Muxer::ReadMuxerAndSend() const +Muxer::ReadMuxerAndSend(int waitTime) const  {  	pollfd fd = { fds[1], POLLIN, 0 };  	while (true) { -		auto p = poll(&fd, 1, 0); +		auto p = poll(&fd, 1, waitTime);  		if (p < 0) {  			// error  			return true; @@ -57,6 +92,10 @@ Muxer::ReadMuxerAndSend() const  		else if (p > 0) {  			P2PVR::Data buf(BUFSIZ);  			auto len = read(fds[1], &buf.front(), buf.size()); +			if (len == 0) { +				// ok, proc exit +				return true; +			}  			if (len < 0) {  				// error  				return true; diff --git a/p2pvr/lib/muxer.h b/p2pvr/lib/muxer.h index 4b1187d..118b1da 100644 --- a/p2pvr/lib/muxer.h +++ b/p2pvr/lib/muxer.h @@ -6,15 +6,16 @@  class Muxer : public P2PVR::RawDataClient {  	public: -		Muxer(const P2PVR::RawDataClientPrx & target); +		Muxer(const P2PVR::RawDataClientPrx & target, const std::string & cmd);  		~Muxer();  		bool NewData(const P2PVR::Data &, const Ice::Current &);  	private: -		bool ReadMuxerAndSend() const; -		const P2PVR::RawDataClientPrx & target; +		bool ReadMuxerAndSend(int wait) const; +		const P2PVR::RawDataClientPrx target;  		int fds[2]; +		pid_t muxerPid;  		std::mutex lock;  }; diff --git a/p2pvr/lib/recorder.cpp b/p2pvr/lib/recorder.cpp new file mode 100644 index 0000000..89902a9 --- /dev/null +++ b/p2pvr/lib/recorder.cpp @@ -0,0 +1,99 @@ +#include "recorder.h" +#include "bindTimerTask.h" +#include <boost/bind.hpp> +#include <logger.h> +#include <commonHelpers.h> +#include <scopeObject.h> +#include "serviceStreamer.h" +#include "storage.h" +#include "muxer.h" + +std::string Recorder::extension; +std::string Recorder::muxerCommand; + +DECLARE_OPTIONS(Recorder, "P2PVR Recorder options") +("p2pvr.recorder.extension", Options::value(&extension, "avi"), + "File extension to save with (default: avi)") +("p2pvr.recorder.muxercommand", Options::value(&muxerCommand, "/usr/bin/ffmpeg -f mpegts -i - -f avi -codec copy -"), + "File extension to save with (default: '/usr/bin/ffmpeg -f mpegts -i - -f avi -codec copy -')") +END_OPTIONS(Recorder); + +Recorder::Recorder(Ice::ObjectAdapterPtr a, IceUtil::TimerPtr t) : +	adapter(a), +	timer(t) +{ +	Logger()->message(LOG_DEBUG, __PRETTY_FUNCTION__); +	RefreshSchedules(Ice::Current()); +} + +void +Recorder::RefreshSchedules(const Ice::Current &) +{ +	std::lock_guard<std::mutex> g(lock); +	BOOST_FOREACH(auto & t, pendingRecordings) { +		timer->cancel(t); +	} +	pendingRecordings.clear(); +	auto schedules = P2PVR::SchedulesPrx::checkedCast(adapter->createProxy(adapter->getCommunicator()->stringToIdentity("Schedules"))); +	auto si = P2PVR::SIPrx::checkedCast(adapter->createProxy(adapter->getCommunicator()->stringToIdentity("SI"))); +	BOOST_FOREACH(const auto & s, schedules->GetScheduledToRecord()) { +		if (std::find_if(currentRecordings.begin(), currentRecordings.end(), [&s](const CurrentPtr & c) { +					return s->ScheduleId == c->schedule->ScheduleId && s->ServiceId == c->event->ServiceId && s->EventId == c->event->EventId;; +				}) != currentRecordings.end()) { +			continue; +		} +		auto schedule = schedules->GetSchedule(s->ScheduleId); +		auto service = si->GetService(s->ServiceId); +		auto event = si->GetEvent(s->ServiceId, s->EventId); + +		auto startIn = std::max<time_t>(event->StartTime - schedule->Early - time(NULL), 0); +		IceUtil::TimerTaskPtr startTimer = new BindTimerTask(boost::bind(&Recorder::StartRecording, this, schedule, service, event)); +		timer->schedule(startTimer, IceUtil::Time::seconds(startIn)); +		pendingRecordings.push_back(startTimer); +		Logger()->messagebf(LOG_DEBUG, "Recording %s scheduled for %s seconds", event->Title, startIn); +	} +} + +void +Recorder::StartRecording(P2PVR::SchedulePtr schedule, DVBSI::ServicePtr service, DVBSI::EventPtr event) +{ +	std::lock_guard<std::mutex> g(lock); +	auto storage = P2PVR::StoragePrx::checkedCast(adapter->createProxy(adapter->getCommunicator()->stringToIdentity("Storage"))); +	auto recordings = P2PVR::RecordingsPrx::checkedCast(adapter->createProxy(adapter->getCommunicator()->stringToIdentity("Recordings"))); + +	auto id = storage->CreateForEventRecording(extension, schedule, service, event); +	auto store = storage->OpenForWrite(id); +	ScopeObject _store(NULL, NULL, [this,&store,&storage,&id]() { storage->Close(store); storage->Delete(id); }); +	auto muxer = P2PVR::RawDataClientPrx::checkedCast(adapter->addWithUUID(new Muxer(store, muxerCommand))); +	ScopeObject _muxer(NULL, NULL, [this,&muxer]() { adapter->remove(muxer->ice_getIdentity()); }); +	auto ss = ServiceStreamerPtr(new ServiceStreamer(service->ServiceId, muxer, adapter->getCommunicator(), adapter)); + +	ss->Start(); +	Logger()->messagebf(LOG_INFO, "Started recording %s (%s - %s) on %s (%d)", +			event->Title, event->StartTime, event->StopTime, +			service->Name ? *service->Name : "<no name>", service->ServiceId); + +	recordings->NewRecording(new P2PVR::Recording(0, storage->ice_toString(), id, schedule->ScheduleId, event->Title, event->Subtitle, +				event->Description, event->StartTime, event->StopTime - event->StartTime)); +	auto newCurrent = CurrentPtr(new Current({muxer, store, ss, schedule, service, event, IceUtil::TimerTaskPtr()})); +	currentRecordings.insert(newCurrent); + +	auto stopIn = event->StopTime + schedule->Late - time(NULL); +	newCurrent->stopTimer = new BindTimerTask(boost::bind(&Recorder::StopRecording, this, newCurrent)); +	timer->schedule(newCurrent->stopTimer, IceUtil::Time::seconds(stopIn)); +	Logger()->messagebf(LOG_DEBUG, "Recording %s scheduled stop in %s seconds", event->Title, stopIn); +} + +void +Recorder::StopRecording(CurrentPtr c) +{ +	std::lock_guard<std::mutex> g(lock); +	Logger()->messagebf(LOG_DEBUG, "Stopping %s", c->event->Title); +	c->stream->Stop(); +	adapter->remove(c->muxer->ice_getIdentity()); +	currentRecordings.erase(c); +	auto storage = P2PVR::StoragePrx::checkedCast(adapter->createProxy(adapter->getCommunicator()->stringToIdentity("Storage"))); +	storage->Close(c->store); +	Logger()->messagebf(LOG_DEBUG, "Stopped %s", c->event->Title); +} + diff --git a/p2pvr/lib/recorder.h b/p2pvr/lib/recorder.h new file mode 100644 index 0000000..78c604b --- /dev/null +++ b/p2pvr/lib/recorder.h @@ -0,0 +1,51 @@ +#ifndef RECORDER_H +#define RECORDER_H + +#include <IceUtil/Timer.h> +#include <Ice/ObjectAdapter.h> +#include <options.h> +#include <p2pvr.h> +#include <mutex> +#include "serviceStreamer.h" + +class Recorder : public P2PVR::Recorder { +	public: +		typedef std::vector<IceUtil::TimerTaskPtr> Pendings; + +		class Current { +			public: +				P2PVR::RawDataClientPrx muxer; +				P2PVR::RawDataClientPrx store; +				ServiceStreamerPtr stream; +				P2PVR::SchedulePtr schedule; +				DVBSI::ServicePtr service; +				DVBSI::EventPtr event; +				IceUtil::TimerTaskPtr stopTimer; +		}; +		typedef boost::shared_ptr<Current> CurrentPtr; +		typedef std::set<CurrentPtr> Currents; + +		Recorder(Ice::ObjectAdapterPtr a, IceUtil::TimerPtr); + +		void RefreshSchedules(const Ice::Current &); + +		INITOPTIONS; + +	private: +		void StartRecordings(); +		void StartRecording(P2PVR::SchedulePtr schedule, DVBSI::ServicePtr service, DVBSI::EventPtr event); +		void StopRecording(CurrentPtr); + +		Ice::ObjectAdapterPtr adapter; +		IceUtil::TimerPtr timer; + +		Currents currentRecordings; +		Pendings pendingRecordings; +		std::mutex lock; + +		static std::string extension; +		static std::string muxerCommand; +}; + +#endif + diff --git a/p2pvr/lib/schedules.cpp b/p2pvr/lib/schedules.cpp index 8fb96ac..bca7ddd 100644 --- a/p2pvr/lib/schedules.cpp +++ b/p2pvr/lib/schedules.cpp @@ -18,6 +18,8 @@ ResourceString(Schedules_insertNewId, lib_sql_Schedules_insertNewId_sql);  ResourceString(Schedules_update, lib_sql_Schedules_update_sql);  ResourceString(Schedules_delete, lib_sql_Schedules_delete_sql);  ResourceString(Schedules_selectAll, lib_sql_Schedules_selectAll_sql); +ResourceString(Schedules_selectById, lib_sql_Schedules_selectById_sql); +ResourceString(Schedules_scheduledToRecord, lib_sql_Schedules_scheduledToRecord_sql);  std::string Schedules::SchedulerAlgorithm; @@ -67,6 +69,24 @@ typedef std::vector<RecordPtr> Records;  template<>  void +CreateColumns<P2PVR::ScheduledToRecordPtr>(const ColumnCreator & cc) +{ +	cc("serviceid", true); +	cc("eventid", true); +	cc("scheduleid", true); +} + +template<> +void +UnbindColumns(RowState & rs, P2PVR::ScheduledToRecordPtr const & s) +{ +	rs.fields[0] >> s->ServiceId; +	rs.fields[1] >> s->EventId; +	rs.fields[2] >> s->ScheduleId; +} + +template<> +void  CreateColumns<ScheduleCandidatePtr>(const ColumnCreator & cc)  {  	cc("what", true); @@ -403,6 +423,10 @@ Schedules::DoReschedule(const Ice::Current & ice)  	mergeRecords.sources.insert(new ContainerIterator<Records>(&records));  	mergeRecords.loadComplete(this);  	mergeRecords.execute(NULL); +	tx.Commit(); + +	auto recorder = P2PVR::RecorderPrx::checkedCast(ice.adapter->createProxy(ice.adapter->getCommunicator()->stringToIdentity("Recorder"))); +	recorder->RefreshSchedules();  }  void @@ -422,6 +446,24 @@ Schedules::GetSchedules(const Ice::Current &)  	return schedules;  } +P2PVR::SchedulePtr +Schedules::GetSchedule(int id, const Ice::Current &) +{ +	P2PVR::ScheduleList schedules; +	SqlContainerCreator<P2PVR::ScheduleList, P2PVR::Schedule> cct(schedules); +	cct.populate(Select(Schedules_selectById, id).second); +	return schedules.front(); +} + +P2PVR::ScheduledToRecordList +Schedules::GetScheduledToRecord(const Ice::Current &) +{ +	P2PVR::ScheduledToRecordList scheduled; +	SqlContainerCreator<P2PVR::ScheduledToRecordList, P2PVR::ScheduledToRecord> cct(scheduled); +	cct.populate(Select(Schedules_scheduledToRecord).second); +	return scheduled; +} +  int  Schedules::UpdateSchedule(const P2PVR::SchedulePtr & s, const Ice::Current & ice)  { diff --git a/p2pvr/lib/schedules.h b/p2pvr/lib/schedules.h index ff725e7..f7075dd 100644 --- a/p2pvr/lib/schedules.h +++ b/p2pvr/lib/schedules.h @@ -65,7 +65,9 @@ class EpisodeGroup {  class Schedules : public P2PVR::Schedules, public DatabaseClient {  	public:  		void DeleteSchedule(int id, const Ice::Current &); +		P2PVR::SchedulePtr GetSchedule(int id, const Ice::Current &);  		P2PVR::ScheduleList GetSchedules(const Ice::Current &); +		P2PVR::ScheduledToRecordList GetScheduledToRecord(const Ice::Current &);  		int UpdateSchedule(const P2PVR::SchedulePtr &, const Ice::Current &);  		void DoReschedule(const Ice::Current &); diff --git a/p2pvr/lib/serviceStreamer.cpp b/p2pvr/lib/serviceStreamer.cpp index a26d157..eacf9c3 100644 --- a/p2pvr/lib/serviceStreamer.cpp +++ b/p2pvr/lib/serviceStreamer.cpp @@ -30,7 +30,8 @@ ServiceStreamer::HandlePAT(ProgramAssociationMapPtr pam)  		stopHandle(pmtHandle);  		pmtHandle = tuner->StartSendingSection(pmtStream, pmtParser);  	} -	return true; +	target->NewData(patParser.Get()->CurrentRawData()); +	return false;  }  bool @@ -48,7 +49,8 @@ ServiceStreamer::HandlePMT(DVBSI::ProgramMapPtr pmp)  		stopHandle(serviceHandle);  		serviceHandle = tuner->StartSendingTS(P2PVR::PacketIds(streams.begin(), streams.end()), target);  	} -	return true; +	target->NewData(pmtParser.Get()->CurrentRawData()); +	return false;  }  void diff --git a/p2pvr/lib/serviceStreamer.h b/p2pvr/lib/serviceStreamer.h index f140050..bdabe7a 100644 --- a/p2pvr/lib/serviceStreamer.h +++ b/p2pvr/lib/serviceStreamer.h @@ -26,8 +26,8 @@ class ServiceStreamer {  		P2PVR::SIPrx si;  		P2PVR::TunerPrx tuner;  		P2PVR::RawDataClientPrx target; -		TemporarayIceAdapterObject<P2PVR::RawDataClient> patParser; -		TemporarayIceAdapterObject<P2PVR::RawDataClient> pmtParser; +		TemporarayIceAdapterObject<SiTableParserBase> patParser; +		TemporarayIceAdapterObject<SiTableParserBase> pmtParser;  		int serviceId;  		int patHandle; @@ -37,6 +37,7 @@ class ServiceStreamer {  		Streams streams;  		int serviceHandle;  }; +typedef boost::shared_ptr<ServiceStreamer> ServiceStreamerPtr;  #endif diff --git a/p2pvr/lib/si.cpp b/p2pvr/lib/si.cpp index 3431b67..a61566b 100644 --- a/p2pvr/lib/si.cpp +++ b/p2pvr/lib/si.cpp @@ -8,6 +8,7 @@  ResourceString(SI_servicesSelectAll, lib_sql_SI_servicesSelectAll_sql);  ResourceString(SI_servicesSelectById, lib_sql_SI_servicesSelectById_sql); +ResourceString(SI_eventById, lib_sql_SI_eventById_sql);  ResourceString(SI_eventsOnNow, lib_sql_SI_eventsOnNow_sql);  ResourceString(SI_eventsInRange, lib_sql_SI_eventsInRange_sql); @@ -85,6 +86,15 @@ SI::GetService(int id, const Ice::Current&)  	return rtn.front();  } +DVBSI::EventPtr +SI::GetEvent(int serviceId, int eventId, const Ice::Current &) +{ +	DVBSI::Events rtn; +	SqlContainerCreator<DVBSI::Events, DVBSI::Event> cc(rtn); +	cc.populate(Select(SI_eventById, serviceId, eventId).second); +	return rtn.front(); +} +  DVBSI::Events  SI::EventsOnNow(const Ice::Current &)  { diff --git a/p2pvr/lib/si.h b/p2pvr/lib/si.h index ca8e5b4..6e41d06 100644 --- a/p2pvr/lib/si.h +++ b/p2pvr/lib/si.h @@ -13,6 +13,7 @@ class SI : public P2PVR::SI, public DatabaseClient {  		DVBSI::ServiceList GetServices(const Ice::Current &);  		DVBSI::ServicePtr GetService(int id, const Ice::Current &); +		DVBSI::EventPtr GetEvent(int serviceId, int eventId, const Ice::Current &);  		DVBSI::Events EventsOnNow(const Ice::Current &);  		DVBSI::Events EventsInRange(const Common::DateTime &, const Common::DateTime &, const Ice::Current &);  }; diff --git a/p2pvr/lib/siParsers/table.cpp b/p2pvr/lib/siParsers/table.cpp index ea482b1..9844814 100644 --- a/p2pvr/lib/siParsers/table.cpp +++ b/p2pvr/lib/siParsers/table.cpp @@ -28,9 +28,16 @@ bool  SiTableParserBase::NewData(const P2PVR::Data & bytes, const Ice::Current&)  {  	//Logger()->messagebf(LOG_DEBUG, "%s: Got %d bytes", __PRETTY_FUNCTION__, bytes.size()); +	currentRawData = &bytes;  	return ParseInfoTable(&bytes.front(), bytes.size());  } +const P2PVR::Data & +SiTableParserBase::CurrentRawData() const +{ +	return *currentRawData; +} +  SiTableParserBase::StrPtr  SiTableParserBase::convert(const char * txt, size_t len)  { diff --git a/p2pvr/lib/siParsers/table.h b/p2pvr/lib/siParsers/table.h index 9de08d6..c470159 100644 --- a/p2pvr/lib/siParsers/table.h +++ b/p2pvr/lib/siParsers/table.h @@ -20,6 +20,8 @@ typedef unsigned char u_char;  #define BcdCharToInt(x) (10*((x & 0xF0)>>4) + (x & 0xF))  class SiTableParserBase : public P2PVR::RawDataClient { +	public: +		const P2PVR::Data & CurrentRawData() const;  	protected:  		SiTableParserBase();  		virtual ~SiTableParserBase() = 0; @@ -38,6 +40,8 @@ class SiTableParserBase : public P2PVR::RawDataClient {  		time_t startTime;  		unsigned int incomplete;  		std::mutex lock; +	private: +		const P2PVR::Data * currentRawData;  };  struct SiTableHeaderBase { diff --git a/p2pvr/lib/sql/SI_eventById.sql b/p2pvr/lib/sql/SI_eventById.sql new file mode 100644 index 0000000..f0e028d --- /dev/null +++ b/p2pvr/lib/sql/SI_eventById.sql @@ -0,0 +1,28 @@ +select e.serviceid, +	e.eventid, +	e.title, +	e.titlelang, +	e.subtitle, +	e.description, +	e.descriptionlang, +	e.videoaspect, +	e.videoframerate, +	e.videohd, +	e.audiochannels, +	e.audiolanguage, +	e.subtitlelanguage, +	e.category, +	e.subcategory, +	e.usercategory, +	e.dvbrating, +	e.starttime, +	e.stoptime, +	e.episode, +	e.episodes, +	e.year, +	e.flags, +	e.season +from events e +where serviceid  = ? +and eventid = ? + diff --git a/p2pvr/lib/sql/Schedules_pendingRecord.sql b/p2pvr/lib/sql/Schedules_pendingRecord.sql new file mode 100644 index 0000000..ba4d7d8 --- /dev/null +++ b/p2pvr/lib/sql/Schedules_pendingRecord.sql @@ -0,0 +1,13 @@ +select * +from record r, events e, schedules s +where r.serviceid = e.serviceid +and r.eventid = e.eventid +and /*r.scheduleid*/ 161 = s.scheduleid +and r.recordstatus = 1 +and r.recordingstatus = 0 +and e.starttime - s.early - interval '3minutes' < now() +order by e.starttime - s.early + +; +select * +from schedules
\ No newline at end of file diff --git a/p2pvr/lib/sql/Schedules_scheduledToRecord.sql b/p2pvr/lib/sql/Schedules_scheduledToRecord.sql new file mode 100644 index 0000000..3874c37 --- /dev/null +++ b/p2pvr/lib/sql/Schedules_scheduledToRecord.sql @@ -0,0 +1,8 @@ +SELECT r.serviceid, r.eventid, r.scheduleid +FROM record r, events e, schedules s +WHERE recordstatus = 0 +AND r.serviceid = e.serviceid +AND r.eventid = e.eventid +AND r.scheduleid = s.scheduleid +AND e.stoptime + s.late > NOW() +ORDER BY e.starttime, e.serviceid diff --git a/p2pvr/lib/sql/Schedules_selectById.sql b/p2pvr/lib/sql/Schedules_selectById.sql new file mode 100644 index 0000000..2eb7b0b --- /dev/null +++ b/p2pvr/lib/sql/Schedules_selectById.sql @@ -0,0 +1,5 @@ +SELECT scheduleid, serviceid, eventid, title, search, priority, early::text early, late::text late, repeats +FROM schedules +WHERE scheduleid = ? +ORDER BY scheduleId + diff --git a/p2pvr/lib/temporaryIceAdapterObject.h b/p2pvr/lib/temporaryIceAdapterObject.h index 0b6d65e..bc1dfcf 100644 --- a/p2pvr/lib/temporaryIceAdapterObject.h +++ b/p2pvr/lib/temporaryIceAdapterObject.h @@ -6,8 +6,9 @@  template <typename Object>  class TemporarayIceAdapterObject {  	public: -		TemporarayIceAdapterObject(Ice::ObjectAdapterPtr a, Object * object) : +		TemporarayIceAdapterObject(Ice::ObjectAdapterPtr a, Object * o) :  			adapter(a), +			object(o),  			proxy(Object::ProxyType::checkedCast(adapter->addWithUUID(object)))  		{  			if (!proxy) { @@ -27,6 +28,11 @@ class TemporarayIceAdapterObject {  			return proxy;  		} +		Object * Get() const +		{ +			return object; +		} +  		typename Object::ProxyType operator->() const  		{  			return proxy; @@ -39,6 +45,7 @@ class TemporarayIceAdapterObject {  	private:  		Ice::ObjectAdapterPtr adapter; +		Object * object;  		typename Object::ProxyType proxy;  }; | 
