summaryrefslogtreecommitdiff
path: root/p2pvr/daemon/muxedFileSink.cpp
blob: ff431bc96821a21449d0321aaefae6cc2580f501 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
#include "muxedFileSink.h"
#include <logger.h>
#include <poll.h>
#include <sys/wait.h>
#include <fcntl.h>
#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string/classification.hpp>

namespace P2PVR {
class MuxerFailure : public P2PVR::DataHandlingException { };

MuxedFileSink::MuxedFileSink(const boost::filesystem::path & t, const std::string & cmd) :
	logger(LOGMANAGER()->getLogger<MuxedFileSink>())
{
	std::vector<std::string> params;
	boost::algorithm::split(params, cmd, isspace, boost::algorithm::token_compress_on);
	for (auto & c : params) {
		if (c == "${TARGET}") {
			c = t.string();
		}
	}
	fds = ProcessPipesPtr(new AdHoc::System::ProcessPipes(params, true, true, true));
	fcntl(fds->fdIn(), F_SETFL, O_NONBLOCK);
	logger->messagebf(LOG::INFO, "Muxer::%p started with command '%s' for %s", this, cmd, t);
}

MuxedFileSink::~MuxedFileSink()
{
	close(fds->fdIn());
	int status;
	while (waitpid(fds->pid(), &status, WNOHANG) == 0) {
		ReadMuxer(5);
	}
	logger->messagebf(LOG::INFO, "Muxer::%p finished with status %d", this, status);
}

bool
MuxedFileSink::NewData(const P2PVR::Data & data, const Ice::Current &)
{
	std::lock_guard<std::mutex> g(lock);
	for (size_t off = 0; off < data.size(); ) {
		// Read output until input wouldn't block
		if (ReadWaiting())
			return true;
		// Send some input
		auto w = write(fds->fdIn(), &data[off], data.size() - off);
		if (w == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
			logger->messagebf(LOG::ERR, "Muxer::%p write failed (%d:%s)", this, errno, strerror(errno));
			throw MuxerFailure();
		}
		off += w;
	}
	// Read anything that's come out
	return ReadAvailable();
}

bool
MuxedFileSink::ReadWaiting() const
{
	pollfd fd = { fds->fdIn(), POLLOUT, 0 };
	while (true) {
		auto p = poll(&fd, 1, 0);
		if (p < 0) {
			// error
			throw MuxerFailure();
		}
		else if (p == 0) {
			// write would block
			if (ReadMuxer(1)) return true;
		}
		else {
			// write would not block
			if (ReadMuxer(0)) return true;
			break;
		}
	}
	return false;
}

bool
MuxedFileSink::ReadAvailable() const
{
	return ReadMuxer(0);
}

bool
MuxedFileSink::ReadMuxer(int waitTime) const
{
	pollfd fd[2] = { { fds->fdOut(), POLLIN | POLLHUP, 0 }, { fds->fdError(), POLLIN | POLLHUP, 0 } };
	while (true) {
		auto p = poll(fd, 2, waitTime);
		if (p < 0) {
			// error
			throw MuxerFailure();
		}
		else if (p == 0) {
			// all ok
			return false;
		}
		else {
			bool closed = false;
			for (int i = 0; i < 2; ++i) {
				if (fd[i].revents & (POLLIN | POLLHUP)) {
					P2PVR::Data buf(BUFSIZ);
					auto len = read(fd[i].fd, &buf.front(), buf.size());
					if (len == 0) {
						// ok, proc exit
						closed = true;
					}
					if (len < 0) {
						// error
						throw MuxerFailure();
					}
					buf.resize(len);
					std::vector<std::string> lines;
					boost::algorithm::split(lines, buf, boost::algorithm::is_any_of("\r\n\f"), boost::algorithm::token_compress_on);
					for (const auto & line : lines) {
						if (line.empty()) continue;
						logger->messagebf(LOG::INFO, "Muxer::%p > %s", this, line);
					}
				}
			}
			if (closed) return true;
		}
	}
}
}