From d649bd72d98de0e834f1debfae38dfd98049aaa7 Mon Sep 17 00:00:00 2001 From: randomdan Date: Fri, 31 Jan 2014 21:01:56 +0000 Subject: Fixes around the new muxer and sender features --- p2pvr/ice/dvb.ice | 4 +++- p2pvr/lib/muxer.cpp | 25 +++++++++++++++---------- p2pvr/lib/tuner.cpp | 32 ++++++++++++++++++++------------ p2pvr/lib/tuner.h | 3 +-- p2pvr/lib/tunerSendTs.cpp | 14 +++++++++----- 5 files changed, 48 insertions(+), 30 deletions(-) diff --git a/p2pvr/ice/dvb.ice b/p2pvr/ice/dvb.ice index 93dcd9b..00ba64b 100644 --- a/p2pvr/ice/dvb.ice +++ b/p2pvr/ice/dvb.ice @@ -10,13 +10,15 @@ module P2PVR { int Errno; }; + exception DataHandlingException { }; + exception IncorrectDeliveryType { }; sequence Data; sequence PacketIds; interface RawDataClient { - bool NewData(Data bytes); + bool NewData(Data bytes) throws DataHandlingException; }; interface Tuner { diff --git a/p2pvr/lib/muxer.cpp b/p2pvr/lib/muxer.cpp index 16bb410..5c1beaa 100644 --- a/p2pvr/lib/muxer.cpp +++ b/p2pvr/lib/muxer.cpp @@ -8,6 +8,8 @@ #include #include +class MuxerFailure : public P2PVR::DataHandlingException { }; + Muxer::Muxer(const P2PVR::RawDataClientPrx & t, const std::string & cmd) : target(t) { @@ -27,6 +29,7 @@ Muxer::~Muxer() } Logger()->messagebf(LOG_INFO, "Muxer::%p finished with status %d", this, status); close(fds[1]); + close(fds[2]); } bool @@ -41,7 +44,7 @@ Muxer::NewData(const P2PVR::Data & data, const Ice::Current &) auto w = write(fds[0], &data[off], data.size() - off); if (w == -1 && errno != EAGAIN && errno != EWOULDBLOCK) { Logger()->messagebf(LOG_ERR, "Muxer::%p write failed (%d:%s)", this, errno, strerror(errno)); - return true; + throw MuxerFailure(); } off += w; } @@ -57,7 +60,7 @@ Muxer::ReadWaiting() const auto p = poll(&fd, 1, 0); if (p < 0) { // error - return true; + throw MuxerFailure(); } else if (p == 0) { // write would block @@ -81,42 +84,43 @@ Muxer::ReadAvailable() const bool Muxer::ReadMuxerAndSend(int waitTime) const { - pollfd fd[2] = { { fds[1], POLLIN, 0 }, { fds[2], POLLIN, 0 } }; + pollfd fd[2] = { { fds[1], POLLIN | POLLHUP, 0 }, { fds[2], POLLIN | POLLHUP, 0 } }; while (true) { auto p = poll(fd, 2, waitTime); if (p < 0) { // error - return true; + throw MuxerFailure(); } else if (p == 0) { // all ok return false; } else { - if (fd[0].revents & POLLIN) { + bool closed = false; + if (fd[0].revents & (POLLIN | POLLHUP)) { P2PVR::Data buf(BUFSIZ); auto len = read(fds[1], &buf.front(), buf.size()); if (len == 0) { // ok, proc exit - return true; + closed = true; } if (len < 0) { // error - return true; + throw MuxerFailure(); } buf.resize(len); target->NewData(buf); } - if (fd[1].revents & POLLIN) { + if (fd[1].revents & (POLLIN | POLLHUP)) { P2PVR::Data buf(BUFSIZ); auto len = read(fds[2], &buf.front(), buf.size()); if (len == 0) { // ok, proc exit - return true; + closed = true; } if (len < 0) { // error - return true; + throw MuxerFailure(); } buf.resize(len); std::vector lines; @@ -126,6 +130,7 @@ Muxer::ReadMuxerAndSend(int waitTime) const Logger()->messagebf(LOG_INFO, "Muxer::%p > %s", this, line); } } + if (closed) return true; } } } diff --git a/p2pvr/lib/tuner.cpp b/p2pvr/lib/tuner.cpp index 8ebf1a2..fe90231 100644 --- a/p2pvr/lib/tuner.cpp +++ b/p2pvr/lib/tuner.cpp @@ -48,9 +48,12 @@ Tuner::Tuner(const boost::filesystem::path & df) : Tuner::~Tuner() { - while (!backgroundClients.empty()) { - close(backgroundClients.begin()->first); - backgroundClients.erase(backgroundClients.begin()); + { + std::lock_guard g(lock); + while (!backgroundClients.empty()) { + close(backgroundClients.begin()->first); + backgroundClients.erase(backgroundClients.begin()); + } } if (backgroundThread) { backgroundThread->join(); @@ -334,20 +337,25 @@ Tuner::senderThread() default: { // stuff to do std::lock_guard g(lock); - BOOST_FOREACH(const auto & c, backgroundClients) { - if (FD_ISSET(c.first, &rfds)) { + for (auto c = backgroundClients.begin(); c != backgroundClients.end(); ) { + if (FD_ISSET(c->first, &rfds)) { // Read it P2PVR::Data buf(1 << 16); - int nr = read(c.first, &buf.front(), buf.size()); + int nr = read(c->first, &buf.front(), buf.size()); if (nr < 0) { Logger()->messagebf(LOG_DEBUG, "%s: read failed (%d:%s)", __PRETTY_FUNCTION__, errno, strerror(errno)); - close(c.first); - backgroundClients.erase(c.first); - break; // backgroundClients has changed, bailout and start again + close(c->first); + c = backgroundClients.erase(c); } - size_t n = nr; - buf.resize(n); - c.second->NewData(buf); + else { + size_t n = nr; + buf.resize(n); + c->second->NewData(buf); + c++; + } + } + else { + c++; } } } diff --git a/p2pvr/lib/tuner.h b/p2pvr/lib/tuner.h index 57bdd82..670a17d 100644 --- a/p2pvr/lib/tuner.h +++ b/p2pvr/lib/tuner.h @@ -28,6 +28,7 @@ class Tuner : public P2PVR::PrivateTuner { const P2PVR::RawDataClientPrx client; }; typedef boost::shared_ptr BackgroundClient; + typedef std::map BackgroundClients; Tuner(const boost::filesystem::path & deviceFrontend); ~Tuner(); @@ -63,8 +64,6 @@ class Tuner : public P2PVR::PrivateTuner { const boost::filesystem::path deviceFrontend; const boost::filesystem::path deviceRoot; - typedef boost::function PacketCheckFunction; - typedef std::map BackgroundClients; BackgroundClients backgroundClients; std::thread * backgroundThread; std::mutex lock; diff --git a/p2pvr/lib/tunerSendTs.cpp b/p2pvr/lib/tunerSendTs.cpp index d5ea1bb..70b6670 100644 --- a/p2pvr/lib/tunerSendTs.cpp +++ b/p2pvr/lib/tunerSendTs.cpp @@ -15,12 +15,16 @@ SendTs::SendTs(const P2PVR::RawDataClientPrx & c) : SendTs::~SendTs() { - if (async) { - client->end_NewData(async); + try { + if (async) { + if (client->end_NewData(async)) return; + } + while (!buffer.empty()) { + sendBufferChunk(); + if (client->end_NewData(async)) return; + } } - while (!buffer.empty()) { - sendBufferChunk(); - client->end_NewData(async); + catch (...) { } } -- cgit v1.2.3