From 68ab7b71044f2860d0b421d41344914d27f86cc2 Mon Sep 17 00:00:00 2001 From: randomdan Date: Sun, 5 Jan 2014 15:46:17 +0000 Subject: Unify the daemon base code Enable multiple threads Add mutexes where required Proxy ICE logs into p2 logger --- p2pvr/carddaemon/Jamfile.jam | 1 - p2pvr/carddaemon/carddaemon.cpp | 48 ++++--------------------------- p2pvr/daemon/Jamfile.jam | 1 - p2pvr/daemon/daemon.cpp | 46 ++++-------------------------- p2pvr/lib/Jamfile.jam | 1 + p2pvr/lib/daemonBase.cpp | 62 +++++++++++++++++++++++++++++++++++++++++ p2pvr/lib/daemonBase.h | 31 +++++++++++++++++++++ p2pvr/lib/muxer.cpp | 6 ++-- p2pvr/lib/muxer.h | 2 ++ p2pvr/lib/p2LoggerWrapper.cpp | 38 +++++++++++++++++++++++++ p2pvr/lib/p2LoggerWrapper.h | 23 +++++++++++++++ p2pvr/lib/siParsers/table.h | 3 ++ p2pvr/lib/tuner.cpp | 7 +++-- 13 files changed, 179 insertions(+), 90 deletions(-) create mode 100644 p2pvr/lib/daemonBase.cpp create mode 100644 p2pvr/lib/daemonBase.h create mode 100644 p2pvr/lib/p2LoggerWrapper.cpp create mode 100644 p2pvr/lib/p2LoggerWrapper.h diff --git a/p2pvr/carddaemon/Jamfile.jam b/p2pvr/carddaemon/Jamfile.jam index 9b4aa83..5efeb01 100644 --- a/p2pvr/carddaemon/Jamfile.jam +++ b/p2pvr/carddaemon/Jamfile.jam @@ -4,7 +4,6 @@ lib boost_system ; lib p2pvrcarddaemon : [ glob *.cpp ] : - ..//p2daemonlib ../ice//p2pvrice ../lib//p2pvrlib ; diff --git a/p2pvr/carddaemon/carddaemon.cpp b/p2pvr/carddaemon/carddaemon.cpp index 1ea7e43..a9b9beb 100644 --- a/p2pvr/carddaemon/carddaemon.cpp +++ b/p2pvr/carddaemon/carddaemon.cpp @@ -1,55 +1,19 @@ -#include -#include -#include -#include +#include #include "localDevices.h" +#include -class P2PvrCardDaemon : public Daemon { +class P2PvrCardDaemon : public DaemonBase { public: P2PvrCardDaemon(int argc, char ** argv) : - ic(Ice::initialize(argc, argv)) + DaemonBase(argc, argv) { } - ~P2PvrCardDaemon() + void addServants(const Ice::ObjectAdapterPtr & adapter, const IceUtil::TimerPtr & timer) const { - ic->destroy(); + adapter->add(new LocalDevices(adapter, timer), ic->stringToIdentity("Devices")); } - - void run() const - { - IceUtil::TimerPtr timer = new IceUtil::Timer(); - Logger()->messagebf(LOG_INFO, "Creating adapter (%s, %s)", Adapter, Endpoint); - auto adapter = ic->createObjectAdapterWithEndpoints(Adapter, Endpoint); - P2PVR::LocalDevicesPrx::checkedCast(adapter->add(new LocalDevices(adapter, timer), ic->stringToIdentity(Identity))); - adapter->activate(); - ic->waitForShutdown(); - timer->destroy(); - } - - void shutdown() const - { - ic->shutdown(); - } - INITOPTIONS; - - private: - Ice::CommunicatorPtr ic; - - static std::string Adapter; - static std::string Endpoint; - static std::string Identity; }; -std::string P2PvrCardDaemon::Adapter; -std::string P2PvrCardDaemon::Endpoint; -std::string P2PvrCardDaemon::Identity; - DECLARE_GENERIC_LOADER("p2pvrcarddaemon", DaemonLoader, P2PvrCardDaemon); -DECLARE_OPTIONS(P2PvrCardDaemon, "P2PVR Card Daemon") -("p2pvr.carddaemon.iceadapter", Options::value(&Adapter, "DefaultAdapter"), "ICE Adapter name") -("p2pvr.carddaemon.iceendpoint", Options::value(&Endpoint, "default -p 10001"), "ICE Endpoint address") -("p2pvr.carddaemon.iceidentity", Options::value(&Identity, "Devices"), "ICE Interface identity") -END_OPTIONS(P2PvrCardDaemon); - diff --git a/p2pvr/daemon/Jamfile.jam b/p2pvr/daemon/Jamfile.jam index 7379c7a..8a6ee8c 100644 --- a/p2pvr/daemon/Jamfile.jam +++ b/p2pvr/daemon/Jamfile.jam @@ -4,7 +4,6 @@ lib IceUtil ; lib p2pvrdaemon : [ glob *.cpp ] : - ..//p2daemonlib ../ice//p2pvrice ../lib//p2pvrlib ; diff --git a/p2pvr/daemon/daemon.cpp b/p2pvr/daemon/daemon.cpp index 6f0d326..d4b59a0 100644 --- a/p2pvr/daemon/daemon.cpp +++ b/p2pvr/daemon/daemon.cpp @@ -1,63 +1,27 @@ -#include -#include -#include +#include #include "localDevices.h" #include "globalDevices.h" #include "maintenance.h" #include "si.h" #include "schedules.h" -#include +#include -#include - -class P2PvrDaemon : public Daemon { +class P2PvrDaemon : public DaemonBase { public: P2PvrDaemon(int argc, char ** argv) : - ic(Ice::initialize(argc, argv)) - { - } - - ~P2PvrDaemon() + DaemonBase(argc, argv) { - ic->destroy(); } - void run() const + void addServants(const Ice::ObjectAdapterPtr & adapter, const IceUtil::TimerPtr & timer) const { - IceUtil::TimerPtr timer = new IceUtil::Timer(); - Logger()->messagebf(LOG_INFO, "Creating adapter (%s, %s)", Adapter, Endpoint); - auto adapter = ic->createObjectAdapterWithEndpoints(Adapter, Endpoint); adapter->add(new LocalDevices(adapter, timer), ic->stringToIdentity("Devices")); adapter->add(new GlobalDevices(), ic->stringToIdentity("GlobalDevices")); adapter->add(new Maintenance(adapter, timer), ic->stringToIdentity("Maintenance")); adapter->add(new SI(), ic->stringToIdentity("SI")); adapter->add(new Schedules(), ic->stringToIdentity("Schedules")); - adapter->activate(); - - ic->waitForShutdown(); - timer->destroy(); - } - - void shutdown() const - { - ic->shutdown(); } - INITOPTIONS; - - private: - Ice::CommunicatorPtr ic; - - static std::string Adapter; - static std::string Endpoint; }; -std::string P2PvrDaemon::Adapter; -std::string P2PvrDaemon::Endpoint; - DECLARE_GENERIC_LOADER("p2pvrdaemon", DaemonLoader, P2PvrDaemon); -DECLARE_OPTIONS(P2PvrDaemon, "P2PVR Daemon") -("p2pvr.daemon.iceadapter", Options::value(&Adapter, "DefaultAdapter"), "ICE Adapter name") -("p2pvr.daemon.iceendpoint", Options::value(&Endpoint, "default -p 10000"), "ICE Endpoint address") -END_OPTIONS(P2PvrDaemon); - diff --git a/p2pvr/lib/Jamfile.jam b/p2pvr/lib/Jamfile.jam index 1b250e7..1868a6a 100644 --- a/p2pvr/lib/Jamfile.jam +++ b/p2pvr/lib/Jamfile.jam @@ -7,6 +7,7 @@ cpp-pch pch : pch.hpp : ..//p2common ..//p2sql ..//p2lib + ..//p2daemonlib ../ice//p2pvrice ; diff --git a/p2pvr/lib/daemonBase.cpp b/p2pvr/lib/daemonBase.cpp new file mode 100644 index 0000000..ac429ea --- /dev/null +++ b/p2pvr/lib/daemonBase.cpp @@ -0,0 +1,62 @@ +#include "daemonBase.h" +#include "p2LoggerWrapper.h" +#include +#include + +std::string DaemonBase::Adapter; +std::string DaemonBase::Endpoint; + +DECLARE_OPTIONS(DaemonBase, "P2PVR Daemon") +("p2pvr.daemon.iceadapter", Options::value(&Adapter, "DefaultAdapter"), "ICE Adapter name") +("p2pvr.daemon.iceendpoint", Options::value(&Endpoint, "default -p 10000"), "ICE Endpoint address") +END_OPTIONS(DaemonBase); + +DaemonBase::DaemonBase(int argc, char ** argv) : + ic(Ice::initialize(args(argc, argv), initData())) +{ +} + +DaemonBase::~DaemonBase() +{ + ic->destroy(); +} + +void +DaemonBase::run() const +{ + IceUtil::TimerPtr timer = new IceUtil::Timer(); + Logger()->messagebf(LOG_INFO, "Creating adapter (%s, %s)", Adapter, Endpoint); + auto adapter = ic->createObjectAdapterWithEndpoints(Adapter, Endpoint); + addServants(adapter, timer); + adapter->activate(); + + ic->waitForShutdown(); + timer->destroy(); +} + +void +DaemonBase::shutdown() const +{ + ic->shutdown(); +} + +Ice::InitializationData +DaemonBase::initData() +{ + Ice::InitializationData data; + data.logger = new P2LoggerWrapper(); + return data; +} + +Ice::StringSeq & +DaemonBase::args(int, char **) +{ + _args.clear(); + _args.push_back("--Ice.ThreadPool.Client.Size=5"); + _args.push_back("--Ice.ThreadPool.Server.Size=5"); + _args.push_back("--Ice.ThreadPool.Client.SizeMax=10"); + _args.push_back("--Ice.ThreadPool.Server.SizeMax=20"); + _args.push_back("--Ice.ThreadPool.Client.SizeWarn=8"); + _args.push_back("--Ice.ThreadPool.Server.SizeWarn=16"); + return _args; +} diff --git a/p2pvr/lib/daemonBase.h b/p2pvr/lib/daemonBase.h new file mode 100644 index 0000000..3cf9b19 --- /dev/null +++ b/p2pvr/lib/daemonBase.h @@ -0,0 +1,31 @@ +#ifndef DAEMONBASE_H +#define DAEMONBASE_H + +#include +#include +#include +#include + +class DaemonBase : public Daemon { + public: + DaemonBase(int argc, char ** argv); + ~DaemonBase(); + + void run() const; + void shutdown() const; + INITOPTIONS; + + protected: + virtual void addServants(const Ice::ObjectAdapterPtr &, const IceUtil::TimerPtr &) const = 0; + static Ice::InitializationData initData(); + Ice::StringSeq & args(int, char **); + + Ice::StringSeq _args; + Ice::CommunicatorPtr ic; + + static std::string Adapter; + static std::string Endpoint; +}; + +#endif + diff --git a/p2pvr/lib/muxer.cpp b/p2pvr/lib/muxer.cpp index f7014ed..ae2ee28 100644 --- a/p2pvr/lib/muxer.cpp +++ b/p2pvr/lib/muxer.cpp @@ -32,6 +32,7 @@ Muxer::~Muxer() bool Muxer::NewData(const P2PVR::Data & data, const Ice::Current &) { + std::lock_guard g(lock); ReadMuxerAndSend(); if (write(fds[0], &data.front(), data.size()) < 1) { return true; @@ -42,10 +43,9 @@ Muxer::NewData(const P2PVR::Data & data, const Ice::Current &) bool Muxer::ReadMuxerAndSend() const { + pollfd fd = { fds[1], POLLIN, 0 }; while (true) { - pollfd fd = { fds[1], POLLIN, 0 }; - const timespec timeout = {0, 0}; - auto p = ppoll(&fd, 1, &timeout, NULL); + auto p = poll(&fd, 1, 0); if (p < 0) { // error return true; diff --git a/p2pvr/lib/muxer.h b/p2pvr/lib/muxer.h index 1058b01..4b1187d 100644 --- a/p2pvr/lib/muxer.h +++ b/p2pvr/lib/muxer.h @@ -2,6 +2,7 @@ #define MUXER_H #include +#include class Muxer : public P2PVR::RawDataClient { public: @@ -14,6 +15,7 @@ class Muxer : public P2PVR::RawDataClient { bool ReadMuxerAndSend() const; const P2PVR::RawDataClientPrx & target; int fds[2]; + std::mutex lock; }; #endif diff --git a/p2pvr/lib/p2LoggerWrapper.cpp b/p2pvr/lib/p2LoggerWrapper.cpp new file mode 100644 index 0000000..42d4757 --- /dev/null +++ b/p2pvr/lib/p2LoggerWrapper.cpp @@ -0,0 +1,38 @@ +#include "p2LoggerWrapper.h" +#include "logger.h" + +P2LoggerWrapper::P2LoggerWrapper(const std::string & p) : + prefix(p) +{ +} + +void +P2LoggerWrapper::print(const std::string & message) +{ + ::Logger()->messagebf(LOG_INFO, "%s: %s", prefix, message); +} + +void +P2LoggerWrapper::trace(const std::string & cat, const std::string & message) +{ + ::Logger()->messagebf(LOG_DEBUG, "%s: [%s] %s", prefix, cat, message); +} + +void +P2LoggerWrapper::warning(const std::string & message) +{ + ::Logger()->messagebf(LOG_WARNING, "%s: %s", prefix, message); +} + +void +P2LoggerWrapper::error(const std::string & message) +{ + ::Logger()->messagebf(LOG_ERR, "%s: %s", prefix, message); +} + +Ice::LoggerPtr +P2LoggerWrapper::cloneWithPrefix(const std::string & p) +{ + return new P2LoggerWrapper(prefix + "-" + p); +} + diff --git a/p2pvr/lib/p2LoggerWrapper.h b/p2pvr/lib/p2LoggerWrapper.h new file mode 100644 index 0000000..8febda5 --- /dev/null +++ b/p2pvr/lib/p2LoggerWrapper.h @@ -0,0 +1,23 @@ +#ifndef P2LOGGERWRAPPER +#define P2LOGGERWRAPPER + +#include + +class P2LoggerWrapper : public Ice::Logger { + public: + P2LoggerWrapper(const std::string & prefix = std::string()); + + void print(const std::string & message); + void trace(const std::string & cat, const std::string & message); + void warning(const std::string & message); + void error(const std::string & message); + + Ice::LoggerPtr cloneWithPrefix(const std::string & prefix); + + private: + const std::string prefix; +}; + + +#endif + diff --git a/p2pvr/lib/siParsers/table.h b/p2pvr/lib/siParsers/table.h index 4ed2e8b..9de08d6 100644 --- a/p2pvr/lib/siParsers/table.h +++ b/p2pvr/lib/siParsers/table.h @@ -9,6 +9,7 @@ #include #include #include +#include typedef unsigned char u_char; @@ -36,6 +37,7 @@ class SiTableParserBase : public P2PVR::RawDataClient { virtual bool ParseInfoTable(const u_char * data, size_t len) = 0; time_t startTime; unsigned int incomplete; + std::mutex lock; }; struct SiTableHeaderBase { @@ -93,6 +95,7 @@ class SiTableParser : public SiTableParserBase { auto siTable = reinterpret_cast(data); if (siTable->header.current_next_indicator == 1 // current only, please. && CheckTableId(siTable->header.tableid)) { // only tables we're interested in, please. + std::lock_guard g(lock); uint16_t contentId = ntohs(siTable->header.content_id); ContentType & content = contents[contentId]; uint8_t sectionNumber = siTable->header.section_number >> SectionNumberShift(); diff --git a/p2pvr/lib/tuner.cpp b/p2pvr/lib/tuner.cpp index 1095667..f4263ee 100644 --- a/p2pvr/lib/tuner.cpp +++ b/p2pvr/lib/tuner.cpp @@ -271,7 +271,7 @@ Tuner::StartSendingSection(int pid, const P2PVR::RawDataClientPrx & client, cons } int -Tuner::StartSendingTS(const P2PVR::PacketIds & pids, const P2PVR::RawDataClientPrx & client, const Ice::Current &) +Tuner::StartSendingTS(const P2PVR::PacketIds & pids, const P2PVR::RawDataClientPrx & client, const Ice::Current & ice) { time(&lastUsedTime); Logger()->message(LOG_DEBUG, __PRETTY_FUNCTION__); @@ -279,8 +279,11 @@ Tuner::StartSendingTS(const P2PVR::PacketIds & pids, const P2PVR::RawDataClientP throw P2PVR::DeviceError("demux", "Packet Id list cannot be empty", 0); } + if (ice.con) { + ice.con->createProxy(client->ice_getIdentity()); + } std::lock_guard g(lock); - int demux = backgroundClients.insert(BackgroundClients::value_type(OpenDemux(), client)).first->first; + int demux = backgroundClients.insert(BackgroundClients::value_type(OpenDemux(), client->ice_collocationOptimized(false))).first->first; struct dmx_pes_filter_params pesFilterParams; memset(&pesFilterParams, 0, sizeof(struct dmx_pes_filter_params)); -- cgit v1.2.3