diff options
author | Dan Goodliffe <dan@randomdan.homeip.net> | 2017-08-01 21:16:51 +0100 |
---|---|---|
committer | Dan Goodliffe <dan@randomdan.homeip.net> | 2017-08-02 19:54:54 +0100 |
commit | 5a3b24d0f16b9d4c80a94795341a7d393314a8e7 (patch) | |
tree | 80a2efb80d6dd14b7a8f121d5124a71f5d919a37 /p2pvr/daemon | |
parent | Mocks should accept and pass through options for DryIce (diff) | |
download | p2pvr-5a3b24d0f16b9d4c80a94795341a7d393314a8e7.tar.bz2 p2pvr-5a3b24d0f16b9d4c80a94795341a7d393314a8e7.tar.xz p2pvr-5a3b24d0f16b9d4c80a94795341a7d393314a8e7.zip |
Store videos in the database
Diffstat (limited to 'p2pvr/daemon')
-rw-r--r-- | p2pvr/daemon/daemon.cpp | 2 | ||||
-rw-r--r-- | p2pvr/daemon/recorder.cpp | 32 | ||||
-rw-r--r-- | p2pvr/daemon/recorder.h | 13 | ||||
-rw-r--r-- | p2pvr/daemon/recordings.cpp | 2 | ||||
-rw-r--r-- | p2pvr/daemon/sql/recordings/getAll.sql | 2 | ||||
-rw-r--r-- | p2pvr/daemon/sql/storage/getSize.sql | 3 | ||||
-rw-r--r-- | p2pvr/daemon/sql/storage/writeBlock.sql | 2 | ||||
-rw-r--r-- | p2pvr/daemon/storage.cpp | 115 | ||||
-rw-r--r-- | p2pvr/daemon/storage.h | 19 | ||||
-rw-r--r-- | p2pvr/daemon/unittests/datasources/data.sql | 2 | ||||
-rw-r--r-- | p2pvr/daemon/unittests/testMaint.cpp | 2 | ||||
-rw-r--r-- | p2pvr/daemon/unittests/testRecordings.cpp | 5 | ||||
-rw-r--r-- | p2pvr/daemon/unittests/testStorage.cpp | 45 |
13 files changed, 89 insertions, 155 deletions
diff --git a/p2pvr/daemon/daemon.cpp b/p2pvr/daemon/daemon.cpp index 109086a..c19f5cf 100644 --- a/p2pvr/daemon/daemon.cpp +++ b/p2pvr/daemon/daemon.cpp @@ -42,7 +42,7 @@ class P2PvrDaemon : public IceTray::Service { auto maintenance = add<Maintenance>(ic, adapter, new MaintenanceI(db, adapter, timer), "Maintenance"); auto si = add<SI>(ic, adapter, new SII(db), "SI"); auto schedules = add<Schedules>(ic, adapter, new SchedulesI(db), "Schedules"); - auto storage = add<Storage>(ic, adapter, new StorageI(), "Storage"); + auto storage = add<Storage>(ic, adapter, new StorageI(db), "Storage"); auto recorder = add<Recorder>(ic, adapter, new RecorderI(adapter, timer), "Recorder"); auto recordings = add<Recordings>(ic, adapter, new RecordingsI(db), "Recordings"); auto tmdb = add<TMDb::Proxy>(ic, adapter, new TMDb::Proxy(tmdbOpts->baseUrl, tmdbOpts->apikey), "TMDb"); diff --git a/p2pvr/daemon/recorder.cpp b/p2pvr/daemon/recorder.cpp index 28e89ee..c797d04 100644 --- a/p2pvr/daemon/recorder.cpp +++ b/p2pvr/daemon/recorder.cpp @@ -7,25 +7,11 @@ #include "serviceStreamer.h" #include "storage.h" #include "muxer.h" -#include <boost/uuid/uuid_generators.hpp> -#include <boost/uuid/uuid_io.hpp> #include <boost/lexical_cast.hpp> namespace po = boost::program_options; namespace P2PVR { -RecorderI::Options::Options() : - IceTray::Options("P2PVR Recorder options") -{ -} - -ICETRAY_OPTIONS(RecorderI::Options, - ("p2pvr.recorder.extension", po::value(&extension)->default_value("mp4"), - "File extension to save with (default: mp4)") - ("p2pvr.recorder.muxercommand", po::value(&muxerCommand)->default_value("/usr/bin/ffmpeg -f mpegts -i - -f dvd -codec copy -"), - "Command to perform TS->PS muxing (default: '/usr/bin/ffmpeg -f mpegts -i - -f dvd -codec copy -')") -); - IceTray::Logging::LoggerPtr RecorderI::logger(LOGMANAGER()->getLogger<RecorderI>()); RecorderI::RecorderI(Ice::ObjectAdapterPtr a, IceUtil::TimerPtr t) : @@ -73,16 +59,8 @@ RecorderI::StartRecording(SchedulePtr schedule, ::DVBSI::ServicePtr service, Eve auto devices = DevicesPrx::checkedCast(adapter->createProxy(adapter->getCommunicator()->stringToIdentity("GlobalDevices"))); auto si = SIPrx::checkedCast(adapter->createProxy(adapter->getCommunicator()->stringToIdentity("SI"))); - auto id = boost::lexical_cast<std::string>(boost::uuids::random_generator()()); - auto store = storage->OpenForWrite(id); - AdHoc::ScopeExit _store(NULL, NULL, [this,&store,&storage,&id]() { storage->Close(store); storage->Delete(id); }); - auto target = store; - RawDataClientPrx muxer; - if (!options->muxerCommand.empty()) { - muxer = RawDataClientPrx::checkedCast(adapter->addWithUUID(new Muxer(store, options->muxerCommand))); - target = muxer; - } - AdHoc::ScopeExit _muxer(NULL, NULL, [this,&muxer]() { if (muxer) adapter->remove(muxer->ice_getIdentity()); }); + auto recordingId = recordings->NewRecording(new Recording(0, schedule->ScheduleId, event->EventUid)); + auto target = storage->OpenForWrite(recordingId); auto ss = ServiceStreamerPtr(new ServiceStreamer(service->ServiceId, target, devices, si, adapter)); ss->Start(); @@ -90,8 +68,7 @@ RecorderI::StartRecording(SchedulePtr schedule, ::DVBSI::ServicePtr service, Eve event->Title, event->StartTime, event->StopTime, service->Name ? *service->Name : "<no name>", service->ServiceId); - recordings->NewRecording(new Recording(0, storage->ice_toString(), id, schedule->ScheduleId, event->EventUid)); - auto newCurrent = CurrentPtr(new Current({muxer, store, ss, schedule, service, event, IceUtil::TimerTaskPtr()})); + auto newCurrent = CurrentPtr(new Current({target, ss, schedule, service, event, IceUtil::TimerTaskPtr()})); currentRecordings.insert(newCurrent); auto stopIn = (*event->StopTime + *schedule->Late - boost::posix_time::second_clock::universal_time()).total_seconds(); @@ -106,9 +83,6 @@ RecorderI::StopRecording(CurrentPtr c) std::lock_guard<std::mutex> g(lock); logger->messagebf(LOG::DEBUG, "Stopping %s", c->event->Title); c->stream->Stop(); - if (c->muxer) { - adapter->remove(c->muxer->ice_getIdentity()); - } currentRecordings.erase(c); auto storage = StoragePrx::checkedCast(adapter->createProxy(adapter->getCommunicator()->stringToIdentity("Storage"))); storage->Close(c->store); diff --git a/p2pvr/daemon/recorder.h b/p2pvr/daemon/recorder.h index 6effb07..215732f 100644 --- a/p2pvr/daemon/recorder.h +++ b/p2pvr/daemon/recorder.h @@ -3,7 +3,6 @@ #include <IceUtil/Timer.h> #include <Ice/ObjectAdapter.h> -#include <options.h> #include <p2pvr.h> #include <mutex> #include "serviceStreamer.h" @@ -13,21 +12,10 @@ namespace P2PVR { class DLL_PUBLIC RecorderI : public Recorder { public: - class Options : public IceTray::Options { - public: - Options(); - - ICETRAY_OPTIONS_DECLARE; - - std::string extension; - std::string muxerCommand; - }; - typedef std::vector<IceUtil::TimerTaskPtr> Pendings; class Current { public: - RawDataClientPrx muxer; RawDataClientPrx store; ServiceStreamerPtr stream; SchedulePtr schedule; @@ -53,7 +41,6 @@ class DLL_PUBLIC RecorderI : public Recorder { Pendings pendingRecordings; std::mutex lock; - IceTray::OptionsResolver<Options> options; static IceTray::Logging::LoggerPtr logger; }; } diff --git a/p2pvr/daemon/recordings.cpp b/p2pvr/daemon/recordings.cpp index 6efa78c..0ab2e46 100644 --- a/p2pvr/daemon/recordings.cpp +++ b/p2pvr/daemon/recordings.cpp @@ -19,7 +19,7 @@ Ice::Int RecordingsI::NewRecording(const RecordingPtr & r, const Ice::Current &) { auto dbc = db->get(); - logger->messagebf(LOG::INFO, "%s: Creating new recording %s at %s", __PRETTY_FUNCTION__, r->Guid, r->StorageAddress); + logger->messagebf(LOG::INFO, "%s: Creating new recording for %d (EventUid %d)", __PRETTY_FUNCTION__, r->RecordingId, r->EventUid); DB::TransactionScope tx(dbc.get()); Slicer::SerializeAny<Slicer::SqlFetchIdInsertSerializer>(r, dbc.get(), "recordings"); logger->messagebf(LOG::INFO, "%s: Created recording Id: %d", __PRETTY_FUNCTION__, r->RecordingId); diff --git a/p2pvr/daemon/sql/recordings/getAll.sql b/p2pvr/daemon/sql/recordings/getAll.sql index 3c3ff51..ffc9613 100644 --- a/p2pvr/daemon/sql/recordings/getAll.sql +++ b/p2pvr/daemon/sql/recordings/getAll.sql @@ -1,3 +1,3 @@ -SELECT recordingId, storageAddress, guid, scheduleId, eventUid +SELECT recordingId, scheduleId, eventUid FROM recordings r ORDER BY recordingId diff --git a/p2pvr/daemon/sql/storage/getSize.sql b/p2pvr/daemon/sql/storage/getSize.sql new file mode 100644 index 0000000..1427d02 --- /dev/null +++ b/p2pvr/daemon/sql/storage/getSize.sql @@ -0,0 +1,3 @@ +SELECT SUM(LENGTH(videodata)) +FROM recordedvideo +WHERE recordingid = ? diff --git a/p2pvr/daemon/sql/storage/writeBlock.sql b/p2pvr/daemon/sql/storage/writeBlock.sql new file mode 100644 index 0000000..1b787e7 --- /dev/null +++ b/p2pvr/daemon/sql/storage/writeBlock.sql @@ -0,0 +1,2 @@ +INSERT INTO recordedvideo(recordingid, videodata) +VALUES(?, ?) diff --git a/p2pvr/daemon/storage.cpp b/p2pvr/daemon/storage.cpp index 51c1924..844623f 100644 --- a/p2pvr/daemon/storage.cpp +++ b/p2pvr/daemon/storage.cpp @@ -1,11 +1,9 @@ #include "storage.h" -#include "fileSink.h" -#include "muxedFileSink.h" -#include <fcntl.h> +#include "muxer.h" #include <logger.h> -#include <boost/filesystem/operations.hpp> +#include <sql/storage/getSize.sql.h> +#include <sql/storage/writeBlock.sql.h> -namespace fs = boost::filesystem; namespace po = boost::program_options; namespace P2PVR { @@ -17,89 +15,68 @@ StorageI::Options::Options() : ICETRAY_OPTIONS(StorageI::Options, ("p2pvr.storage.muxercommand", po::value(&muxerCommand)->default_value("/usr/bin/ffmpeg -i - -f mp4 -y ${TARGET}"), "Command to perform TS->PS muxing (default: '/usr/bin/ffmpeg -i - -f mp4 -y ${TARGET}')") -("p2pvr.storage.root", po::value(&root)->default_value("recordings"), - "Root folder in which to store recordings") ); IceTray::Logging::LoggerPtr StorageI::logger(LOGMANAGER()->getLogger<StorageI>()); -RawDataClientPrx -StorageI::OpenForWrite(const std::string & id, const Ice::Current & ice) -{ - fs::create_directories(options->root); - fs::path path = options->root / id; - RawDataClient * target; - if (options->muxerCommand.empty()) { - auto fd = open(path.string().c_str(), O_WRONLY | O_CREAT | O_EXCL, 0664); - if (fd < 0) { - logger->messagebf(LOG::ERR, "%s: Failed to open file for reading at %s (%d:%s)", __PRETTY_FUNCTION__, - path, errno, strerror(errno)); - throw StorageException(path.string(), strerror(errno)); +class RecordingSink : public RawDataClient { + public: + RecordingSink(AdHoc::ResourceHandle<DB::Connection> && d, Ice::Int recordingId) : + db(d), + ins(sql::storage::writeBlock.modify(db.get())) + { + ins->bindParamI(0, recordingId); } - target = new FileSink(fd); - } - else { - target = new MuxedFileSink(path, options->muxerCommand); - } - auto openFile = OpenFilePtr(new OpenFile(ice.adapter, target)); - openFiles.insert(openFile); - return *openFile; -} -void -StorageI::Close(const RawDataClientPrx & file, const Ice::Current &) + bool NewData(const Data & data, const Ice::Current &) override + { + if (!data.empty()) { + ins->bindParamBLOB(1, data); + ins->execute(); + return false; + } + return true; + } + + AdHoc::ResourceHandle<DB::Connection> db; + DB::ModifyCommandPtr ins; +}; + +StorageI::StorageI(IceTray::DatabasePoolPtr db) : + IceTray::AbstractDatabaseClient(db) { - openFiles.erase(std::find_if(openFiles.begin(), openFiles.end(), [&file](const OpenFilePtr & of) { return *of == file; })); } -void -StorageI::Delete(const std::string & id, const Ice::Current &) +RawDataClientPrx +StorageI::OpenForWrite(Ice::Int recordingId, const Ice::Current & ice) { - fs::path path = options->root / id; - logger->messagebf(LOG::INFO, "%s: Deleting %s", __PRETTY_FUNCTION__, path); - fs::remove(path); + auto target = RawDataClientPrx::uncheckedCast(ice.adapter->addWithUUID(new RecordingSink(db->get(), recordingId))); + if (!options->muxerCommand.empty()) { + auto storageId = target->ice_getIdentity(); + target = RawDataClientPrx::uncheckedCast(ice.adapter->addWithUUID(new Muxer(target, options->muxerCommand))); + muxerStorageLink.insert({ target->ice_getIdentity(), storageId }); + } + return target; } void -StorageI::Send(const std::string & id, const RawDataClientPrx & target, Ice::Long start, Ice::Long len, const Ice::Current &) +StorageI::Close(const RawDataClientPrx & file, const Ice::Current & ice) { - fs::path path = options->root / id; - auto fd = open(path.string().c_str(), O_RDONLY | O_LARGEFILE); - if (fd < 0) { - logger->messagebf(LOG::ERR, "%s: Failed to open file for reading at %s (%d:%s)", __PRETTY_FUNCTION__, - path, errno, strerror(errno)); - throw StorageException(path.string(), strerror(errno)); - } - lseek(fd, start, SEEK_SET); - auto end = std::min<off_t>(fs::file_size(path), start + len); - while (start < end) { - Data buf(16 * 1024); - auto r = read(fd, &buf.front(), std::min<size_t>(buf.size(), end - start)); - if (r < 0) { - logger->messagebf(LOG::ERR, "%s: Failed to read file %s (%d:%s)", __PRETTY_FUNCTION__, - path, errno, strerror(errno)); - close(fd); - throw StorageException(path.string(), strerror(errno)); - } - if (target->NewData(buf)) { - close(fd); - return; - } - start += r; + ice.adapter->remove(file->ice_getIdentity()); + auto storageIdItr = muxerStorageLink.find(file->ice_getIdentity()); + if (storageIdItr != muxerStorageLink.end()) { + ice.adapter->remove(storageIdItr->second); + muxerStorageLink.erase(storageIdItr); } - close(fd); } Ice::Long -StorageI::FileSize(const std::string & id, const Ice::Current &) +StorageI::GetSize(Ice::Int recordingId, const Ice::Current &) { - fs::path path = options->root / id; - try { - return fs::file_size(path); - } - catch (...) { - throw StorageException(path.string(), "Couldn't get file size"); - } + // Handles video not existing + auto size = fetch<IceUtil::Optional<Ice::Long>>(sql::storage::getSize, recordingId); + if (size) return *size; + return 0; } } diff --git a/p2pvr/daemon/storage.h b/p2pvr/daemon/storage.h index 4a2b389..48d3b57 100644 --- a/p2pvr/daemon/storage.h +++ b/p2pvr/daemon/storage.h @@ -9,9 +9,10 @@ #include "temporaryIceAdapterObject.h" #include <visibility.h> #include <logger.h> +#include <abstractDatabaseClient.h> namespace P2PVR { -class DLL_PUBLIC StorageI : public Storage { +class DLL_PUBLIC StorageI : public Storage, IceTray::AbstractDatabaseClient { public: class Options : public IceTray::Options { public: @@ -20,20 +21,16 @@ class DLL_PUBLIC StorageI : public Storage { ICETRAY_OPTIONS_DECLARE; std::string muxerCommand; - boost::filesystem::path root; }; - RawDataClientPrx OpenForWrite(const std::string &, const Ice::Current &) override; - void Close(const RawDataClientPrx & file, const Ice::Current &) override; - void Delete(const std::string &, const Ice::Current &) override; - Ice::Long FileSize(const std::string &, const Ice::Current &) override; - void Send(const std::string &, const RawDataClientPrx & target, Ice::Long start, Ice::Long len, const Ice::Current &) override; + StorageI(IceTray::DatabasePoolPtr db); + + RawDataClientPrx OpenForWrite(Ice::Int recordingId, const Ice::Current &) override; + void Close(const RawDataClientPrx &, const Ice::Current &) override; + Ice::Long GetSize(Ice::Int recordingId, const Ice::Current &) override; protected: - typedef TemporaryIceAdapterObject<RawDataClient> OpenFile; - typedef boost::shared_ptr<OpenFile> OpenFilePtr; - typedef std::set<OpenFilePtr> OpenFiles; - OpenFiles openFiles; + std::map<Ice::Identity, Ice::Identity> muxerStorageLink; IceTray::OptionsResolver<Options> options; static IceTray::Logging::LoggerPtr logger; }; diff --git a/p2pvr/daemon/unittests/datasources/data.sql b/p2pvr/daemon/unittests/datasources/data.sql index 5c91ecd..cc5198a 100644 --- a/p2pvr/daemon/unittests/datasources/data.sql +++ b/p2pvr/daemon/unittests/datasources/data.sql @@ -99,7 +99,7 @@ SELECT pg_catalog.setval('recorded_recordedid_seq', 1, false); -- Data for Name: recordings; Type: TABLE DATA; Schema: public; Owner: p2pvr -- -COPY recordings (recordingid, storageaddress, guid, scheduleid, eventuid) FROM '$SCRIPTDIR/recordings'; +COPY recordings (recordingid, scheduleid, eventuid) FROM '$SCRIPTDIR/recordings'; -- diff --git a/p2pvr/daemon/unittests/testMaint.cpp b/p2pvr/daemon/unittests/testMaint.cpp index 10e0293..026ec27 100644 --- a/p2pvr/daemon/unittests/testMaint.cpp +++ b/p2pvr/daemon/unittests/testMaint.cpp @@ -204,7 +204,7 @@ BOOST_AUTO_TEST_CASE( update_events ) irecorded->bindParamI(1, keyEvent1->EventUid); irecorded->execute(); auto irecordings = boost::shared_ptr<DB::ModifyCommand>( - db->newModifyCommand("INSERT INTO recordings(storageAddress, guid, scheduleId, eventUid) VALUES('', '', ?, ?)")); + db->newModifyCommand("INSERT INTO recordings(scheduleId, eventUid) VALUES(?, ?)")); irecordings->bindParamI(0, 1); irecordings->bindParamI(1, keyEvent2->EventUid); irecordings->execute(); diff --git a/p2pvr/daemon/unittests/testRecordings.cpp b/p2pvr/daemon/unittests/testRecordings.cpp index b12c984..b68a2a3 100644 --- a/p2pvr/daemon/unittests/testRecordings.cpp +++ b/p2pvr/daemon/unittests/testRecordings.cpp @@ -10,8 +10,6 @@ #include <recordings.h> #include <linux/dvb/frontend.h> #include <definedDirs.h> -#include <boost/uuid/uuid_generators.hpp> -#include <boost/uuid/uuid_io.hpp> #include <boost/lexical_cast.hpp> #include "mockDefs.h" @@ -33,8 +31,7 @@ BOOST_AUTO_TEST_CASE( recordings_addAndDelete ) BOOST_REQUIRE(event); BOOST_TEST_MESSAGE(event->Title); - auto guid = boost::lexical_cast<std::string>(boost::uuids::random_generator()()); - auto rec = P2PVR::RecordingPtr(new P2PVR::Recording(0, "", guid, 0, event->EventUid)); + auto rec = P2PVR::RecordingPtr(new P2PVR::Recording(0, 0, event->EventUid)); rec->RecordingId = recordings->NewRecording(rec); BOOST_REQUIRE_EQUAL(218, rec->RecordingId); recordings->GetRecordings(); diff --git a/p2pvr/daemon/unittests/testStorage.cpp b/p2pvr/daemon/unittests/testStorage.cpp index ebbddcc..5083848 100644 --- a/p2pvr/daemon/unittests/testStorage.cpp +++ b/p2pvr/daemon/unittests/testStorage.cpp @@ -19,14 +19,11 @@ using namespace P2PVR::Testing; const boost::filesystem::path storageRootDir = "/tmp/ut/p2pvr/recordings"; namespace P2PVR { namespace Testing { -class TestService : public PQ::Mock, public IceTray::DryIce { +class TestService : public StandardMockDatabase { public: TestService() : - PQ::Mock("user=postgres dbname=postgres", "postgres", { - rootDir.parent_path().parent_path() / "datasources" / "schema.sql" }), - IceTray::DryIce({ - R"C(--p2pvr.storage.muxercommand="")C", - std::string("--p2pvr.storage.root=" + storageRootDir.string()), + StandardMockDatabase({ + R"C(--p2pvr.storage.muxercommand="cat")C" }) { } @@ -38,39 +35,39 @@ BOOST_GLOBAL_FIXTURE( TestService ); BOOST_FIXTURE_TEST_SUITE( StCore, TestClient ); -BOOST_AUTO_TEST_CASE( st_openWriteClose ) +static +void +runTest(RecordingsPrx recordings, StoragePrx storage) { - boost::filesystem::remove_all(storageRootDir); - std::string id = "made-up-storage-id"; - + auto id = recordings->NewRecording(new Recording(0, 8, 2556)); auto rdc = storage->OpenForWrite(id); BOOST_REQUIRE(rdc); - P2PVR::Data data; + Data data; data.resize(1024); rdc->NewData(data); storage->Close(rdc); - auto stSize = storage->FileSize(id); - BOOST_REQUIRE_EQUAL(1024, stSize); - BOOST_REQUIRE_EQUAL(1024, boost::filesystem::file_size(storageRootDir / id)); + BOOST_REQUIRE_EQUAL(1024, storage->GetSize(id)); - storage->Delete(id); - BOOST_REQUIRE(!boost::filesystem::exists(storageRootDir / id)); + recordings->DeleteRecording(id); + BOOST_REQUIRE_EQUAL(0, storage->GetSize(id)); } -BOOST_AUTO_TEST_CASE( st_notuniqueid ) +BOOST_AUTO_TEST_CASE( st_openWriteCloseMuxWithCat ) { - boost::filesystem::remove_all(storageRootDir); - std::string id = "made-up-storage-id"; - - auto rdc = storage->OpenForWrite(id); - storage->Close(rdc); + IceTray::OptionsResolver<StorageI::Options> opts; + const_cast<std::string &>(opts->muxerCommand) = "/bin/cat"; + runTest(recordings, storage); +} - BOOST_REQUIRE_THROW(storage->OpenForWrite(id), P2PVR::StorageException); - storage->Delete(id); +BOOST_AUTO_TEST_CASE( st_openWriteCloseNoMuxer ) +{ + IceTray::OptionsResolver<StorageI::Options> opts; + const_cast<std::string &>(opts->muxerCommand).clear(); + runTest(recordings, storage); } BOOST_AUTO_TEST_SUITE_END() |