diff options
Diffstat (limited to 'p2pvr/daemon/schedules.cpp')
-rw-r--r-- | p2pvr/daemon/schedules.cpp | 484 |
1 files changed, 484 insertions, 0 deletions
diff --git a/p2pvr/daemon/schedules.cpp b/p2pvr/daemon/schedules.cpp new file mode 100644 index 0000000..90df59c --- /dev/null +++ b/p2pvr/daemon/schedules.cpp @@ -0,0 +1,484 @@ +#include <pch.hpp> +#include "schedules.h" +#include "sqlContainerCreator.h" +#include <rdbmsDataSource.h> +#include <logger.h> +#include <Ice/Ice.h> +#include <sqlVariableBinder.h> +#include <sqlMergeTask.h> +#include "p2Helpers.h" +#include "containerIterator.h" +#include "resources.h" +#include <boost/date_time/posix_time/posix_time.hpp> +#include <instanceStore.impl.h> + +ResourceString(Schedules_GetCandidates, daemon_sql_Schedules_GetCandidates_sql); +ResourceString(Schedules_insert, daemon_sql_Schedules_insert_sql); +ResourceString(Schedules_insertNewId, daemon_sql_Schedules_insertNewId_sql); +ResourceString(Schedules_update, daemon_sql_Schedules_update_sql); +ResourceString(Schedules_delete, daemon_sql_Schedules_delete_sql); +ResourceString(Schedules_selectAll, daemon_sql_Schedules_selectAll_sql); +ResourceString(Schedules_selectById, daemon_sql_Schedules_selectById_sql); +ResourceString(Schedules_scheduledToRecord, daemon_sql_Schedules_scheduledToRecord_sql); + +std::string Schedules::SchedulerAlgorithm; + +DECLARE_OPTIONS(Schedules, "P2PVR Scheduler options") +("p2pvr.scheduler.algorithm", Options::value(&SchedulerAlgorithm, "BitDumb"), + "Implementation of episode group scheduler problem solver") +END_OPTIONS() + +class ScheduleCandidate { + public: + std::string What; + int ServiceId; + int EventId; + int TransportStreamId; + datetime StartTime; + datetime StopTime; + int Priority; + int ScheduleId; +}; +typedef boost::shared_ptr<ScheduleCandidate> ScheduleCandidatePtr; +typedef std::vector<ScheduleCandidatePtr> ScheduleCandidates; + +enum RecordStatuses { + Record_WillRecordThisShowing = 0, + Record_WillRecordOtherShowing = 1, + Record_CannotRecordAnyShowing = 2 +}; + +class Record { + public: + Record() { }; + Record(int s, int e, RecordStatuses rs, int sc) : + ServiceId(s), + EventId(e), + RecordStatus(rs), + ScheduleId(sc) + { + } + + int ServiceId; + int EventId; + RecordStatuses RecordStatus; + int ScheduleId; +}; +typedef boost::shared_ptr<Record> RecordPtr; +typedef std::vector<RecordPtr> Records; + +template<> +void +CreateColumns<P2PVR::ScheduledToRecordPtr>(const ColumnCreator & cc) +{ + cc("serviceid", true); + cc("eventid", true); + cc("scheduleid", true); +} + +template<> +void +UnbindColumns(RowState & rs, P2PVR::ScheduledToRecordPtr const & s) +{ + rs.fields[0] >> s->ServiceId; + rs.fields[1] >> s->EventId; + rs.fields[2] >> s->ScheduleId; +} + +template<> +void +CreateColumns<ScheduleCandidatePtr>(const ColumnCreator & cc) +{ + cc("what", true); + cc("serviceid", false); + cc("eventid", false); + cc("transportstreamid", false); + cc("starttime", false); + cc("stoptime", false); + cc("priority", false); + cc("scheduleid", false); +} + +template<> +void +UnbindColumns(RowState & rs, ScheduleCandidatePtr const & s) +{ + rs.fields[0] >> s->What; + rs.fields[1] >> s->ServiceId; + rs.fields[2] >> s->EventId; + rs.fields[3] >> s->TransportStreamId; + rs.fields[4] >> s->StartTime; + rs.fields[5] >> s->StopTime; + rs.fields[6] >> s->Priority; + rs.fields[7] >> s->ScheduleId; +} + +template<> +void +CreateColumns<P2PVR::SchedulePtr>(const ColumnCreator & cc) +{ + cc("scheduleid", true); + cc("serviceid", false); + cc("eventid", false); + cc("title", false); + cc("search", false); + cc("priority", false); + cc("early", false); + cc("late", false); + cc("repeats", false); +} + +template<> +void +UnbindColumns(RowState & rs, P2PVR::SchedulePtr const & s) +{ + rs.fields[0] >> s->ScheduleId; + rs.fields[1] >> s->ServiceId; + rs.fields[2] >> s->EventId; + rs.fields[3] >> s->Title; + rs.fields[4] >> s->Search; + rs.fields[5] >> s->Priority; + rs.fields[6] >> s->Early; + rs.fields[7] >> s->Late; + rs.fields[8] >> s->Repeats; +} + +template<> +void +CreateColumns<RecordPtr>(const ColumnCreator & cc) +{ + cc("serviceid", true); + cc("eventid", true); + cc("recordstatus", false); + cc("scheduleid", false); +} + +template<> +void +BindColumns(RowState & rs, RecordPtr const & s) +{ + rs.fields[0] << s->ServiceId; + rs.fields[1] << s->EventId; + rs.fields[2] << (int)s->RecordStatus; + rs.fields[3] << s->ScheduleId; +} + +Showing::Showing(unsigned int s, unsigned int e, unsigned int t, unsigned int sc, datetime start, datetime stop, int p, const Episode * ep) : + episode(ep), + serviceId(s), + eventId(e), + priority(p), + scheduleId(sc), + transportStreamId(t), + startTime(start), + stopTime(stop), + period(start, stop) +{ +} + +Episode::Episode(const std::string & w) : + priority(0), + what(w) +{ +} + +EpisodeGroup::EpisodeGroup() : + tuners(1), + sumTimeToStart(0), + score(0) +{ +} + +bool +EpisodeGroup::IsShowingListValid(const Showings & showings) const +{ + struct TransOffset { + unsigned int trans; + char offset; + }; + typedef std::multimap<datetime, TransOffset> Periods; + typedef std::map<int, unsigned char> Usage; + + Periods periods; + Usage usage; + + BOOST_FOREACH(const auto & s, showings) { + if (s) { + periods.insert(Periods::value_type(s->startTime, {s->transportStreamId, 1})); + periods.insert(Periods::value_type(s->stopTime, {s->transportStreamId, -1})); + } + } + bool result = true; + BOOST_FOREACH(const auto & p, periods) { + auto & u = usage[p.second.trans]; + u += p.second.offset; + if (std::count_if(usage.begin(), usage.end(), [](const Usage::value_type & uv) { return uv.second > 0;}) > tuners) { + result = false; + break; + } + } + periods.clear(); + usage.clear(); + return result; +} + +template<typename T> +inline +bool NotNull(const T & p) +{ + return p != NULL; +} + +class SumTimeToStart { + public: + SumTimeToStart(time_t & t) : + total(t), + now(boost::posix_time::second_clock::universal_time()) + { + total = 0; + } + inline void operator()(const ShowingPtr & s) const + { + if (s) { + total += std::min<time_t>((now - s->startTime).seconds(), 0); + } + } + public: + time_t & total; + datetime now; +}; + +const Showings & +EpisodeGroup::Solve() +{ + SelectShowings(); + return selected; +} + +void +EpisodeGroup::Suggest(const Showings & showings) +{ + unsigned int c = 0; + std::for_each(showings.begin(), showings.end(), [&c](const ShowingPtr & s) { if (s) c+= s->episode->priority; }); + if (c >= score) { + time_t stt; + std::for_each(showings.begin(), showings.end(), SumTimeToStart(stt)); + if (stt < sumTimeToStart || (stt == sumTimeToStart && c > score)) { + if (IsShowingListValid(showings)) { + selected = showings; + score = c; + sumTimeToStart = stt; + } + } + } +} + +EpisodeGroup::SuggestionResult +EpisodeGroup::SuggestWithFeedback(const Showings & showings) +{ + if (IsShowingListValid(showings)) { + unsigned int c = 0; + std::for_each(showings.begin(), showings.end(), [&c](const ShowingPtr & s) { if (s) c+= s->episode->priority; }); + if (c >= score) { + time_t stt; + std::for_each(showings.begin(), showings.end(), SumTimeToStart(stt)); + if (stt < sumTimeToStart) { + selected = showings; + score = c; + sumTimeToStart = stt; + return SuggestionValidAndAccepted; + } + } + return SuggestionValid; + } + else { + return SuggestionInvalid; + } +} + +void +Schedules::GetEpisodeIntersects(Episodes & all, Episodes & grouped) +{ + for (Episodes::iterator aei = all.begin(); aei != all.end(); aei++) { + const auto & ae = *aei; + BOOST_FOREACH(const auto & ge, grouped) { + BOOST_FOREACH(const auto & gs, ge->showings) { + BOOST_FOREACH(const auto & as, ae->showings) { + if (gs->period.intersects(as->period)) { + Logger()->messagebf(LOG_DEBUG, " added %s", ae->what); + grouped.push_back(ae); + all.erase(aei); + GetEpisodeIntersects(all, grouped); + return; + } + } + } + } + } +} + +void +Schedules::DoReschedule(const Ice::Current & ice) +{ + auto ic = ice.adapter->getCommunicator(); + auto devs = P2PVR::DevicesPrx::checkedCast(ice.adapter->createProxy(ic->stringToIdentity("GlobalDevices"))); + unsigned int tunerCount = devs->TunerCount(); + + // Load list from database + ScheduleCandidates episodes; + SqlContainerCreator<ScheduleCandidates, ScheduleCandidate, ScheduleCandidatePtr> cct(episodes); + cct.populate(Select(Schedules_GetCandidates).second); + + Episodes scheduleList; + Showings allShowings; + EpisodePtr cur; + int minPriority = 0; + BOOST_FOREACH(const auto & c, episodes) { + if (!cur || cur->what != c->What) { + cur = new Episode(c->What); + scheduleList.push_back(cur); + } + ShowingPtr s = new Showing(c->ServiceId, c->EventId, c->TransportStreamId, c->ScheduleId, + c->StartTime, c->StopTime, c->Priority, cur.get()); + minPriority = std::min(minPriority, s->priority); + cur->showings.push_back(s); + allShowings.push_back(s); + } + Logger()->messagebf(LOG_DEBUG, "%d episodes created, %s showings", scheduleList.size(), allShowings.size()); + BOOST_FOREACH(const auto & e, scheduleList) { + Logger()->messagebf(LOG_DEBUG, " %s", e->what); + BOOST_FOREACH(const auto & s, e->showings) { + s->priority += 1 - minPriority; + e->priority += s->priority; + } + e->priority /= e->showings.size(); + } + + Records records; + // Solve + while (!scheduleList.empty()) { + auto work = scheduleList.begin(); + Logger()->messagebf(LOG_DEBUG, "start %s", (*work)->what); + Episodes group; + group.push_back(*work); + scheduleList.erase(work); + GetEpisodeIntersects(scheduleList, group); + std::sort(group.begin(), group.end(), [](const EpisodePtr & a, const EpisodePtr & b) { + if (a->priority > b->priority) return true; + if (a->priority < b->priority) return false; + return a->what < b->what; + }); + + Logger()->messagebf(LOG_DEBUG, "group created with %d episodes", group.size()); + double total = 1; + // Measure and add the optional to not record + BOOST_FOREACH(const auto & e, group) { + Logger()->messagebf(LOG_DEBUG, " %d * %d:%s", e->showings.size(), e->priority, e->what); + e->showings.push_back(NULL); + total *= e->showings.size(); + } + Logger()->messagebf(LOG_DEBUG, "group complexity of %d options", total); + + EpisodeGroupPtr sched = EpisodeGroupPtr(EpisodeGroupLoader::createNew(SchedulerAlgorithm, group)); + sched->tuners = tunerCount; + std::set<ShowingPtr> selected; + BOOST_FOREACH(const auto & s, sched->Solve()) { + if (s) selected.insert(s); + } + + BOOST_FOREACH(const auto & c, group) { + Logger()->messagebf(LOG_DEBUG, "Episode %s, %d options", c->what, c->showings.size()); + BOOST_FOREACH(const auto & i, c->showings) { + if (selected.find(i) != selected.end()) { + Logger()->messagebf(LOG_DEBUG, " %s - %s (%d) <-", i->startTime, i->stopTime, i->transportStreamId); + } + else if (i) { + Logger()->messagebf(LOG_DEBUG, " %s - %s (%d)", i->startTime, i->stopTime, i->transportStreamId); + } + } + } + Logger()->message(LOG_DEBUG, "----------"); + BOOST_FOREACH(const auto & c, group) { + bool found = false; + BOOST_FOREACH(const auto & i, c->showings) { + if (i && selected.find(i) != selected.end()) { + found = true; + break; + } + } + BOOST_FOREACH(const auto & i, c->showings) { + if (i) { + records.push_back(RecordPtr(new Record(i->serviceId, i->eventId, + found ? + selected.find(i) != selected.end() ? Record_WillRecordThisShowing : Record_WillRecordOtherShowing : + Record_CannotRecordAnyShowing, i->scheduleId))); + } + } + } + } + + TxHelper tx(this); + SqlMergeTask mergeRecords("postgres", "record"); + CreateColumns<RecordPtr>(boost::bind(&DatabaseClient::SqlMergeColumnsInserter, &mergeRecords, _1, _2)); + mergeRecords.sources.insert(new ContainerIterator<Records>(&records)); + mergeRecords.loadComplete(this); + mergeRecords.execute(NULL); + tx.Commit(); + + auto recorder = P2PVR::RecorderPrx::checkedCast(ice.adapter->createProxy(ice.adapter->getCommunicator()->stringToIdentity("Recorder"))); + recorder->RefreshSchedules(); +} + +void +Schedules::DeleteSchedule(int id, const Ice::Current & ice) +{ + TxHelper tx(this); + Modify(Schedules_delete, id).second->execute(); + DoReschedule(ice); +} + +P2PVR::ScheduleList +Schedules::GetSchedules(const Ice::Current &) +{ + P2PVR::ScheduleList schedules; + SqlContainerCreator<P2PVR::ScheduleList, P2PVR::Schedule> cct(schedules); + cct.populate(Select(Schedules_selectAll).second); + return schedules; +} + +P2PVR::SchedulePtr +Schedules::GetSchedule(int id, const Ice::Current &) +{ + P2PVR::ScheduleList schedules; + SqlContainerCreator<P2PVR::ScheduleList, P2PVR::Schedule> cct(schedules); + cct.populate(Select(Schedules_selectById, id).second); + if (schedules.empty()) throw P2PVR::NotFound(); + return schedules.front(); +} + +P2PVR::ScheduledToRecordList +Schedules::GetScheduledToRecord(const Ice::Current &) +{ + P2PVR::ScheduledToRecordList scheduled; + SqlContainerCreator<P2PVR::ScheduledToRecordList, P2PVR::ScheduledToRecord> cct(scheduled); + cct.populate(Select(Schedules_scheduledToRecord).second); + return scheduled; +} + +int +Schedules::UpdateSchedule(const P2PVR::SchedulePtr & s, const Ice::Current & ice) +{ + TxHelper tx(this); + if (s->ScheduleId == 0) { + Modify(Schedules_insert, s->ServiceId, s->EventId, s->Title, s->Search, s->Priority, s->Early, s->Late, s->Repeats).second->execute(); + s->ScheduleId = SelectScalar<int>(Schedules_insertNewId); + } + else { + Modify(Schedules_update, s->ServiceId, s->EventId, s->Title, s->Search, s->Priority, s->Early, s->Late, s->Repeats, s->ScheduleId).second->execute(); + } + DoReschedule(ice); + return s->ScheduleId; +} + +INSTANTIATESTORE(std::string, EpisodeGroupLoader); + |