summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorrandomdan <randomdan@localhost>2014-01-31 21:01:56 +0000
committerrandomdan <randomdan@localhost>2014-01-31 21:01:56 +0000
commitd649bd72d98de0e834f1debfae38dfd98049aaa7 (patch)
treea82008b333c81e4a91a11710c2c29da41c11f247
parentRefactor tuner to use specific implementations of new interface for send diff... (diff)
downloadp2pvr-d649bd72d98de0e834f1debfae38dfd98049aaa7.tar.bz2
p2pvr-d649bd72d98de0e834f1debfae38dfd98049aaa7.tar.xz
p2pvr-d649bd72d98de0e834f1debfae38dfd98049aaa7.zip
Fixes around the new muxer and sender features
-rw-r--r--p2pvr/ice/dvb.ice4
-rw-r--r--p2pvr/lib/muxer.cpp25
-rw-r--r--p2pvr/lib/tuner.cpp32
-rw-r--r--p2pvr/lib/tuner.h3
-rw-r--r--p2pvr/lib/tunerSendTs.cpp14
5 files changed, 48 insertions, 30 deletions
diff --git a/p2pvr/ice/dvb.ice b/p2pvr/ice/dvb.ice
index 93dcd9b..00ba64b 100644
--- a/p2pvr/ice/dvb.ice
+++ b/p2pvr/ice/dvb.ice
@@ -10,13 +10,15 @@ module P2PVR {
int Errno;
};
+ exception DataHandlingException { };
+
exception IncorrectDeliveryType { };
sequence<byte> Data;
sequence<short> PacketIds;
interface RawDataClient {
- bool NewData(Data bytes);
+ bool NewData(Data bytes) throws DataHandlingException;
};
interface Tuner {
diff --git a/p2pvr/lib/muxer.cpp b/p2pvr/lib/muxer.cpp
index 16bb410..5c1beaa 100644
--- a/p2pvr/lib/muxer.cpp
+++ b/p2pvr/lib/muxer.cpp
@@ -8,6 +8,8 @@
#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string/classification.hpp>
+class MuxerFailure : public P2PVR::DataHandlingException { };
+
Muxer::Muxer(const P2PVR::RawDataClientPrx & t, const std::string & cmd) :
target(t)
{
@@ -27,6 +29,7 @@ Muxer::~Muxer()
}
Logger()->messagebf(LOG_INFO, "Muxer::%p finished with status %d", this, status);
close(fds[1]);
+ close(fds[2]);
}
bool
@@ -41,7 +44,7 @@ Muxer::NewData(const P2PVR::Data & data, const Ice::Current &)
auto w = write(fds[0], &data[off], data.size() - off);
if (w == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
Logger()->messagebf(LOG_ERR, "Muxer::%p write failed (%d:%s)", this, errno, strerror(errno));
- return true;
+ throw MuxerFailure();
}
off += w;
}
@@ -57,7 +60,7 @@ Muxer::ReadWaiting() const
auto p = poll(&fd, 1, 0);
if (p < 0) {
// error
- return true;
+ throw MuxerFailure();
}
else if (p == 0) {
// write would block
@@ -81,42 +84,43 @@ Muxer::ReadAvailable() const
bool
Muxer::ReadMuxerAndSend(int waitTime) const
{
- pollfd fd[2] = { { fds[1], POLLIN, 0 }, { fds[2], POLLIN, 0 } };
+ pollfd fd[2] = { { fds[1], POLLIN | POLLHUP, 0 }, { fds[2], POLLIN | POLLHUP, 0 } };
while (true) {
auto p = poll(fd, 2, waitTime);
if (p < 0) {
// error
- return true;
+ throw MuxerFailure();
}
else if (p == 0) {
// all ok
return false;
}
else {
- if (fd[0].revents & POLLIN) {
+ bool closed = false;
+ if (fd[0].revents & (POLLIN | POLLHUP)) {
P2PVR::Data buf(BUFSIZ);
auto len = read(fds[1], &buf.front(), buf.size());
if (len == 0) {
// ok, proc exit
- return true;
+ closed = true;
}
if (len < 0) {
// error
- return true;
+ throw MuxerFailure();
}
buf.resize(len);
target->NewData(buf);
}
- if (fd[1].revents & POLLIN) {
+ if (fd[1].revents & (POLLIN | POLLHUP)) {
P2PVR::Data buf(BUFSIZ);
auto len = read(fds[2], &buf.front(), buf.size());
if (len == 0) {
// ok, proc exit
- return true;
+ closed = true;
}
if (len < 0) {
// error
- return true;
+ throw MuxerFailure();
}
buf.resize(len);
std::vector<std::string> lines;
@@ -126,6 +130,7 @@ Muxer::ReadMuxerAndSend(int waitTime) const
Logger()->messagebf(LOG_INFO, "Muxer::%p > %s", this, line);
}
}
+ if (closed) return true;
}
}
}
diff --git a/p2pvr/lib/tuner.cpp b/p2pvr/lib/tuner.cpp
index 8ebf1a2..fe90231 100644
--- a/p2pvr/lib/tuner.cpp
+++ b/p2pvr/lib/tuner.cpp
@@ -48,9 +48,12 @@ Tuner::Tuner(const boost::filesystem::path & df) :
Tuner::~Tuner()
{
- while (!backgroundClients.empty()) {
- close(backgroundClients.begin()->first);
- backgroundClients.erase(backgroundClients.begin());
+ {
+ std::lock_guard<std::mutex> g(lock);
+ while (!backgroundClients.empty()) {
+ close(backgroundClients.begin()->first);
+ backgroundClients.erase(backgroundClients.begin());
+ }
}
if (backgroundThread) {
backgroundThread->join();
@@ -334,20 +337,25 @@ Tuner::senderThread()
default:
{ // stuff to do
std::lock_guard<std::mutex> g(lock);
- BOOST_FOREACH(const auto & c, backgroundClients) {
- if (FD_ISSET(c.first, &rfds)) {
+ for (auto c = backgroundClients.begin(); c != backgroundClients.end(); ) {
+ if (FD_ISSET(c->first, &rfds)) {
// Read it
P2PVR::Data buf(1 << 16);
- int nr = read(c.first, &buf.front(), buf.size());
+ int nr = read(c->first, &buf.front(), buf.size());
if (nr < 0) {
Logger()->messagebf(LOG_DEBUG, "%s: read failed (%d:%s)", __PRETTY_FUNCTION__, errno, strerror(errno));
- close(c.first);
- backgroundClients.erase(c.first);
- break; // backgroundClients has changed, bailout and start again
+ close(c->first);
+ c = backgroundClients.erase(c);
}
- size_t n = nr;
- buf.resize(n);
- c.second->NewData(buf);
+ else {
+ size_t n = nr;
+ buf.resize(n);
+ c->second->NewData(buf);
+ c++;
+ }
+ }
+ else {
+ c++;
}
}
}
diff --git a/p2pvr/lib/tuner.h b/p2pvr/lib/tuner.h
index 57bdd82..670a17d 100644
--- a/p2pvr/lib/tuner.h
+++ b/p2pvr/lib/tuner.h
@@ -28,6 +28,7 @@ class Tuner : public P2PVR::PrivateTuner {
const P2PVR::RawDataClientPrx client;
};
typedef boost::shared_ptr<IDataSender> BackgroundClient;
+ typedef std::map<int, BackgroundClient> BackgroundClients;
Tuner(const boost::filesystem::path & deviceFrontend);
~Tuner();
@@ -63,8 +64,6 @@ class Tuner : public P2PVR::PrivateTuner {
const boost::filesystem::path deviceFrontend;
const boost::filesystem::path deviceRoot;
- typedef boost::function<bool(const P2PVR::Data &)> PacketCheckFunction;
- typedef std::map<int, BackgroundClient> BackgroundClients;
BackgroundClients backgroundClients;
std::thread * backgroundThread;
std::mutex lock;
diff --git a/p2pvr/lib/tunerSendTs.cpp b/p2pvr/lib/tunerSendTs.cpp
index d5ea1bb..70b6670 100644
--- a/p2pvr/lib/tunerSendTs.cpp
+++ b/p2pvr/lib/tunerSendTs.cpp
@@ -15,12 +15,16 @@ SendTs::SendTs(const P2PVR::RawDataClientPrx & c) :
SendTs::~SendTs()
{
- if (async) {
- client->end_NewData(async);
+ try {
+ if (async) {
+ if (client->end_NewData(async)) return;
+ }
+ while (!buffer.empty()) {
+ sendBufferChunk();
+ if (client->end_NewData(async)) return;
+ }
}
- while (!buffer.empty()) {
- sendBufferChunk();
- client->end_NewData(async);
+ catch (...) {
}
}