1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
|
#include <pch.hpp>
#include "muxedFileSink.h"
#include <logger.h>
#include <misc.h>
#include <poll.h>
#include <sys/wait.h>
#include <fcntl.h>
#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string/classification.hpp>
// Exception thrown by the code in this file when a poll/read/write on one of
// the muxer's pipes fails. Adds nothing to P2PVR::DataHandlingException beyond
// a distinct type.
class MuxerFailure : public P2PVR::DataHandlingException { };
// Start the muxer command and remember its pid and pipe descriptors.
//
// t   - target path; substituted for every argument that is exactly "${TARGET}".
// cmd - command line, split into arguments on whitespace (runs of whitespace
//       are compressed into a single separator).
MuxedFileSink::MuxedFileSink(const boost::filesystem::path & t, const std::string & cmd)
{
	// Tokenise the command line into an argument vector.
	std::vector<std::string> arguments;
	boost::algorithm::split(arguments, cmd, isspace, boost::algorithm::token_compress_on);

	// Swap the placeholder argument for the real target path.
	const std::string placeholder("${TARGET}");
	for (std::string & argument : arguments) {
		if (argument == placeholder) {
			argument = t.string();
		}
	}

	// Launch the process; popenrwe fills fds with the descriptors used by the
	// rest of this class (fds[0] is written to, fds[1] and fds[2] are read from).
	muxerPid = popenrwe(arguments, fds);

	// Writes to the muxer must not block: NewData interleaves writing input
	// with reading the muxer's output.
	fcntl(fds[0], F_SETFL, O_NONBLOCK);

	Logger()->messagebf(LOG_INFO, "Muxer::%p started with command '%s' for %s", this, cmd, t);
}
// Shut the muxer down: close the pipe we write to, keep draining (and logging)
// the muxer's output until the child has been reaped, then release the
// remaining descriptors.
MuxedFileSink::~MuxedFileSink()
{
	// Closing the write side tells the muxer there is no more input.
	close(fds[0]);

	// Initialised so that a failing waitpid never leaves us logging garbage.
	int status = -1;
	bool finishedWaiting = false;
	try {
		// Poll for child exit, reading any pending output between checks
		// (waiting up to 5ms for output each time round).
		while (waitpid(muxerPid, &status, WNOHANG) == 0) {
			ReadMuxer(5);
		}
		finishedWaiting = true;
	}
	catch (...) {
		// ReadMuxer throws on pipe errors; an exception escaping a destructor
		// would call std::terminate, so it is deliberately contained here and
		// the child is reaped below instead.
	}

	close(fds[1]);
	close(fds[2]);

	if (!finishedWaiting) {
		// Draining failed part way through. The output pipes are closed now, so
		// the child cannot be stuck writing to us; wait for it so that it does
		// not linger as a zombie.
		waitpid(muxerPid, &status, 0);
	}

	Logger()->messagebf(LOG_INFO, "Muxer::%p finished with status %d", this, status);
}
// Feed a chunk of data to the muxer.
//
// Holds `lock` for the duration of the call. Input is written to the
// non-blocking fds[0]; before every write the muxer's output is read so that
// the muxer is not left blocked on its own output while we wait to write.
//
// Returns true if, while reading, one of the muxer's output pipes reported
// end-of-file (in which case remaining input may not have been written);
// false otherwise.
// Throws MuxerFailure if writing fails for any reason other than the pipe
// being temporarily full or the call being interrupted.
bool
MuxedFileSink::NewData(const P2PVR::Data & data, const Ice::Current &)
{
	std::lock_guard<std::mutex> g(lock);
	for (size_t off = 0; off < data.size(); ) {
		// Read output until input wouldn't block
		if (ReadWaiting())
			return true;
		// Send some input
		auto w = write(fds[0], &data[off], data.size() - off);
		if (w == -1) {
			if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
				// Nothing was written. Go round again (which drains more
				// output first) rather than adding -1 to the offset, which
				// would resend a byte or wrap the offset and drop the rest of
				// the buffer.
				continue;
			}
			Logger()->messagebf(LOG_ERR, "Muxer::%p write failed (%d:%s)", this, errno, strerror(errno));
			throw MuxerFailure();
		}
		off += w;
	}
	// Read anything that's come out
	return ReadAvailable();
}
bool
MuxedFileSink::ReadWaiting() const
{
pollfd fd = { fds[0], POLLOUT, 0 };
while (true) {
auto p = poll(&fd, 1, 0);
if (p < 0) {
// error
throw MuxerFailure();
}
else if (p == 0) {
// write would block
if (ReadMuxer(1)) return true;
}
else {
// write would not block
if (ReadMuxer(0)) return true;
break;
}
}
return false;
}
// Read whatever muxer output is already pending, without waiting for more.
// Returns ReadMuxer's result for a zero timeout.
bool
MuxedFileSink::ReadAvailable() const
{
	const bool outputClosed = ReadMuxer(0);
	return outputClosed;
}
// Read output from the muxer's two output pipes (fds[1] and fds[2]) and log
// every non-empty line of it.
//
// waitTime - poll timeout in milliseconds for each wait for output.
//
// Keeps reading until a poll times out with nothing to read (returns false) or
// a pipe reports end-of-file (returns true).
// Throws MuxerFailure if poll or read fails, or if a pipe reports an error
// condition with nothing left to read.
bool
MuxedFileSink::ReadMuxer(int waitTime) const
{
	pollfd fd[2] = { { fds[1], POLLIN | POLLHUP, 0 }, { fds[2], POLLIN | POLLHUP, 0 } };
	while (true) {
		auto p = poll(fd, 2, waitTime);
		if (p < 0) {
			if (errno == EINTR) {
				// Interrupted by a signal; not a muxer failure, poll again.
				continue;
			}
			// error
			throw MuxerFailure();
		}
		if (p == 0) {
			// all ok, nothing (more) to read
			return false;
		}
		bool closed = false;
		for (int i = 0; i < 2; ++i) {
			if (fd[i].revents & (POLLIN | POLLHUP)) {
				P2PVR::Data buf(BUFSIZ);
				auto len = read(fd[i].fd, &buf.front(), buf.size());
				if (len < 0) {
					if (errno == EINTR || errno == EAGAIN) {
						// Transient; the next poll will offer the data again.
						continue;
					}
					// error
					throw MuxerFailure();
				}
				if (len == 0) {
					// ok, proc exit
					closed = true;
					continue;
				}
				buf.resize(len);
				std::vector<std::string> lines;
				boost::algorithm::split(lines, buf, boost::algorithm::is_any_of("\r\n\f"), boost::algorithm::token_compress_on);
				BOOST_FOREACH(const auto & line, lines) {
					if (line.empty()) continue;
					Logger()->messagebf(LOG_INFO, "Muxer::%p > %s", this, line);
				}
			}
			else if (fd[i].revents & (POLLERR | POLLNVAL)) {
				// Error-only events are never cleared by reading, so ignoring
				// them would make this loop spin forever.
				throw MuxerFailure();
			}
		}
		if (closed) return true;
	}
}
|