summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorrandomdan <randomdan@localhost>2014-01-22 21:17:43 +0000
committerrandomdan <randomdan@localhost>2014-01-22 21:17:43 +0000
commit5c9738977aa14bd6e76922aa724628eb22b71a7d (patch)
tree19ca8dc374b6b1b4a10f49d5df427afcd095e883
parentFix up pipe handling in the muxer and write stderr of muxer command into the log (diff)
downloadp2pvr-5c9738977aa14bd6e76922aa724628eb22b71a7d.tar.bz2
p2pvr-5c9738977aa14bd6e76922aa724628eb22b71a7d.tar.xz
p2pvr-5c9738977aa14bd6e76922aa724628eb22b71a7d.zip
Refactor tuner to use specific implementations of new interface for send different (TS and SI) data to clients
Refactor muxer to not block and read output data whilst writing input data Add a (service) streamer for testing
-rw-r--r--p2pvr/Jamfile.jam3
-rw-r--r--p2pvr/lib/daemonBase.cpp1
-rw-r--r--p2pvr/lib/muxer.cpp20
-rw-r--r--p2pvr/lib/muxer.h3
-rw-r--r--p2pvr/lib/p2LoggerWrapper.cpp1
-rw-r--r--p2pvr/lib/recorder.cpp1
-rw-r--r--p2pvr/lib/tuner.cpp130
-rw-r--r--p2pvr/lib/tuner.h18
-rw-r--r--p2pvr/lib/tunerSendSi.cpp81
-rw-r--r--p2pvr/lib/tunerSendSi.h24
-rw-r--r--p2pvr/lib/tunerSendTs.cpp73
-rw-r--r--p2pvr/lib/tunerSendTs.h22
-rw-r--r--p2pvr/streamer/Jamfile.jam10
-rw-r--r--p2pvr/streamer/streamer.cpp52
14 files changed, 329 insertions, 110 deletions
diff --git a/p2pvr/Jamfile.jam b/p2pvr/Jamfile.jam
index 8f82f35..53070ac 100644
--- a/p2pvr/Jamfile.jam
+++ b/p2pvr/Jamfile.jam
@@ -21,10 +21,11 @@ alias p2daemonlib : glibmm : : :
<cflags>"-I /usr/include/project2/daemon/lib"
<linkflags>"-lp2daemonlib"
;
+build-project streamer ;
build-project daemon ;
build-project carddaemon ;
-install debuginstall : lib//p2pvrlib util//p2pvrutil p2//p2pvrp2 carddaemon daemon ice : <location>./testing ;
+install debuginstall : lib//p2pvrlib util//p2pvrutil p2//p2pvrp2 carddaemon daemon ice streamer//streamer : <location>./testing ;
package.install install : : : carddaemon daemon p2//p2pvrp2 ;
import type ;
diff --git a/p2pvr/lib/daemonBase.cpp b/p2pvr/lib/daemonBase.cpp
index ac429ea..1e946d8 100644
--- a/p2pvr/lib/daemonBase.cpp
+++ b/p2pvr/lib/daemonBase.cpp
@@ -1,3 +1,4 @@
+#include <pch.hpp>
#include "daemonBase.h"
#include "p2LoggerWrapper.h"
#include <logger.h>
diff --git a/p2pvr/lib/muxer.cpp b/p2pvr/lib/muxer.cpp
index 25f6b57..16bb410 100644
--- a/p2pvr/lib/muxer.cpp
+++ b/p2pvr/lib/muxer.cpp
@@ -4,6 +4,7 @@
#include <misc.h>
#include <poll.h>
#include <sys/wait.h>
+#include <fcntl.h>
#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string/classification.hpp>
@@ -13,6 +14,7 @@ Muxer::Muxer(const P2PVR::RawDataClientPrx & t, const std::string & cmd) :
std::vector<std::string> params;
boost::algorithm::split(params, cmd, isspace, boost::algorithm::token_compress_on);
muxerPid = popenrwe(params, fds);
+ fcntl(fds[0], F_SETFL, O_NONBLOCK);
Logger()->messagebf(LOG_INFO, "Muxer::%p started with command '%s'", this, cmd);
}
@@ -30,21 +32,26 @@ Muxer::~Muxer()
bool
Muxer::NewData(const P2PVR::Data & data, const Ice::Current &)
{
- if (ReadWaiting())
- return true;
- {
- std::lock_guard<std::mutex> g(wlock);
- if (write(fds[0], &data.front(), data.size()) < (int)data.size()) {
+ std::lock_guard<std::mutex> g(lock);
+ for (size_t off = 0; off < data.size(); ) {
+ // Read output until input wouldn't block
+ if (ReadWaiting())
+ return true;
+ // Send some input
+ auto w = write(fds[0], &data[off], data.size() - off);
+ if (w == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
+ Logger()->messagebf(LOG_ERR, "Muxer::%p write failed (%d:%s)", this, errno, strerror(errno));
return true;
}
+ off += w;
}
+ // Read anything that's come out
return ReadAvailable();
}
bool
Muxer::ReadWaiting() const
{
- std::lock_guard<std::mutex> g(rlock);
pollfd fd = { fds[0], POLLOUT, 0 };
while (true) {
auto p = poll(&fd, 1, 0);
@@ -68,7 +75,6 @@ Muxer::ReadWaiting() const
bool
Muxer::ReadAvailable() const
{
- std::lock_guard<std::mutex> g(rlock);
return ReadMuxerAndSend(0);
}
diff --git a/p2pvr/lib/muxer.h b/p2pvr/lib/muxer.h
index e4789cf..3a492bd 100644
--- a/p2pvr/lib/muxer.h
+++ b/p2pvr/lib/muxer.h
@@ -18,8 +18,7 @@ class Muxer : public P2PVR::RawDataClient {
const P2PVR::RawDataClientPrx target;
int fds[3];
pid_t muxerPid;
- mutable std::mutex wlock;
- mutable std::mutex rlock;
+ mutable std::mutex lock;
};
#endif
diff --git a/p2pvr/lib/p2LoggerWrapper.cpp b/p2pvr/lib/p2LoggerWrapper.cpp
index 42d4757..60ec191 100644
--- a/p2pvr/lib/p2LoggerWrapper.cpp
+++ b/p2pvr/lib/p2LoggerWrapper.cpp
@@ -1,3 +1,4 @@
+#include <pch.hpp>
#include "p2LoggerWrapper.h"
#include "logger.h"
diff --git a/p2pvr/lib/recorder.cpp b/p2pvr/lib/recorder.cpp
index bc13ee2..98c7c4e 100644
--- a/p2pvr/lib/recorder.cpp
+++ b/p2pvr/lib/recorder.cpp
@@ -1,3 +1,4 @@
+#include <pch.hpp>
#include "recorder.h"
#include "bindTimerTask.h"
#include <boost/bind.hpp>
diff --git a/p2pvr/lib/tuner.cpp b/p2pvr/lib/tuner.cpp
index bb19a3c..8ebf1a2 100644
--- a/p2pvr/lib/tuner.cpp
+++ b/p2pvr/lib/tuner.cpp
@@ -9,11 +9,11 @@
#include <plugable.h>
#include <linux/dvb/frontend.h>
#include <linux/dvb/dmx.h>
-#include <boost/crc.hpp>
#include <boost/tuple/tuple.hpp>
#include "fileHandle.h"
-#include "siParsers/table.h"
#include <cxxabi.h>
+#include "tunerSendSi.h"
+#include "tunerSendTs.h"
class FrontendNotSupported : public NotSupported {
public:
@@ -177,14 +177,11 @@ uint64_t
Tuner::ReadDemuxAndSend(int demux, const P2PVR::RawDataClientPrx & _client) const
{
Logger()->messagebf(LOG_DEBUG, "%s: begin", __PRETTY_FUNCTION__);
- std::vector<Ice::AsyncResultPtr> asyncs;
struct pollfd ufd;
memset(&ufd, 0, sizeof(pollfd));
ufd.fd = demux;
ufd.events = POLLIN | POLLPRI;
- uint64_t packetsSent = 0;
- bool exitFlag = false;
- auto client = _client->ice_collocationOptimized(false);
+ BackgroundClient client = BackgroundClient(new SendSi(_client));
do {
// Wait for data to appear
switch (poll(&ufd, 1, DemuxReadTimeout)) {
@@ -208,54 +205,16 @@ Tuner::ReadDemuxAndSend(int demux, const P2PVR::RawDataClientPrx & _client) cons
size_t n = nr;
buf.resize(n);
- if (!IsValidSection(buf)) {
- continue;
- }
-
- asyncs.push_back(client->begin_NewData(buf));
- packetsSent += 1;
+ client->NewData(buf);
time(&lastUsedTime);
- asyncs.erase(std::remove_if(asyncs.begin(), asyncs.end(), [&exitFlag, &client](const Ice::AsyncResultPtr & a) {
- if (a->isCompleted()) {
- exitFlag = client->end_NewData(a);
- return true;
- }
- return false;
- }), asyncs.end());
- } while (!exitFlag);
- BOOST_FOREACH(const auto & a, asyncs) {
- client->end_NewData(a);
- }
- Logger()->messagebf(LOG_DEBUG, "%s: end", __PRETTY_FUNCTION__);
+ } while (!client->IsFinished());
+ auto packetsSent = client->PacketsSent();
+ client.reset();
+ Logger()->messagebf(LOG_DEBUG, "%s: end (sent %d packets)", __PRETTY_FUNCTION__, packetsSent);
return packetsSent;
}
-bool
-Tuner::IsValidSection(const P2PVR::Data & buf)
-{
- auto n = buf.size();
- if (n < sizeof(SiTableHeader)) {
- Logger()->messagebf(LOG_WARNING, "Received data too small to be an SI table.");
- return false;
- }
- auto * tab = (const SiTableHeader *)(&buf.front());
- size_t l = sizeof(SiTableHeaderBase) + HILO(tab->section_length);
- if (n < l) {
- Logger()->messagebf(LOG_WARNING, "Received data shorter than its defined length.");
- return false;
- }
- if (n > l) {
- Logger()->messagebf(LOG_WARNING, "Received data longer than its defined length.");
- return false;
- }
- if (!crc32(buf)) {
- Logger()->messagebf(LOG_WARNING, "Received data is corrupted (crc32 failed).");
- return false;
- }
- return true;
-}
-
int
Tuner::StartSendingSection(int pid, const P2PVR::RawDataClientPrx & client, const Ice::Current &)
{
@@ -264,7 +223,7 @@ Tuner::StartSendingSection(int pid, const P2PVR::RawDataClientPrx & client, cons
std::lock_guard<std::mutex> g(lock);
int demux = backgroundClients.insert(BackgroundClients::value_type(OpenDemux(),
- BackgroundClient(client->ice_collocationOptimized(false), &Tuner::IsValidSection))).first->first;
+ BackgroundClient(new SendSi(client)))).first->first;
RequestPID(pid, demux);
startSenderThread();
return demux;
@@ -283,7 +242,8 @@ Tuner::StartSendingTS(const P2PVR::PacketIds & pids, const P2PVR::RawDataClientP
ice.con->createProxy(client->ice_getIdentity());
}
std::lock_guard<std::mutex> g(lock);
- int demux = backgroundClients.insert(BackgroundClients::value_type(OpenDemux(), client->ice_collocationOptimized(false))).first->first;
+ int demux = backgroundClients.insert(BackgroundClients::value_type(OpenDemux(),
+ BackgroundClient(new SendTs(client)))).first->first;
struct dmx_pes_filter_params pesFilterParams;
memset(&pesFilterParams, 0, sizeof(struct dmx_pes_filter_params));
@@ -355,8 +315,6 @@ void
Tuner::senderThread()
{
lock.lock();
- typedef boost::tuple<P2PVR::RawDataClientPrx, Ice::AsyncResultPtr, int> AsyncCall;
- std::vector<AsyncCall> asyncs;
while (!backgroundClients.empty()) {
int n = backgroundClients.rbegin()->first + 1;
fd_set rfds;
@@ -389,52 +347,22 @@ Tuner::senderThread()
}
size_t n = nr;
buf.resize(n);
- if (!boost::get<1>(c.second) || boost::get<1>(c.second)(buf)) {
- // Send it
- asyncs.push_back(AsyncCall(boost::get<0>(c.second), boost::get<0>(c.second)->begin_NewData(buf), c.first));
- }
+ c.second->NewData(buf);
}
}
}
break;
}
// Clean up finished async requests
- asyncs.erase(std::remove_if(asyncs.begin(), asyncs.end(), [this](const AsyncCall & a) {
- time(&lastUsedTime);
- try {
- if (a.get<1>()->isCompleted()) {
- if (a.get<0>()->end_NewData(a.get<1>())) {
- close(a.get<2>());
- std::lock_guard<std::mutex> g(lock);
- backgroundClients.erase(a.get<2>());
- }
- return true;
- }
- return false;
- }
- catch (const std::exception & ex) {
- Logger()->messagebf(LOG_DEBUG, "%s: Client transmit error (%s)", __PRETTY_FUNCTION__, ex.what());
- close(a.get<2>());
- std::lock_guard<std::mutex> g(lock);
- backgroundClients.erase(a.get<2>());
- return true;
- }
- catch (...) {
- Logger()->messagebf(LOG_DEBUG, "%s: Client transmit error", __PRETTY_FUNCTION__);
- close(a.get<2>());
- std::lock_guard<std::mutex> g(lock);
- backgroundClients.erase(a.get<2>());
- return true;
- }
- }), asyncs.end());
lock.lock();
- }
- Logger()->messagebf(LOG_DEBUG, "%s: Cleaning up", __PRETTY_FUNCTION__);
- BOOST_FOREACH(const auto & a, asyncs) {
- try {
- a.get<0>()->end_NewData(a.get<1>());
- }
- catch (...) {
+ for (auto client = backgroundClients.begin(); client != backgroundClients.end(); ) {
+ if (client->second->IsFinished()) {
+ close(client->first);
+ client = backgroundClients.erase(client);
+ }
+ else {
+ client++;
+ }
}
}
backgroundThread = NULL;
@@ -448,12 +376,20 @@ Tuner::GetLastUsedTime(const Ice::Current &)
return lastUsedTime;
}
-bool
-Tuner::crc32(const P2PVR::Data & buf)
+Tuner::IDataSender::IDataSender(const P2PVR::RawDataClientPrx & c) :
+ _packetsSent(0),
+ client(c)
+{
+}
+
+Tuner::IDataSender::~IDataSender()
+{
+}
+
+uint64_t
+Tuner::IDataSender::PacketsSent() const
{
- boost::crc_optimal<32, 0x0, 0xFFFFFFFF, 0x0, true, false> crc;
- crc.process_bytes(&buf.front(), buf.size());
- return crc.checksum() == 0;
+ return _packetsSent;
}
int Tuner::TuningTimeout;
diff --git a/p2pvr/lib/tuner.h b/p2pvr/lib/tuner.h
index 381c804..57bdd82 100644
--- a/p2pvr/lib/tuner.h
+++ b/p2pvr/lib/tuner.h
@@ -14,6 +14,21 @@
class Tuner : public P2PVR::PrivateTuner {
public:
+ class IDataSender {
+ public:
+ IDataSender(const P2PVR::RawDataClientPrx &);
+ virtual ~IDataSender() = 0;
+
+ virtual void NewData(const P2PVR::Data &) = 0;
+ virtual bool IsFinished() = 0;
+ uint64_t PacketsSent() const;
+
+ protected:
+ uint64_t _packetsSent;
+ const P2PVR::RawDataClientPrx client;
+ };
+ typedef boost::shared_ptr<IDataSender> BackgroundClient;
+
Tuner(const boost::filesystem::path & deviceFrontend);
~Tuner();
@@ -38,12 +53,10 @@ class Tuner : public P2PVR::PrivateTuner {
INITOPTIONS;
private:
- static bool crc32(const P2PVR::Data &);
int OpenDemux() const;
uint64_t SendPID(int pid, const P2PVR::RawDataClientPrx & client, const Ice::Current &) const;
static void RequestPID(int pid, int fd);
uint64_t ReadDemuxAndSend(int fd, const P2PVR::RawDataClientPrx & client) const;
- static bool IsValidSection(const P2PVR::Data &);
void startSenderThread();
void senderThread();
static void setBufferSize(int fd, unsigned long bytes);
@@ -51,7 +64,6 @@ class Tuner : public P2PVR::PrivateTuner {
const boost::filesystem::path deviceFrontend;
const boost::filesystem::path deviceRoot;
typedef boost::function<bool(const P2PVR::Data &)> PacketCheckFunction;
- typedef boost::tuple<P2PVR::RawDataClientPrx, PacketCheckFunction> BackgroundClient;
typedef std::map<int, BackgroundClient> BackgroundClients;
BackgroundClients backgroundClients;
std::thread * backgroundThread;
diff --git a/p2pvr/lib/tunerSendSi.cpp b/p2pvr/lib/tunerSendSi.cpp
new file mode 100644
index 0000000..e1f4637
--- /dev/null
+++ b/p2pvr/lib/tunerSendSi.cpp
@@ -0,0 +1,81 @@
+#include <pch.hpp>
+#include "tunerSendSi.h"
+#include <logger.h>
+#include <boost/crc.hpp>
+#include "siParsers/table.h"
+
+SendSi::SendSi(const P2PVR::RawDataClientPrx & c) :
+ Tuner::IDataSender(c->ice_collocationOptimized(false))
+{
+}
+
+SendSi::~SendSi()
+{
+}
+
+void
+SendSi::NewData(const P2PVR::Data & buf)
+{
+ if (!IsValidSection(buf)) {
+ return;
+ }
+ _packetsSent += 1;
+ asyncs.insert(client->begin_NewData(buf));
+}
+
+bool
+SendSi::IsFinished()
+{
+ try {
+ for (auto c = asyncs.begin(); c != asyncs.end(); ) {
+ if ((*c)->isCompleted()) {
+ if (client->end_NewData(*c)) {
+ return true;
+ }
+ c = asyncs.erase(c);
+ }
+ else {
+ c++;
+ }
+ }
+ return false;
+ }
+ catch (const std::exception & ex) {
+ Logger()->messagebf(LOG_DEBUG, "%s: Client transmit error (%s)", __PRETTY_FUNCTION__, ex.what());
+ return true;
+ }
+}
+
+bool
+SendSi::IsValidSection(const P2PVR::Data & buf)
+{
+ auto n = buf.size();
+ if (n < sizeof(SiTableHeader)) {
+ Logger()->messagebf(LOG_WARNING, "Received data too small to be an SI table.");
+ return false;
+ }
+ auto * tab = (const SiTableHeader *)(&buf.front());
+ size_t l = sizeof(SiTableHeaderBase) + HILO(tab->section_length);
+ if (n < l) {
+ Logger()->messagebf(LOG_WARNING, "Received data shorter than its defined length.");
+ return false;
+ }
+ if (n > l) {
+ Logger()->messagebf(LOG_WARNING, "Received data longer than its defined length.");
+ return false;
+ }
+ if (!crc32(buf)) {
+ Logger()->messagebf(LOG_WARNING, "Received data is corrupted (crc32 failed).");
+ return false;
+ }
+ return true;
+}
+
+bool
+SendSi::crc32(const P2PVR::Data & buf)
+{
+ boost::crc_optimal<32, 0x0, 0xFFFFFFFF, 0x0, true, false> crc;
+ crc.process_bytes(&buf.front(), buf.size());
+ return crc.checksum() == 0;
+}
+
diff --git a/p2pvr/lib/tunerSendSi.h b/p2pvr/lib/tunerSendSi.h
new file mode 100644
index 0000000..df48f43
--- /dev/null
+++ b/p2pvr/lib/tunerSendSi.h
@@ -0,0 +1,24 @@
+#ifndef TUNER_SENDSI_H
+#define TUNER_SENDSI_H
+
+#include "tuner.h"
+
+class SendSi : public Tuner::IDataSender {
+ public:
+ SendSi(const P2PVR::RawDataClientPrx &);
+ ~SendSi();
+
+ void NewData(const P2PVR::Data &);
+ bool IsFinished();
+
+ private:
+ static bool crc32(const P2PVR::Data &);
+ static bool IsValidSection(const P2PVR::Data &);
+
+ std::set<Ice::AsyncResultPtr> asyncs;
+ bool finish;
+};
+
+#endif
+
+
diff --git a/p2pvr/lib/tunerSendTs.cpp b/p2pvr/lib/tunerSendTs.cpp
new file mode 100644
index 0000000..d5ea1bb
--- /dev/null
+++ b/p2pvr/lib/tunerSendTs.cpp
@@ -0,0 +1,73 @@
+#include <pch.hpp>
+#include "tunerSendTs.h"
+#include <logger.h>
+
+// ~64kb of TS packets
+#define TARGET_BUFFER_SIZE (350 * 188)
+// About the ICE message size limit
+#define TARGET_BUFFER_LIMIT 512 * 1024
+
+SendTs::SendTs(const P2PVR::RawDataClientPrx & c) :
+ Tuner::IDataSender(c->ice_collocationOptimized(false))
+{
+ buffer.reserve(TARGET_BUFFER_SIZE);
+}
+
+SendTs::~SendTs()
+{
+ if (async) {
+ client->end_NewData(async);
+ }
+ while (!buffer.empty()) {
+ sendBufferChunk();
+ client->end_NewData(async);
+ }
+}
+
+void
+SendTs::NewData(const P2PVR::Data & buf)
+{
+ buffer.insert(buffer.end(), buf.begin(), buf.end());
+ if (!async && buffer.size() >= TARGET_BUFFER_SIZE) {
+ sendBufferChunk();
+ }
+}
+
+void
+SendTs::sendBufferChunk()
+{
+ if (buffer.size() > TARGET_BUFFER_LIMIT) {
+ auto breakPoint = buffer.begin() + TARGET_BUFFER_LIMIT;
+ async = client->begin_NewData(P2PVR::Data(buffer.begin(), breakPoint));
+ buffer.erase(buffer.begin(), breakPoint);
+ }
+ else {
+ async = client->begin_NewData(buffer);
+ buffer.clear();
+ buffer.reserve(TARGET_BUFFER_SIZE);
+ }
+ _packetsSent += 1;
+}
+
+bool
+SendTs::IsFinished()
+{
+ try {
+ if (async && async->isCompleted()) {
+ auto finished = client->end_NewData(async);
+ async = NULL;
+ if (finished) {
+ buffer.clear();
+ }
+ return finished;
+ }
+ return false;
+ }
+ catch (const std::exception & ex) {
+ async = NULL;
+ Logger()->messagebf(LOG_DEBUG, "%s: Client transmit error (%s)", __PRETTY_FUNCTION__, ex.what());
+ return true;
+ }
+}
+
+
diff --git a/p2pvr/lib/tunerSendTs.h b/p2pvr/lib/tunerSendTs.h
new file mode 100644
index 0000000..ecf32fd
--- /dev/null
+++ b/p2pvr/lib/tunerSendTs.h
@@ -0,0 +1,22 @@
+#ifndef TUNER_SENDTS_H
+#define TUNER_SENDTS_H
+
+#include "tuner.h"
+
+class SendTs : public Tuner::IDataSender {
+ public:
+ SendTs(const P2PVR::RawDataClientPrx &);
+ ~SendTs();
+
+ void NewData(const P2PVR::Data &);
+ bool IsFinished();
+
+ private:
+ void sendBufferChunk();
+
+ Ice::AsyncResultPtr async;
+ P2PVR::Data buffer;
+};
+
+#endif
+
diff --git a/p2pvr/streamer/Jamfile.jam b/p2pvr/streamer/Jamfile.jam
new file mode 100644
index 0000000..7f4d2b4
--- /dev/null
+++ b/p2pvr/streamer/Jamfile.jam
@@ -0,0 +1,10 @@
+
+
+lib streamer :
+ [ glob-tree *.cpp ]
+ : :
+ <library>../ice//p2pvrice
+ <library>../lib//p2pvrlib
+ <library>../util//p2pvrutil
+ <implicit-dependency>../ice//p2pvrice
+ ;
diff --git a/p2pvr/streamer/streamer.cpp b/p2pvr/streamer/streamer.cpp
new file mode 100644
index 0000000..2722266
--- /dev/null
+++ b/p2pvr/streamer/streamer.cpp
@@ -0,0 +1,52 @@
+#include <daemonBase.h>
+#include <p2pvr.h>
+#include "globalDevices.h"
+#include "si.h"
+#include <serviceStreamer.h>
+#include <fileSink.h>
+#include <muxer.h>
+
+class P2PvrStreamer : public DaemonBase {
+ public:
+ P2PvrStreamer(int argc, char ** argv) :
+ DaemonBase(argc, argv)
+ {
+ }
+
+ void addServants(const Ice::ObjectAdapterPtr & adapter, const IceUtil::TimerPtr &) const
+ {
+ adapter->add(new GlobalDevices(), ic->stringToIdentity("GlobalDevices"));
+ adapter->add(new SI(), ic->stringToIdentity("SI"));
+ auto output = P2PVR::RawDataClientPrx::checkedCast(adapter->addWithUUID(new FileSink(1)));
+ assert(output);
+ auto muxer = P2PVR::RawDataClientPrx::checkedCast(adapter->addWithUUID(new Muxer(output, "/usr/bin/ffmpeg -f mpegts -i - -f dvd -codec copy -")));
+ assert(muxer);
+ ss = ServiceStreamerPtr(new ServiceStreamer(4287, muxer, ic, adapter));
+ assert(ss);
+ }
+
+ void run() const
+ {
+ IceUtil::TimerPtr timer = new IceUtil::Timer();
+ Logger()->messagebf(LOG_INFO, "Creating adapter (%s, %s)", Adapter, Endpoint);
+ auto adapter = ic->createObjectAdapterWithEndpoints(Adapter, Endpoint);
+ addServants(adapter, timer);
+ adapter->activate();
+
+ ss->Start();
+
+ ic->waitForShutdown();
+ timer->destroy();
+ }
+
+ void shutdown() const
+ {
+ ss->Stop();
+ DaemonBase::shutdown();
+ }
+ private:
+ mutable ServiceStreamerPtr ss;
+};
+
+DECLARE_GENERIC_LOADER("p2pvrstreamer", DaemonLoader, P2PvrStreamer);
+