summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--p2pvr/lib/muxer.cpp134
-rw-r--r--p2pvr/lib/muxer.h7
2 files changed, 81 insertions, 60 deletions
diff --git a/p2pvr/lib/muxer.cpp b/p2pvr/lib/muxer.cpp
index bd00a8c..25f6b57 100644
--- a/p2pvr/lib/muxer.cpp
+++ b/p2pvr/lib/muxer.cpp
@@ -1,57 +1,19 @@
#include <pch.hpp>
#include "muxer.h"
#include <logger.h>
+#include <misc.h>
#include <poll.h>
#include <sys/wait.h>
#include <boost/algorithm/string/split.hpp>
-
-pid_t
-mpopenrw(const std::vector<std::string> & params, int fds[2])
-{
- int rpipes[2], wpipes[2];
- if (pipe(rpipes)) {
- throw std::runtime_error("Failed to create a pipe");
- }
- if (pipe(wpipes)) {
- throw std::runtime_error("Failed to create another pipe");
- }
- pid_t child = fork();
- switch (child) {
- case -1: // fail
- throw std::runtime_error("Failed to fork");
- default: // parent
- close(wpipes[0]);
- close(rpipes[1]);
- fds[0] = wpipes[1];
- fds[1] = rpipes[0];
- break;
- case 0: // in child
- close(wpipes[1]);
- close(rpipes[0]);
- dup2(wpipes[0], 0);
- dup2(rpipes[1], 1);
- for (int n = 3; n < 1024; n += 1) {
- close(n);
- }
- char * buf[100];
- char ** w = &buf[0];
- BOOST_FOREACH(const auto & p, params) {
- *w++ = strdup(p.c_str());
- }
- *w = NULL;
- execv(buf[0], buf);
- abort();
- break;
- }
- return child;
-}
+#include <boost/algorithm/string/classification.hpp>
Muxer::Muxer(const P2PVR::RawDataClientPrx & t, const std::string & cmd) :
target(t)
{
std::vector<std::string> params;
boost::algorithm::split(params, cmd, isspace, boost::algorithm::token_compress_on);
- muxerPid = mpopenrw(params, fds);
+ muxerPid = popenrwe(params, fds);
+ Logger()->messagebf(LOG_INFO, "Muxer::%p started with command '%s'", this, cmd);
}
Muxer::~Muxer()
@@ -61,26 +23,61 @@ Muxer::~Muxer()
while (waitpid(muxerPid, &status, WNOHANG) == 0) {
ReadMuxerAndSend(5);
}
+ Logger()->messagebf(LOG_INFO, "Muxer::%p finished with status %d", this, status);
close(fds[1]);
}
bool
Muxer::NewData(const P2PVR::Data & data, const Ice::Current &)
{
- std::lock_guard<std::mutex> g(lock);
- ReadMuxerAndSend(0);
- if (write(fds[0], &data.front(), data.size()) < 1) {
+ if (ReadWaiting())
return true;
+ {
+ std::lock_guard<std::mutex> g(wlock);
+ if (write(fds[0], &data.front(), data.size()) < (int)data.size()) {
+ return true;
+ }
+ }
+ return ReadAvailable();
+}
+
+bool
+Muxer::ReadWaiting() const
+{
+ std::lock_guard<std::mutex> g(rlock);
+ pollfd fd = { fds[0], POLLOUT, 0 };
+ while (true) {
+ auto p = poll(&fd, 1, 0);
+ if (p < 0) {
+ // error
+ return true;
+ }
+ else if (p == 0) {
+ // write would block
+ if (ReadMuxerAndSend(1)) return true;
+ }
+ else {
+ // write would not block
+ if (ReadMuxerAndSend(0)) return true;
+ break;
+ }
}
+ return false;
+}
+
+bool
+Muxer::ReadAvailable() const
+{
+ std::lock_guard<std::mutex> g(rlock);
return ReadMuxerAndSend(0);
}
bool
Muxer::ReadMuxerAndSend(int waitTime) const
{
- pollfd fd = { fds[1], POLLIN, 0 };
+ pollfd fd[2] = { { fds[1], POLLIN, 0 }, { fds[2], POLLIN, 0 } };
while (true) {
- auto p = poll(&fd, 1, waitTime);
+ auto p = poll(fd, 2, waitTime);
if (p < 0) {
// error
return true;
@@ -89,19 +86,40 @@ Muxer::ReadMuxerAndSend(int waitTime) const
// all ok
return false;
}
- else if (p > 0) {
- P2PVR::Data buf(BUFSIZ);
- auto len = read(fds[1], &buf.front(), buf.size());
- if (len == 0) {
- // ok, proc exit
- return true;
+ else {
+ if (fd[0].revents & POLLIN) {
+ P2PVR::Data buf(BUFSIZ);
+ auto len = read(fds[1], &buf.front(), buf.size());
+ if (len == 0) {
+ // ok, proc exit
+ return true;
+ }
+ if (len < 0) {
+ // error
+ return true;
+ }
+ buf.resize(len);
+ target->NewData(buf);
}
- if (len < 0) {
- // error
- return true;
+ if (fd[1].revents & POLLIN) {
+ P2PVR::Data buf(BUFSIZ);
+ auto len = read(fds[2], &buf.front(), buf.size());
+ if (len == 0) {
+ // ok, proc exit
+ return true;
+ }
+ if (len < 0) {
+ // error
+ return true;
+ }
+ buf.resize(len);
+ std::vector<std::string> lines;
+ boost::algorithm::split(lines, buf, boost::algorithm::is_any_of("\r\n\f"), boost::algorithm::token_compress_on);
+ BOOST_FOREACH(const auto & line, lines) {
+ if (line.empty()) continue;
+ Logger()->messagebf(LOG_INFO, "Muxer::%p > %s", this, line);
+ }
}
- buf.resize(len);
- target->NewData(buf);
}
}
}
diff --git a/p2pvr/lib/muxer.h b/p2pvr/lib/muxer.h
index 118b1da..e4789cf 100644
--- a/p2pvr/lib/muxer.h
+++ b/p2pvr/lib/muxer.h
@@ -12,11 +12,14 @@ class Muxer : public P2PVR::RawDataClient {
bool NewData(const P2PVR::Data &, const Ice::Current &);
private:
+ bool ReadWaiting() const;
+ bool ReadAvailable() const;
bool ReadMuxerAndSend(int wait) const;
const P2PVR::RawDataClientPrx target;
- int fds[2];
+ int fds[3];
pid_t muxerPid;
- std::mutex lock;
+ mutable std::mutex wlock;
+ mutable std::mutex rlock;
};
#endif