summaryrefslogtreecommitdiff
path: root/netfs/fuse/fuseFiles.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'netfs/fuse/fuseFiles.cpp')
-rw-r--r--netfs/fuse/fuseFiles.cpp513
1 files changed, 303 insertions, 210 deletions
diff --git a/netfs/fuse/fuseFiles.cpp b/netfs/fuse/fuseFiles.cpp
index b8eb014..fd313e5 100644
--- a/netfs/fuse/fuseFiles.cpp
+++ b/netfs/fuse/fuseFiles.cpp
@@ -1,261 +1,354 @@
-#include <string.h>
-#include "fuseApp.impl.h"
#include "fuseFiles.h"
+#include "fuseApp.impl.h" // IWYU pragma: keep - getProxy definition
#include "lockHelpers.h"
+#include <Ice/BuiltinSequences.h>
+#include <algorithm>
+#include <cstring>
#include <entCache.h>
+#include <future>
+#include <memory>
#include <mutex>
+#include <numeric.h>
+#include <numeric>
+#include <ranges>
+#include <span>
+#include <utility>
+#include <vector>
namespace NetFS {
-FuseApp::OpenFile::WriteState::WriteState() :
- future(promise.get_future().share())
-{
-}
-
-FuseApp::OpenFile::OpenFile(FilePrxPtr r, const std::string & p, int f) :
- remote(r),
- path(p),
- flags(f)
-{
-}
+ constexpr size_t MESSAGE_SIZE_HEADROOM = 1024;
-template<>
-std::map<int, FuseApp::OpenFilePtr> &
-FuseApp::getMap<FuseApp::OpenFilePtr>()
-{
- return openFiles;
-}
+ FuseApp::OpenFile::WriteState::WriteState() : future(promise.get_future().share()) { }
-void
-FuseApp::OpenFile::wait() const
-{
- auto cbg = [this](){
- SharedLock(_lock);
- return bg;
- }();
- for (const auto & w : cbg) {
- w.second->future.wait();
+ FuseApp::OpenFile::OpenFile(FilePrxPtr remotePrx, std::string remotePath, int openFlags, size_t messageMaxSize) :
+ remote(std::move(remotePrx)), path(std::move(remotePath)), flags(openFlags),
+ bodyMaxSize(messageMaxSize - MESSAGE_SIZE_HEADROOM)
+ {
}
-}
-void
-FuseApp::OpenFile::flush()
-{
- auto first = [this]() {
- SharedLock(_lock);
- return bg.empty() ? nullptr : bg.begin()->second;
- };
- while (auto w = first()) {
- // background operations are void, so no need to actually get the return value
- w->future.wait();
+ template<>
+ std::map<FuseApp::FuseHandleTypeId, FuseApp::OpenFilePtr> &
+ FuseApp::getMap<FuseApp::OpenFilePtr>()
+ {
+ return openFiles;
}
-}
-FuseApp::OpenFile::BGs::interval_type
-FuseApp::OpenFile::range(off_t o, size_t s)
-{
- return OpenFile::BGs::interval_type::right_open(o, o + s);
-}
-
-int
-FuseApp::open(const char * p, struct fuse_file_info * fi)
-{
- try {
- auto remote = volume->open(reqEnv(), p, fi->flags);
- setProxy<OpenFilePtr>(fi->fh, remote, p, fi->flags);
- return 0;
- }
- catch (SystemError & e) {
- return -e.syserrno;
+ void
+ FuseApp::OpenFile::wait() const
+ {
+ auto cbg = [this]() {
+ SharedLock(mutex);
+ return bg;
+ }();
+ for (const auto & operation : cbg) {
+ operation.second->future.get();
+ }
}
-}
-int
-FuseApp::create(const char * p, mode_t m, struct fuse_file_info * fi)
-{
- try {
- auto remote = volume->create(reqEnv(), p, fi->flags, m);
- setProxy<OpenFilePtr>(fi->fh, remote, p, fi->flags);
- return 0;
+ void
+ FuseApp::OpenFile::flush()
+ {
+ auto getFirstBackgroundOp = [this]() {
+ SharedLock(mutex);
+ return bg.empty() ? nullptr : bg.begin()->second;
+ };
+ while (auto operation = getFirstBackgroundOp()) {
+ operation->future.get();
+ }
}
- catch (SystemError & e) {
- return -e.syserrno;
+
+ FuseApp::OpenFile::BGs::interval_type
+ FuseApp::OpenFile::range(off_t offset, size_t size)
+ {
+ return OpenFile::BGs::interval_type::right_open(safe {offset}, safe {offset} + size);
}
-}
-int
-FuseApp::release(const char *, struct fuse_file_info * fi)
-{
- try {
- auto of = getProxy<OpenFilePtr>(fi->fh);
- auto remote = of->remote;
+ int
+ FuseApp::open(const char * path, struct fuse_file_info * fileInfo)
+ {
try {
- of->flush();
+ auto remote = volume->open(reqEnv(), path, fileInfo->flags);
+ setProxy<OpenFilePtr>(fileInfo->fh, remote, path, fileInfo->flags, combinedSettings.messageSizeMax);
+ return 0;
}
catch (SystemError & e) {
+ return -e.syserrno;
}
- remote->close();
- clearProxy<OpenFilePtr>(fi->fh);
- return 0;
- }
- catch (SystemError & e) {
- clearProxy<OpenFilePtr>(fi->fh);
- return -e.syserrno;
}
-}
-int
-FuseApp::flush(const char *, struct fuse_file_info * fi)
-{
- try {
- getProxy<OpenFilePtr>(fi->fh)->flush();
- return 0;
+ int
+ FuseApp::create(const char * path, mode_t mode, struct fuse_file_info * fileInfo)
+ {
+ try {
+ auto remote = volume->create(reqEnv(), path, fileInfo->flags, safe {mode});
+ setProxy<OpenFilePtr>(fileInfo->fh, remote, path, fileInfo->flags, combinedSettings.messageSizeMax);
+ return 0;
+ }
+ catch (SystemError & e) {
+ return -e.syserrno;
+ }
}
- catch (SystemError & e) {
- return -e.syserrno;
+
+ int
+ FuseApp::release(const char *, struct fuse_file_info * fileInfo)
+ {
+ try {
+ auto openFile = getProxy<OpenFilePtr>(fileInfo->fh);
+ auto remote = openFile->remote;
+ try {
+ openFile->flush();
+ clearProxy<OpenFilePtr>(fileInfo->fh);
+ remote->close();
+ }
+ catch (SystemError & e) {
+ clearProxy<OpenFilePtr>(fileInfo->fh);
+ remote->close();
+ throw;
+ }
+ return 0;
+ }
+ catch (SystemError & e) {
+ clearProxy<OpenFilePtr>(fileInfo->fh);
+ return -e.syserrno;
+ }
}
-}
-template<typename Rtn, typename F>
-inline
-Rtn
-FuseApp::waitOnWriteRangeAndThen(size_t s, off_t o, const OpenFilePtr & of, const F & f)
-{
- const auto key = OpenFile::range(o, s);
- while (true) {
- std::unique_lock<decltype(of->_lock)> _l(of->_lock);
- // Acquire operations to wait for
- auto R = of->bg.equal_range(key);
- if (R.first == R.second) {
- // Perform operation
- return f(key);
+ int
+ FuseApp::flush(const char *, struct fuse_file_info * fileInfo)
+ {
+ try {
+ getProxy<OpenFilePtr>(fileInfo->fh)->flush();
+ return 0;
}
- else {
+ catch (SystemError & e) {
+ return -e.syserrno;
+ }
+ }
+
+ template<typename Callback>
+ inline auto
+ FuseApp::waitOnWriteRangeAndThen(size_t size, off_t offset, const OpenFilePtr & openFile, const Callback & callback)
+ {
+ const auto key = OpenFile::range(offset, size);
+ while (true) {
+ std::unique_lock lock(openFile->mutex);
+ // Acquire operations to wait for
+ const auto pendingRange = openFile->bg.equal_range(key);
+ if (pendingRange.first == pendingRange.second) {
+ // Perform operation
+ return callback(key);
+ }
+ const auto overlap = [pendingRange]() {
+ std::vector<std::shared_future<void>> out;
+ out.reserve(safe<ptrdiff_t> {std::distance(pendingRange.first, pendingRange.second)});
+ std::transform(pendingRange.first, pendingRange.second, std::back_inserter(out), [](auto && operation) {
+ return operation.second->future;
+ });
+ return out;
+ }();
// Wait for them whilst unlocked
- _l.release()->unlock();
- std::vector<std::shared_ptr<OpenFile::WriteState>> overlap;
- overlap.reserve(std::distance(R.first, R.second));
- for (auto i = R.first; i != R.second; i++) {
- overlap.push_back(i->second);
+ lock.release()->unlock();
+ try {
+ std::for_each(overlap.begin(), overlap.end(), [](auto && operationFuture) {
+ operationFuture.get();
+ });
+ }
+ catch (const SystemError &) {
+ throw;
}
- for (const auto & r : overlap) {
- r->future.wait();
+ catch (...) {
+ throw SystemError {ECOMM};
}
- // Cause this thread to yield so the callback can acquire _lock
+ // Cause this thread to yield so the callback can lock mutex
usleep(0);
}
}
-}
-int
-FuseApp::read(const char *, char * buf, size_t s, off_t o, struct fuse_file_info * fi)
-{
- try {
- auto cpy = [buf](const auto && data) {
- memcpy(buf, &data.front(), data.size());
- return data.size();
- };
- auto of = getProxy<OpenFilePtr>(fi->fh);
- auto remote = of->remote;
- if (fcr->Async) {
- auto p = waitOnWriteRangeAndThen<std::future<Buffer>>(s, o, of, [o, s, &remote](const auto &){
- return remote->readAsync(o, s);
- });
- return cpy(p.get());
+ int
+ FuseApp::read(const char *, char * buf, size_t size, off_t offset, struct fuse_file_info * fileInfo)
+ {
+ try {
+ const std::span out(buf, size);
+ using BackgroundOps = std::vector<std::promise<int>>;
+ const auto cpy = [out](off_t blockOffset, const auto && data) -> int {
+ std::ranges::copy(data, out.begin() + blockOffset);
+ return safe {data.size()};
+ };
+ const auto collateTotal = [](auto && ops) {
+ return std::accumulate(ops.begin(), ops.end(), 0, [](auto && total, auto & operation) {
+ return total += operation.get_future().get();
+ });
+ };
+ auto openFile = getProxy<OpenFilePtr>(fileInfo->fh);
+ auto remote = openFile->remote;
+ if (fcr->Async) {
+ const auto blocks = out | std::views::chunk(openFile->bodyMaxSize);
+ BackgroundOps ops(blocks.size());
+ std::ranges::for_each(
+ blocks, [offset, &openFile, &remote, &cpy, opIter = ops.begin(), out](auto && block) mutable {
+ waitOnWriteRangeAndThen(block.size(), offset, openFile,
+ [thisOp = opIter++, offset, &remote, &cpy, block, out](const auto &) {
+ const auto outPosition = block.begin() - out.begin();
+ const auto position = offset + outPosition;
+ return remote->readAsync(
+ position, safe(block.size()),
+ [cpy, thisOp, outPosition](auto && resultBuf) {
+ thisOp->set_value(
+ cpy(outPosition, std::forward<Ice::ByteSeq>(resultBuf)));
+ },
+ [thisOp](auto error) {
+ thisOp->set_exception(std::move(error));
+ });
+ });
+ });
+ return collateTotal(ops);
+ }
+ if (openFile->bodyMaxSize < size) {
+ const auto blocks = out | std::views::chunk(openFile->bodyMaxSize);
+ BackgroundOps ops(blocks.size());
+ std::ranges::for_each(
+ blocks, [offset, &remote, out, &cpy, opIter = ops.begin()](auto && block) mutable {
+ auto thisOp = opIter++;
+ const auto outPosition = block.begin() - out.begin();
+ const auto position = offset + outPosition;
+ remote->readAsync(
+ position, safe(block.size()),
+ [cpy, thisOp, outPosition](auto && resultBuf) {
+ thisOp->set_value(cpy(outPosition, std::forward<Ice::ByteSeq>(resultBuf)));
+ },
+ [thisOp](auto error) {
+ thisOp->set_exception(std::move(error));
+ });
+ });
+ return collateTotal(ops);
+ }
+ return cpy(0, remote->read(offset, safe {size}));
}
- else {
- return cpy(remote->read(o, s));
+ catch (SystemError & e) {
+ return -e.syserrno;
}
}
- catch (SystemError & e) {
- return -e.syserrno;
- }
-}
-int
-FuseApp::write(const char *, const char * buf, size_t s, off_t o, struct fuse_file_info * fi)
-{
- try {
- auto of = getProxy<OpenFilePtr>(fi->fh);
- auto remote = of->remote;
- if (fcr->Async) {
- waitOnWriteRangeAndThen<void>(s, o, of, [o, s, buf, &of, &remote](const auto & key){
- auto p = std::make_shared<OpenFile::WriteState>();
- remote->writeAsync(o, s, Buffer(buf, buf + s), [p,of,key]() {
- p->promise.set_value();
- ScopeLock(of->_lock) {
- of->bg.erase(key);
- }
- }, [p,of,key](auto e) {
- p->promise.set_exception(e);
- ScopeLock(of->_lock) {
- of->bg.erase(key);
- }
+ int
+ FuseApp::write(const char *, const char * buf, size_t size, off_t offset, struct fuse_file_info * fileInfo)
+ {
+ static auto toBuffer = [](auto block) {
+ return std::make_pair(std::to_address(block.begin()), std::to_address(block.end()));
+ };
+ try {
+ auto openFile = getProxy<OpenFilePtr>(fileInfo->fh);
+ auto remote = openFile->remote;
+ const std::span bytes {reinterpret_cast<const ::Ice::Byte *>(buf), size};
+ if (fcr->Async) {
+ const auto blocks = bytes | std::views::chunk(openFile->bodyMaxSize);
+ std::ranges::for_each(blocks, [&remote, bytes, offset, &openFile](auto && block) {
+ const auto position = offset + (block.begin() - bytes.begin());
+ waitOnWriteRangeAndThen(
+ block.size(), position, openFile, [position, block, &openFile, remote](const auto & key) {
+ auto pendingWrite = std::make_shared<OpenFile::WriteState>();
+ openFile->bg.insert({key, pendingWrite});
+ remote->writeAsync(
+ position, safe(block.size()), toBuffer(block),
+ [pendingWrite, openFile, key]() {
+ pendingWrite->promise.set_value();
+ ScopeLock(openFile->mutex) {
+ openFile->bg.erase(key);
+ }
+ },
+ [pendingWrite, openFile](auto error) {
+ pendingWrite->promise.set_exception(std::move(error));
+ });
+ });
});
- of->bg.insert({key, p});
- });
+ }
+ else if (openFile->bodyMaxSize < size) {
+ const auto blocks = bytes | std::views::chunk(openFile->bodyMaxSize);
+ std::vector<std::future<void>> ops;
+ ops.reserve(blocks.size());
+ std::ranges::transform(blocks, std::back_inserter(ops), [&remote, bytes, offset](auto && block) {
+ const auto position = offset + (block.begin() - bytes.begin());
+ return remote->writeAsync(position, safe(block.size()), toBuffer(block));
+ });
+ std::ranges::for_each(ops, &std::future<void>::get);
+ }
+ else {
+ remote->write(offset, safe {size}, toBuffer(bytes));
+ }
+ return safe {size};
}
- else {
- remote->write(o, s, Buffer(buf, buf + s));
+ catch (SystemError & e) {
+ return -e.syserrno;
}
- return s;
}
- catch (SystemError & e) {
- return -e.syserrno;
- }
-}
-int
-FuseApp::truncate(const char * p, off_t o)
-{
- try {
- volume->truncate(reqEnv(), p, o);
- return 0;
- }
- catch (SystemError & e) {
- return -e.syserrno;
+ ssize_t
+ FuseApp::copy_file_range(const char *, struct fuse_file_info * fileInfoIn, off_t offsetIn, const char *,
+ struct fuse_file_info * fileInfoOut, off_t offsetOut, size_t size, int flags)
+ {
+ try {
+ auto openFileIn = getProxy<OpenFilePtr>(fileInfoIn->fh);
+ auto openFileOut = getProxy<OpenFilePtr>(fileInfoOut->fh);
+ openFileIn->wait();
+ openFileOut->wait();
+ return openFileIn->remote->copyrange(openFileOut->remote, offsetIn, offsetOut, safe {size}, flags);
+ }
+ catch (SystemError & e) {
+ return -e.syserrno;
+ }
}
-}
-int
-FuseApp::ftruncate(const char *, off_t o, fuse_file_info * fi)
-{
- try {
- auto of = getProxy<OpenFilePtr>(fi->fh);
- of->wait();
- auto remote = of->remote;
- remote->ftruncate(reqEnv(), o);
- return 0;
- }
- catch (SystemError & e) {
- return -e.syserrno;
+ int
+ FuseApp::truncate(const char * path, off_t offset, fuse_file_info * fileInfo)
+ {
+ try {
+ if (fileInfo) {
+ auto openFile = getProxy<OpenFilePtr>(fileInfo->fh);
+ openFile->wait();
+ openFile->remote->ftruncate(offset);
+ }
+ else {
+ volume->truncate(reqEnv(), path, offset);
+ }
+ return 0;
+ }
+ catch (SystemError & e) {
+ return -e.syserrno;
+ }
}
-}
-int
-FuseApp::fgetattr(const char *, struct stat * s, fuse_file_info * fi)
-{
- try {
- auto of = getProxy<OpenFilePtr>(fi->fh);
- of->wait();
- auto remote = of->remote;
- *s = converter.convert(remote->fgetattr(reqEnv()));
- return 0;
- }
- catch (SystemError & e) {
- return -e.syserrno;
+ int
+ FuseApp::getattr(const char * path, struct stat * stat, fuse_file_info * fileInfo)
+ {
+ try {
+ if (fileInfo) {
+ auto openFile = getProxy<OpenFilePtr>(fileInfo->fh);
+ openFile->wait();
+ *stat = converter.convert(openFile->remote->fgetattr());
+ }
+ else {
+ if (auto cacehedStat = statCache.get(std::filesystem::hash_value(path))) {
+ *stat = *cacehedStat;
+ }
+ else {
+ *stat = converter.convert(volume->getattr(reqEnv(), path));
+ }
+ }
+ return stat->st_mode ? 0 : -ENOENT;
+ }
+ catch (SystemError & e) {
+ return -e.syserrno;
+ }
}
-}
-int
-FuseApp::unlink(const char * p)
-{
- try {
- volume->unlink(reqEnv(), p);
- return 0;
- }
- catch (SystemError & e) {
- return -e.syserrno;
+ int
+ FuseApp::unlink(const char * path)
+ {
+ try {
+ volume->unlink(reqEnv(), path);
+ return 0;
+ }
+ catch (SystemError & e) {
+ return -e.syserrno;
+ }
}
}
-}
-