diff options
| author | randomdan <randomdan@localhost> | 2014-01-31 21:01:56 +0000 | 
|---|---|---|
| committer | randomdan <randomdan@localhost> | 2014-01-31 21:01:56 +0000 | 
| commit | d649bd72d98de0e834f1debfae38dfd98049aaa7 (patch) | |
| tree | a82008b333c81e4a91a11710c2c29da41c11f247 | |
| parent | Refactor tuner to use specific implementations of new interface for send diff... (diff) | |
| download | p2pvr-d649bd72d98de0e834f1debfae38dfd98049aaa7.tar.bz2 p2pvr-d649bd72d98de0e834f1debfae38dfd98049aaa7.tar.xz p2pvr-d649bd72d98de0e834f1debfae38dfd98049aaa7.zip  | |
Fixes around the new muxer and sender features
| -rw-r--r-- | p2pvr/ice/dvb.ice | 4 | ||||
| -rw-r--r-- | p2pvr/lib/muxer.cpp | 25 | ||||
| -rw-r--r-- | p2pvr/lib/tuner.cpp | 32 | ||||
| -rw-r--r-- | p2pvr/lib/tuner.h | 3 | ||||
| -rw-r--r-- | p2pvr/lib/tunerSendTs.cpp | 14 | 
5 files changed, 48 insertions, 30 deletions
diff --git a/p2pvr/ice/dvb.ice b/p2pvr/ice/dvb.ice index 93dcd9b..00ba64b 100644 --- a/p2pvr/ice/dvb.ice +++ b/p2pvr/ice/dvb.ice @@ -10,13 +10,15 @@ module P2PVR {  		int Errno;  	}; +	exception DataHandlingException { }; +  	exception IncorrectDeliveryType { };  	sequence<byte> Data;  	sequence<short> PacketIds;  	interface RawDataClient { -		bool NewData(Data bytes); +		bool NewData(Data bytes) throws DataHandlingException;  	};  	interface Tuner { diff --git a/p2pvr/lib/muxer.cpp b/p2pvr/lib/muxer.cpp index 16bb410..5c1beaa 100644 --- a/p2pvr/lib/muxer.cpp +++ b/p2pvr/lib/muxer.cpp @@ -8,6 +8,8 @@  #include <boost/algorithm/string/split.hpp>  #include <boost/algorithm/string/classification.hpp> +class MuxerFailure : public P2PVR::DataHandlingException { }; +  Muxer::Muxer(const P2PVR::RawDataClientPrx & t, const std::string & cmd) :  	target(t)  { @@ -27,6 +29,7 @@ Muxer::~Muxer()  	}  	Logger()->messagebf(LOG_INFO, "Muxer::%p finished with status %d", this, status);  	close(fds[1]); +	close(fds[2]);  }  bool @@ -41,7 +44,7 @@ Muxer::NewData(const P2PVR::Data & data, const Ice::Current &)  		auto w = write(fds[0], &data[off], data.size() - off);  		if (w == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {  			Logger()->messagebf(LOG_ERR, "Muxer::%p write failed (%d:%s)", this, errno, strerror(errno)); -			return true; +			throw MuxerFailure();  		}  		off += w;  	} @@ -57,7 +60,7 @@ Muxer::ReadWaiting() const  		auto p = poll(&fd, 1, 0);  		if (p < 0) {  			// error -			return true; +			throw MuxerFailure();  		}  		else if (p == 0) {  			// write would block @@ -81,42 +84,43 @@ Muxer::ReadAvailable() const  bool  Muxer::ReadMuxerAndSend(int waitTime) const  { -	pollfd fd[2] = { { fds[1], POLLIN, 0 }, { fds[2], POLLIN, 0 } }; +	pollfd fd[2] = { { fds[1], POLLIN | POLLHUP, 0 }, { fds[2], POLLIN | POLLHUP, 0 } };  	while (true) {  		auto p = poll(fd, 2, waitTime);  		if (p < 0) {  			// error -			return true; +			throw MuxerFailure();  		}  		else if (p == 0) {  			// all ok  			return false;  		}  		else { -			if (fd[0].revents & POLLIN) { +			bool closed = false; +			if (fd[0].revents & (POLLIN | POLLHUP)) {  				P2PVR::Data buf(BUFSIZ);  				auto len = read(fds[1], &buf.front(), buf.size());  				if (len == 0) {  					// ok, proc exit -					return true; +					closed = true;  				}  				if (len < 0) {  					// error -					return true; +					throw MuxerFailure();  				}  				buf.resize(len);  				target->NewData(buf);  			} -			if (fd[1].revents & POLLIN) { +			if (fd[1].revents & (POLLIN | POLLHUP)) {  				P2PVR::Data buf(BUFSIZ);  				auto len = read(fds[2], &buf.front(), buf.size());  				if (len == 0) {  					// ok, proc exit -					return true; +					closed = true;  				}  				if (len < 0) {  					// error -					return true; +					throw MuxerFailure();  				}  				buf.resize(len);  				std::vector<std::string> lines; @@ -126,6 +130,7 @@ Muxer::ReadMuxerAndSend(int waitTime) const  					Logger()->messagebf(LOG_INFO, "Muxer::%p > %s", this, line);  				}  			} +			if (closed) return true;  		}  	}  } diff --git a/p2pvr/lib/tuner.cpp b/p2pvr/lib/tuner.cpp index 8ebf1a2..fe90231 100644 --- a/p2pvr/lib/tuner.cpp +++ b/p2pvr/lib/tuner.cpp @@ -48,9 +48,12 @@ Tuner::Tuner(const boost::filesystem::path & df) :  Tuner::~Tuner()  { -	while (!backgroundClients.empty()) { -		close(backgroundClients.begin()->first); -		backgroundClients.erase(backgroundClients.begin()); +	{ +		std::lock_guard<std::mutex> g(lock); +		while (!backgroundClients.empty()) { +			close(backgroundClients.begin()->first); +			backgroundClients.erase(backgroundClients.begin()); +		}  	}  	if (backgroundThread) {  		backgroundThread->join(); @@ -334,20 +337,25 @@ Tuner::senderThread()  			default:  				{ // stuff to do  					std::lock_guard<std::mutex> g(lock); -					BOOST_FOREACH(const auto & c, backgroundClients) { -						if (FD_ISSET(c.first, &rfds)) { +					for (auto c = backgroundClients.begin(); c != backgroundClients.end(); ) { +						if (FD_ISSET(c->first, &rfds)) {  							// Read it  							P2PVR::Data buf(1 << 16); -							int nr = read(c.first, &buf.front(), buf.size()); +							int nr = read(c->first, &buf.front(), buf.size());  							if (nr < 0) {  								Logger()->messagebf(LOG_DEBUG, "%s: read failed (%d:%s)", __PRETTY_FUNCTION__, errno, strerror(errno)); -								close(c.first); -								backgroundClients.erase(c.first); -								break; // backgroundClients has changed, bailout and start again +								close(c->first); +								c = backgroundClients.erase(c);  							} -							size_t n = nr; -							buf.resize(n); -							c.second->NewData(buf); +							else { +								size_t n = nr; +								buf.resize(n); +								c->second->NewData(buf); +								c++; +							} +						} +						else { +							c++;  						}  					}  				} diff --git a/p2pvr/lib/tuner.h b/p2pvr/lib/tuner.h index 57bdd82..670a17d 100644 --- a/p2pvr/lib/tuner.h +++ b/p2pvr/lib/tuner.h @@ -28,6 +28,7 @@ class Tuner : public P2PVR::PrivateTuner {  				const P2PVR::RawDataClientPrx client;  		};  		typedef boost::shared_ptr<IDataSender> BackgroundClient; +		typedef std::map<int, BackgroundClient> BackgroundClients;  		Tuner(const boost::filesystem::path & deviceFrontend);  		~Tuner(); @@ -63,8 +64,6 @@ class Tuner : public P2PVR::PrivateTuner {  		const boost::filesystem::path deviceFrontend;  		const boost::filesystem::path deviceRoot; -		typedef boost::function<bool(const P2PVR::Data &)> PacketCheckFunction; -		typedef std::map<int, BackgroundClient> BackgroundClients;  		BackgroundClients backgroundClients;  		std::thread * backgroundThread;  		std::mutex lock; diff --git a/p2pvr/lib/tunerSendTs.cpp b/p2pvr/lib/tunerSendTs.cpp index d5ea1bb..70b6670 100644 --- a/p2pvr/lib/tunerSendTs.cpp +++ b/p2pvr/lib/tunerSendTs.cpp @@ -15,12 +15,16 @@ SendTs::SendTs(const P2PVR::RawDataClientPrx & c) :  SendTs::~SendTs()  { -	if (async) { -		client->end_NewData(async); +	try { +		if (async) { +			if (client->end_NewData(async)) return; +		} +		while (!buffer.empty()) { +			sendBufferChunk(); +			if (client->end_NewData(async)) return; +		}  	} -	while (!buffer.empty()) { -		sendBufferChunk(); -		client->end_NewData(async); +	catch (...) {  	}  }  | 
