diff options
Diffstat (limited to 'netfs/fuse/fuseFiles.cpp')
| -rw-r--r-- | netfs/fuse/fuseFiles.cpp | 513 |
1 files changed, 303 insertions, 210 deletions
diff --git a/netfs/fuse/fuseFiles.cpp b/netfs/fuse/fuseFiles.cpp index b8eb014..fd313e5 100644 --- a/netfs/fuse/fuseFiles.cpp +++ b/netfs/fuse/fuseFiles.cpp @@ -1,261 +1,354 @@ -#include <string.h> -#include "fuseApp.impl.h" #include "fuseFiles.h" +#include "fuseApp.impl.h" // IWYU pragma: keep - getProxy definition #include "lockHelpers.h" +#include <Ice/BuiltinSequences.h> +#include <algorithm> +#include <cstring> #include <entCache.h> +#include <future> +#include <memory> #include <mutex> +#include <numeric.h> +#include <numeric> +#include <ranges> +#include <span> +#include <utility> +#include <vector> namespace NetFS { -FuseApp::OpenFile::WriteState::WriteState() : - future(promise.get_future().share()) -{ -} - -FuseApp::OpenFile::OpenFile(FilePrxPtr r, const std::string & p, int f) : - remote(r), - path(p), - flags(f) -{ -} + constexpr size_t MESSAGE_SIZE_HEADROOM = 1024; -template<> -std::map<int, FuseApp::OpenFilePtr> & -FuseApp::getMap<FuseApp::OpenFilePtr>() -{ - return openFiles; -} + FuseApp::OpenFile::WriteState::WriteState() : future(promise.get_future().share()) { } -void -FuseApp::OpenFile::wait() const -{ - auto cbg = [this](){ - SharedLock(_lock); - return bg; - }(); - for (const auto & w : cbg) { - w.second->future.wait(); + FuseApp::OpenFile::OpenFile(FilePrxPtr remotePrx, std::string remotePath, int openFlags, size_t messageMaxSize) : + remote(std::move(remotePrx)), path(std::move(remotePath)), flags(openFlags), + bodyMaxSize(messageMaxSize - MESSAGE_SIZE_HEADROOM) + { } -} -void -FuseApp::OpenFile::flush() -{ - auto first = [this]() { - SharedLock(_lock); - return bg.empty() ? nullptr : bg.begin()->second; - }; - while (auto w = first()) { - // background operations are void, so no need to actually get the return value - w->future.wait(); + template<> + std::map<FuseApp::FuseHandleTypeId, FuseApp::OpenFilePtr> & + FuseApp::getMap<FuseApp::OpenFilePtr>() + { + return openFiles; } -} -FuseApp::OpenFile::BGs::interval_type -FuseApp::OpenFile::range(off_t o, size_t s) -{ - return OpenFile::BGs::interval_type::right_open(o, o + s); -} - -int -FuseApp::open(const char * p, struct fuse_file_info * fi) -{ - try { - auto remote = volume->open(reqEnv(), p, fi->flags); - setProxy<OpenFilePtr>(fi->fh, remote, p, fi->flags); - return 0; - } - catch (SystemError & e) { - return -e.syserrno; + void + FuseApp::OpenFile::wait() const + { + auto cbg = [this]() { + SharedLock(mutex); + return bg; + }(); + for (const auto & operation : cbg) { + operation.second->future.get(); + } } -} -int -FuseApp::create(const char * p, mode_t m, struct fuse_file_info * fi) -{ - try { - auto remote = volume->create(reqEnv(), p, fi->flags, m); - setProxy<OpenFilePtr>(fi->fh, remote, p, fi->flags); - return 0; + void + FuseApp::OpenFile::flush() + { + auto getFirstBackgroundOp = [this]() { + SharedLock(mutex); + return bg.empty() ? nullptr : bg.begin()->second; + }; + while (auto operation = getFirstBackgroundOp()) { + operation->future.get(); + } } - catch (SystemError & e) { - return -e.syserrno; + + FuseApp::OpenFile::BGs::interval_type + FuseApp::OpenFile::range(off_t offset, size_t size) + { + return OpenFile::BGs::interval_type::right_open(safe {offset}, safe {offset} + size); } -} -int -FuseApp::release(const char *, struct fuse_file_info * fi) -{ - try { - auto of = getProxy<OpenFilePtr>(fi->fh); - auto remote = of->remote; + int + FuseApp::open(const char * path, struct fuse_file_info * fileInfo) + { try { - of->flush(); + auto remote = volume->open(reqEnv(), path, fileInfo->flags); + setProxy<OpenFilePtr>(fileInfo->fh, remote, path, fileInfo->flags, combinedSettings.messageSizeMax); + return 0; } catch (SystemError & e) { + return -e.syserrno; } - remote->close(); - clearProxy<OpenFilePtr>(fi->fh); - return 0; - } - catch (SystemError & e) { - clearProxy<OpenFilePtr>(fi->fh); - return -e.syserrno; } -} -int -FuseApp::flush(const char *, struct fuse_file_info * fi) -{ - try { - getProxy<OpenFilePtr>(fi->fh)->flush(); - return 0; + int + FuseApp::create(const char * path, mode_t mode, struct fuse_file_info * fileInfo) + { + try { + auto remote = volume->create(reqEnv(), path, fileInfo->flags, safe {mode}); + setProxy<OpenFilePtr>(fileInfo->fh, remote, path, fileInfo->flags, combinedSettings.messageSizeMax); + return 0; + } + catch (SystemError & e) { + return -e.syserrno; + } } - catch (SystemError & e) { - return -e.syserrno; + + int + FuseApp::release(const char *, struct fuse_file_info * fileInfo) + { + try { + auto openFile = getProxy<OpenFilePtr>(fileInfo->fh); + auto remote = openFile->remote; + try { + openFile->flush(); + clearProxy<OpenFilePtr>(fileInfo->fh); + remote->close(); + } + catch (SystemError & e) { + clearProxy<OpenFilePtr>(fileInfo->fh); + remote->close(); + throw; + } + return 0; + } + catch (SystemError & e) { + clearProxy<OpenFilePtr>(fileInfo->fh); + return -e.syserrno; + } } -} -template<typename Rtn, typename F> -inline -Rtn -FuseApp::waitOnWriteRangeAndThen(size_t s, off_t o, const OpenFilePtr & of, const F & f) -{ - const auto key = OpenFile::range(o, s); - while (true) { - std::unique_lock<decltype(of->_lock)> _l(of->_lock); - // Acquire operations to wait for - auto R = of->bg.equal_range(key); - if (R.first == R.second) { - // Perform operation - return f(key); + int + FuseApp::flush(const char *, struct fuse_file_info * fileInfo) + { + try { + getProxy<OpenFilePtr>(fileInfo->fh)->flush(); + return 0; } - else { + catch (SystemError & e) { + return -e.syserrno; + } + } + + template<typename Callback> + inline auto + FuseApp::waitOnWriteRangeAndThen(size_t size, off_t offset, const OpenFilePtr & openFile, const Callback & callback) + { + const auto key = OpenFile::range(offset, size); + while (true) { + std::unique_lock lock(openFile->mutex); + // Acquire operations to wait for + const auto pendingRange = openFile->bg.equal_range(key); + if (pendingRange.first == pendingRange.second) { + // Perform operation + return callback(key); + } + const auto overlap = [pendingRange]() { + std::vector<std::shared_future<void>> out; + out.reserve(safe<ptrdiff_t> {std::distance(pendingRange.first, pendingRange.second)}); + std::transform(pendingRange.first, pendingRange.second, std::back_inserter(out), [](auto && operation) { + return operation.second->future; + }); + return out; + }(); // Wait for them whilst unlocked - _l.release()->unlock(); - std::vector<std::shared_ptr<OpenFile::WriteState>> overlap; - overlap.reserve(std::distance(R.first, R.second)); - for (auto i = R.first; i != R.second; i++) { - overlap.push_back(i->second); + lock.release()->unlock(); + try { + std::for_each(overlap.begin(), overlap.end(), [](auto && operationFuture) { + operationFuture.get(); + }); + } + catch (const SystemError &) { + throw; } - for (const auto & r : overlap) { - r->future.wait(); + catch (...) { + throw SystemError {ECOMM}; } - // Cause this thread to yield so the callback can acquire _lock + // Cause this thread to yield so the callback can lock mutex usleep(0); } } -} -int -FuseApp::read(const char *, char * buf, size_t s, off_t o, struct fuse_file_info * fi) -{ - try { - auto cpy = [buf](const auto && data) { - memcpy(buf, &data.front(), data.size()); - return data.size(); - }; - auto of = getProxy<OpenFilePtr>(fi->fh); - auto remote = of->remote; - if (fcr->Async) { - auto p = waitOnWriteRangeAndThen<std::future<Buffer>>(s, o, of, [o, s, &remote](const auto &){ - return remote->readAsync(o, s); - }); - return cpy(p.get()); + int + FuseApp::read(const char *, char * buf, size_t size, off_t offset, struct fuse_file_info * fileInfo) + { + try { + const std::span out(buf, size); + using BackgroundOps = std::vector<std::promise<int>>; + const auto cpy = [out](off_t blockOffset, const auto && data) -> int { + std::ranges::copy(data, out.begin() + blockOffset); + return safe {data.size()}; + }; + const auto collateTotal = [](auto && ops) { + return std::accumulate(ops.begin(), ops.end(), 0, [](auto && total, auto & operation) { + return total += operation.get_future().get(); + }); + }; + auto openFile = getProxy<OpenFilePtr>(fileInfo->fh); + auto remote = openFile->remote; + if (fcr->Async) { + const auto blocks = out | std::views::chunk(openFile->bodyMaxSize); + BackgroundOps ops(blocks.size()); + std::ranges::for_each( + blocks, [offset, &openFile, &remote, &cpy, opIter = ops.begin(), out](auto && block) mutable { + waitOnWriteRangeAndThen(block.size(), offset, openFile, + [thisOp = opIter++, offset, &remote, &cpy, block, out](const auto &) { + const auto outPosition = block.begin() - out.begin(); + const auto position = offset + outPosition; + return remote->readAsync( + position, safe(block.size()), + [cpy, thisOp, outPosition](auto && resultBuf) { + thisOp->set_value( + cpy(outPosition, std::forward<Ice::ByteSeq>(resultBuf))); + }, + [thisOp](auto error) { + thisOp->set_exception(std::move(error)); + }); + }); + }); + return collateTotal(ops); + } + if (openFile->bodyMaxSize < size) { + const auto blocks = out | std::views::chunk(openFile->bodyMaxSize); + BackgroundOps ops(blocks.size()); + std::ranges::for_each( + blocks, [offset, &remote, out, &cpy, opIter = ops.begin()](auto && block) mutable { + auto thisOp = opIter++; + const auto outPosition = block.begin() - out.begin(); + const auto position = offset + outPosition; + remote->readAsync( + position, safe(block.size()), + [cpy, thisOp, outPosition](auto && resultBuf) { + thisOp->set_value(cpy(outPosition, std::forward<Ice::ByteSeq>(resultBuf))); + }, + [thisOp](auto error) { + thisOp->set_exception(std::move(error)); + }); + }); + return collateTotal(ops); + } + return cpy(0, remote->read(offset, safe {size})); } - else { - return cpy(remote->read(o, s)); + catch (SystemError & e) { + return -e.syserrno; } } - catch (SystemError & e) { - return -e.syserrno; - } -} -int -FuseApp::write(const char *, const char * buf, size_t s, off_t o, struct fuse_file_info * fi) -{ - try { - auto of = getProxy<OpenFilePtr>(fi->fh); - auto remote = of->remote; - if (fcr->Async) { - waitOnWriteRangeAndThen<void>(s, o, of, [o, s, buf, &of, &remote](const auto & key){ - auto p = std::make_shared<OpenFile::WriteState>(); - remote->writeAsync(o, s, Buffer(buf, buf + s), [p,of,key]() { - p->promise.set_value(); - ScopeLock(of->_lock) { - of->bg.erase(key); - } - }, [p,of,key](auto e) { - p->promise.set_exception(e); - ScopeLock(of->_lock) { - of->bg.erase(key); - } + int + FuseApp::write(const char *, const char * buf, size_t size, off_t offset, struct fuse_file_info * fileInfo) + { + static auto toBuffer = [](auto block) { + return std::make_pair(std::to_address(block.begin()), std::to_address(block.end())); + }; + try { + auto openFile = getProxy<OpenFilePtr>(fileInfo->fh); + auto remote = openFile->remote; + const std::span bytes {reinterpret_cast<const ::Ice::Byte *>(buf), size}; + if (fcr->Async) { + const auto blocks = bytes | std::views::chunk(openFile->bodyMaxSize); + std::ranges::for_each(blocks, [&remote, bytes, offset, &openFile](auto && block) { + const auto position = offset + (block.begin() - bytes.begin()); + waitOnWriteRangeAndThen( + block.size(), position, openFile, [position, block, &openFile, remote](const auto & key) { + auto pendingWrite = std::make_shared<OpenFile::WriteState>(); + openFile->bg.insert({key, pendingWrite}); + remote->writeAsync( + position, safe(block.size()), toBuffer(block), + [pendingWrite, openFile, key]() { + pendingWrite->promise.set_value(); + ScopeLock(openFile->mutex) { + openFile->bg.erase(key); + } + }, + [pendingWrite, openFile](auto error) { + pendingWrite->promise.set_exception(std::move(error)); + }); + }); }); - of->bg.insert({key, p}); - }); + } + else if (openFile->bodyMaxSize < size) { + const auto blocks = bytes | std::views::chunk(openFile->bodyMaxSize); + std::vector<std::future<void>> ops; + ops.reserve(blocks.size()); + std::ranges::transform(blocks, std::back_inserter(ops), [&remote, bytes, offset](auto && block) { + const auto position = offset + (block.begin() - bytes.begin()); + return remote->writeAsync(position, safe(block.size()), toBuffer(block)); + }); + std::ranges::for_each(ops, &std::future<void>::get); + } + else { + remote->write(offset, safe {size}, toBuffer(bytes)); + } + return safe {size}; } - else { - remote->write(o, s, Buffer(buf, buf + s)); + catch (SystemError & e) { + return -e.syserrno; } - return s; } - catch (SystemError & e) { - return -e.syserrno; - } -} -int -FuseApp::truncate(const char * p, off_t o) -{ - try { - volume->truncate(reqEnv(), p, o); - return 0; - } - catch (SystemError & e) { - return -e.syserrno; + ssize_t + FuseApp::copy_file_range(const char *, struct fuse_file_info * fileInfoIn, off_t offsetIn, const char *, + struct fuse_file_info * fileInfoOut, off_t offsetOut, size_t size, int flags) + { + try { + auto openFileIn = getProxy<OpenFilePtr>(fileInfoIn->fh); + auto openFileOut = getProxy<OpenFilePtr>(fileInfoOut->fh); + openFileIn->wait(); + openFileOut->wait(); + return openFileIn->remote->copyrange(openFileOut->remote, offsetIn, offsetOut, safe {size}, flags); + } + catch (SystemError & e) { + return -e.syserrno; + } } -} -int -FuseApp::ftruncate(const char *, off_t o, fuse_file_info * fi) -{ - try { - auto of = getProxy<OpenFilePtr>(fi->fh); - of->wait(); - auto remote = of->remote; - remote->ftruncate(reqEnv(), o); - return 0; - } - catch (SystemError & e) { - return -e.syserrno; + int + FuseApp::truncate(const char * path, off_t offset, fuse_file_info * fileInfo) + { + try { + if (fileInfo) { + auto openFile = getProxy<OpenFilePtr>(fileInfo->fh); + openFile->wait(); + openFile->remote->ftruncate(offset); + } + else { + volume->truncate(reqEnv(), path, offset); + } + return 0; + } + catch (SystemError & e) { + return -e.syserrno; + } } -} -int -FuseApp::fgetattr(const char *, struct stat * s, fuse_file_info * fi) -{ - try { - auto of = getProxy<OpenFilePtr>(fi->fh); - of->wait(); - auto remote = of->remote; - *s = converter.convert(remote->fgetattr(reqEnv())); - return 0; - } - catch (SystemError & e) { - return -e.syserrno; + int + FuseApp::getattr(const char * path, struct stat * stat, fuse_file_info * fileInfo) + { + try { + if (fileInfo) { + auto openFile = getProxy<OpenFilePtr>(fileInfo->fh); + openFile->wait(); + *stat = converter.convert(openFile->remote->fgetattr()); + } + else { + if (auto cacehedStat = statCache.get(std::filesystem::hash_value(path))) { + *stat = *cacehedStat; + } + else { + *stat = converter.convert(volume->getattr(reqEnv(), path)); + } + } + return stat->st_mode ? 0 : -ENOENT; + } + catch (SystemError & e) { + return -e.syserrno; + } } -} -int -FuseApp::unlink(const char * p) -{ - try { - volume->unlink(reqEnv(), p); - return 0; - } - catch (SystemError & e) { - return -e.syserrno; + int + FuseApp::unlink(const char * path) + { + try { + volume->unlink(reqEnv(), path); + return 0; + } + catch (SystemError & e) { + return -e.syserrno; + } } } -} - |
