1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
|
#include "muxedFileSink.h"
#include <logger.h>
#include <poll.h>
#include <sys/wait.h>
#include <fcntl.h>
#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string/classification.hpp>
namespace P2PVR {
class MuxerFailure : public P2PVR::DataHandlingException { };
MuxedFileSink::MuxedFileSink(const boost::filesystem::path & t, const std::string & cmd) :
logger(LOGMANAGER()->getLogger<MuxedFileSink>())
{
std::vector<std::string> params;
boost::algorithm::split(params, cmd, isspace, boost::algorithm::token_compress_on);
for (auto & c : params) {
if (c == "${TARGET}") {
c = t.string();
}
}
fds = ProcessPipesPtr(new AdHoc::System::ProcessPipes(params, true, true, true));
fcntl(fds->fdIn(), F_SETFL, O_NONBLOCK);
logger->messagebf(LOG::INFO, "Muxer::%p started with command '%s' for %s", this, cmd, t);
}
MuxedFileSink::~MuxedFileSink()
{
close(fds->fdIn());
int status;
while (waitpid(fds->pid(), &status, WNOHANG) == 0) {
ReadMuxer(5);
}
logger->messagebf(LOG::INFO, "Muxer::%p finished with status %d", this, status);
}
bool
MuxedFileSink::NewData(const P2PVR::Data & data, const Ice::Current &)
{
std::lock_guard<std::mutex> g(lock);
for (size_t off = 0; off < data.size(); ) {
// Read output until input wouldn't block
if (ReadWaiting())
return true;
// Send some input
auto w = write(fds->fdIn(), &data[off], data.size() - off);
if (w == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
logger->messagebf(LOG::ERR, "Muxer::%p write failed (%d:%s)", this, errno, strerror(errno));
throw MuxerFailure();
}
off += w;
}
// Read anything that's come out
return ReadAvailable();
}
bool
MuxedFileSink::ReadWaiting() const
{
pollfd fd = { fds->fdIn(), POLLOUT, 0 };
while (true) {
auto p = poll(&fd, 1, 0);
if (p < 0) {
// error
throw MuxerFailure();
}
else if (p == 0) {
// write would block
if (ReadMuxer(1)) return true;
}
else {
// write would not block
if (ReadMuxer(0)) return true;
break;
}
}
return false;
}
bool
MuxedFileSink::ReadAvailable() const
{
return ReadMuxer(0);
}
bool
MuxedFileSink::ReadMuxer(int waitTime) const
{
pollfd fd[2] = { { fds->fdOut(), POLLIN | POLLHUP, 0 }, { fds->fdError(), POLLIN | POLLHUP, 0 } };
while (true) {
auto p = poll(fd, 2, waitTime);
if (p < 0) {
// error
throw MuxerFailure();
}
else if (p == 0) {
// all ok
return false;
}
else {
bool closed = false;
for (int i = 0; i < 2; ++i) {
if (fd[i].revents & (POLLIN | POLLHUP)) {
P2PVR::Data buf(BUFSIZ);
auto len = read(fd[i].fd, &buf.front(), buf.size());
if (len == 0) {
// ok, proc exit
closed = true;
}
if (len < 0) {
// error
throw MuxerFailure();
}
buf.resize(len);
std::vector<std::string> lines;
boost::algorithm::split(lines, buf, boost::algorithm::is_any_of("\r\n\f"), boost::algorithm::token_compress_on);
for (const auto & line : lines) {
if (line.empty()) continue;
logger->messagebf(LOG::INFO, "Muxer::%p > %s", this, line);
}
}
}
if (closed) return true;
}
}
}
}
|