diff options
| -rw-r--r-- | p2pvr/ice/p2pvr.ice | 3 | ||||
| -rw-r--r-- | p2pvr/lib/tuner.cpp | 76 | ||||
| -rw-r--r-- | p2pvr/lib/tuner.h | 11 | 
3 files changed, 64 insertions, 26 deletions
| diff --git a/p2pvr/ice/p2pvr.ice b/p2pvr/ice/p2pvr.ice index 9dac2a4..7d84955 100644 --- a/p2pvr/ice/p2pvr.ice +++ b/p2pvr/ice/p2pvr.ice @@ -210,7 +210,8 @@ module P2PVR {  		idempotent void SendEventInformation(RawDataClient * client);  		int StartSendingTS(PacketIds pids, RawDataClient * client); -		idempotent void StopSendingTS(int handle); +		int StartSendingSection(int pid, RawDataClient * client); +		idempotent void StopSending(int handle);  	};  	interface PrivateTuner extends Tuner {  		idempotent void TuneTo(DVBSI::Delivery d); diff --git a/p2pvr/lib/tuner.cpp b/p2pvr/lib/tuner.cpp index b20cc75..25042ba 100644 --- a/p2pvr/lib/tuner.cpp +++ b/p2pvr/lib/tuner.cpp @@ -145,6 +145,13 @@ Tuner::SendPID(int pid, const P2PVR::RawDataClientPrx & client, const Ice::Curre  	ice.con->createProxy(client->ice_getIdentity());  	FileHandle demux(OpenDemux()); +	RequestPID(pid, demux); +	return ReadDemuxAndSend(demux, client); +} + +void +Tuner::RequestPID(int pid, int demux) +{  	struct dmx_sct_filter_params sctFilterParams;  	memset(&sctFilterParams, 0, sizeof(dmx_sct_filter_params));  	sctFilterParams.pid = pid; @@ -153,8 +160,6 @@ Tuner::SendPID(int pid, const P2PVR::RawDataClientPrx & client, const Ice::Curre  	if (ioctl(demux, DMX_SET_FILTER, &sctFilterParams) < 0) {  		throw P2PVR::DeviceError("demux", strerror(errno), errno);  	} - -	return ReadDemuxAndSend(demux, client);  }  uint64_t @@ -184,22 +189,7 @@ Tuner::ReadDemuxAndSend(int demux, const P2PVR::RawDataClientPrx & client) const  		size_t n = nr;  		buf.resize(n); -		// Verify it -		if (n < sizeof(SiTableHeader)) { -			Logger()->messagebf(LOG_WARNING, "Received data too small to be an SI table."); -		} -		auto * tab = (const SiTableHeader *)(&buf.front()); -		size_t l = sizeof(SiTableHeaderBase) + HILO(tab->section_length); -		if (n < l) { -			Logger()->messagebf(LOG_WARNING, "Received data shorter than its defined length."); -			continue; -		} -		if (n > l) { -			Logger()->messagebf(LOG_WARNING, "Received data longer than its defined length."); -			continue; -		} -		if (!crc32(buf)) { -			Logger()->messagebf(LOG_WARNING, "Received data is corrupted (crc32 failed)."); +		if (!IsValidSection(buf)) {  			continue;  		} @@ -222,6 +212,45 @@ Tuner::ReadDemuxAndSend(int demux, const P2PVR::RawDataClientPrx & client) const  	return packetsSent;  } +bool +Tuner::IsValidSection(const P2PVR::Data & buf) +{ +	auto n = buf.size(); +	if (n < sizeof(SiTableHeader)) { +		Logger()->messagebf(LOG_WARNING, "Received data too small to be an SI table."); +		return false; +	} +	auto * tab = (const SiTableHeader *)(&buf.front()); +	size_t l = sizeof(SiTableHeaderBase) + HILO(tab->section_length); +	if (n < l) { +		Logger()->messagebf(LOG_WARNING, "Received data shorter than its defined length."); +		return false; +	} +	if (n > l) { +		Logger()->messagebf(LOG_WARNING, "Received data longer than its defined length."); +		return false; +	} +	if (!crc32(buf)) { +		Logger()->messagebf(LOG_WARNING, "Received data is corrupted (crc32 failed)."); +		return false; +	} +	return true; +} + +int +Tuner::StartSendingSection(int pid, const P2PVR::RawDataClientPrx & client, const Ice::Current &) +{ +	time(&lastUsedTime); +	Logger()->message(LOG_DEBUG, __PRETTY_FUNCTION__); + +	std::lock_guard<std::mutex> g(lock); +	int demux = backgroundClients.insert(BackgroundClients::value_type(OpenDemux(), +				BackgroundClient(client, &Tuner::IsValidSection))).first->first; +	RequestPID(pid, demux); +	startSenderThread(); +	return demux; +} +  int  Tuner::StartSendingTS(const P2PVR::PacketIds & pids, const P2PVR::RawDataClientPrx & client, const Ice::Current &)  { @@ -270,7 +299,7 @@ Tuner::StartSendingTS(const P2PVR::PacketIds & pids, const P2PVR::RawDataClientP  }  void -Tuner::StopSendingTS(int handle, const Ice::Current &) +Tuner::StopSending(int handle, const Ice::Current &)  {  	time(&lastUsedTime);  	Logger()->message(LOG_DEBUG, __PRETTY_FUNCTION__); @@ -324,9 +353,10 @@ Tuner::senderThread()  							}  							size_t n = nr;  							buf.resize(n); -							// Send it -							asyncs.push_back(AsyncCall(c.second, c.second->begin_NewData(buf), c.first)); -							//c.second->NewData(buf); +							if (!boost::get<1>(c.second) || boost::get<1>(c.second)(buf)) { +								// Send it +								asyncs.push_back(AsyncCall(boost::get<0>(c.second), boost::get<0>(c.second)->begin_NewData(buf), c.first)); +							}  						}  					}  				} @@ -337,7 +367,7 @@ Tuner::senderThread()  				time(&lastUsedTime);  				try {  					if (a.get<1>()->isCompleted()) { -						if (!a.get<0>()->end_NewData(a.get<1>())) { +						if (a.get<0>()->end_NewData(a.get<1>())) {  							close(a.get<2>());  							std::lock_guard<std::mutex> g(lock);  							backgroundClients.erase(a.get<2>()); diff --git a/p2pvr/lib/tuner.h b/p2pvr/lib/tuner.h index 08701eb..e6cfa80 100644 --- a/p2pvr/lib/tuner.h +++ b/p2pvr/lib/tuner.h @@ -8,6 +8,8 @@  #include <thread>  #include <mutex>  #include <boost/bind.hpp> +#include <boost/function.hpp> +#include <boost/tuple/tuple.hpp>  class Tuner : public P2PVR::PrivateTuner {  	public: @@ -27,7 +29,8 @@ class Tuner : public P2PVR::PrivateTuner {  		void SendEventInformation(const P2PVR::RawDataClientPrx & client, const Ice::Current&);  		int StartSendingTS(const P2PVR::PacketIds & pids, const P2PVR::RawDataClientPrx & client, const Ice::Current &); -		void StopSendingTS(int handle, const Ice::Current &); +		int StartSendingSection(Ice::Int pid, const P2PVR::RawDataClientPrx & client, const Ice::Current &); +		void StopSending(int handle, const Ice::Current &);  		Ice::Long GetLastUsedTime(const Ice::Current&); @@ -35,14 +38,18 @@ class Tuner : public P2PVR::PrivateTuner {  		static bool crc32(const P2PVR::Data &);  		int OpenDemux() const;  		uint64_t SendPID(int pid, const P2PVR::RawDataClientPrx & client, const Ice::Current &) const; +		static void RequestPID(int pid, int fd);  		uint64_t ReadDemuxAndSend(int fd, const P2PVR::RawDataClientPrx & client) const; +		static bool IsValidSection(const P2PVR::Data &);  		void startSenderThread();  		void senderThread();  		const boost::filesystem::path deviceFrontend;  		const boost::filesystem::path deviceRoot;  		const int timeout; -		typedef std::map<int, P2PVR::RawDataClientPrx> BackgroundClients; +		typedef boost::function<bool(const P2PVR::Data &)> PacketCheckFunction; +		typedef boost::tuple<P2PVR::RawDataClientPrx, PacketCheckFunction> BackgroundClient; +		typedef std::map<int, BackgroundClient> BackgroundClients;  		BackgroundClients backgroundClients;  		std::thread * backgroundThread;  		std::mutex lock; | 
