summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDan Goodliffe <dan@randomdan.homeip.net>2022-05-22 14:00:10 +0100
committerDan Goodliffe <dan@randomdan.homeip.net>2022-05-22 14:44:07 +0100
commit1281577e2f2fdc7cad7745ad0e36afe266c7d167 (patch)
tree87d6e022a519c0722db2d8549e8ad76f0c8006bd
parentStd algo write overlap testing (diff)
downloadnetfs-1281577e2f2fdc7cad7745ad0e36afe266c7d167.tar.bz2
netfs-1281577e2f2fdc7cad7745ad0e36afe266c7d167.tar.xz
netfs-1281577e2f2fdc7cad7745ad0e36afe266c7d167.zip
Handle batch reads as they come back
-rw-r--r--netfs/fuse/fuseFiles.cpp115
1 files changed, 74 insertions, 41 deletions
diff --git a/netfs/fuse/fuseFiles.cpp b/netfs/fuse/fuseFiles.cpp
index 7cc6413..82df245 100644
--- a/netfs/fuse/fuseFiles.cpp
+++ b/netfs/fuse/fuseFiles.cpp
@@ -2,11 +2,20 @@
#include "fuseApp.impl.h"
#include "lockHelpers.h"
#include <Ice/BuiltinSequences.h>
+#include <algorithm>
+#include <atomic>
#include <cstring>
#include <entCache.h>
+#include <exception>
+#include <functional>
+#include <future>
+#include <iterator>
+#include <memory>
#include <mutex>
#include <numeric.h>
#include <numeric>
+#include <utility>
+#include <vector>
namespace NetFS {
FuseApp::OpenFile::WriteState::WriteState() : future(promise.get_future().share()) { }
@@ -136,8 +145,8 @@ namespace NetFS {
return std::forward<decltype(v)>(v);
},
[](auto && i) {
- return i.second->future;
- });
+ return i.second->future;
+ });
// Wait for them whilst unlocked
lock.release()->unlock();
try {
@@ -169,48 +178,72 @@ namespace NetFS {
}
}
+ template<typename T>
+ auto
+ operator++(std::vector<std::unique_ptr<T>> & v)
+ {
+ return v.emplace_back(std::make_unique<T>()).get();
+ }
+
int
FuseApp::read(const char *, char * buf, size_t s, off_t o, struct fuse_file_info * fi)
{
try {
- struct bgOp {
- bgOp(int o, std::future<Ice::ByteSeq> r) : offset {o}, response {std::move(r)} { }
- int offset;
- std::future<Ice::ByteSeq> response;
- };
+ using BackgroundOps = std::vector<std::unique_ptr<std::promise<int>>>;
+ using BackgroundFuncs = std::vector<std::function<void()>>;
auto cpy = [buf](off_t blockOffset, const auto && data) -> int {
std::copy(data.begin(), data.end(), buf + blockOffset);
return safe {data.size()};
};
+ auto collateTotal = [](auto && ops) {
+ return std::accumulate(ops.begin(), ops.end(), 0, [](auto && total, auto & op) {
+ return total += op->get_future().get();
+ });
+ };
auto of = getProxy<OpenFilePtr>(fi->fh);
auto remote = of->remote;
auto blockSizes = of->blockSizes(s);
if (fcr->Async) {
- std::vector<bgOp> ops;
- blockSizeIterate(
- blockSizes, [&ops, &o, &of, &remote, blockOffset = 0U](safe<size_t> blockSize) mutable {
- ops.emplace_back(blockOffset,
- waitOnWriteRangeAndThen(
- blockSize, o, of, [o, blockSize, &remote](const auto &) {
- return remote->readAsync(o, blockSize);
- }));
+ BackgroundFuncs fs;
+ BackgroundOps ops;
+ blockSizeIterate(blockSizes,
+ [&ops, &fs, &o, &of, &remote, blockOffset = 0U, &cpy](safe<size_t> blockSize) mutable {
+ const auto p = ++ops;
+ fs.push_back(waitOnWriteRangeAndThen(
+ blockSize, o, of, [p, o, blockOffset, blockSize, &remote, &cpy](const auto &) {
+ return remote->readAsync(
+ o, blockSize,
+ [cpy, p, blockOffset](auto && resultBuf) {
+ p->set_value(
+ cpy(blockOffset, std::forward<Ice::ByteSeq>(resultBuf)));
+ },
+ [p](auto ex) {
+ p->set_exception(ex);
+ });
+ }));
o += blockSize;
blockOffset += blockSize;
});
- return std::accumulate(ops.begin(), ops.end(), 0, [cpy](auto && total, auto & op) {
- return total += cpy(op.offset, op.response.get());
- });
+ return collateTotal(ops);
}
else if (of->bodyMaxSize < s) {
- std::vector<bgOp> ops;
- blockSizeIterate(blockSizes, [&ops, &o, &remote, blockOffset = 0U](safe<size_t> blockSize) mutable {
- ops.emplace_back(blockOffset, remote->readAsync(o, blockSize));
- o += blockSize;
- blockOffset += blockSize;
- });
- return std::accumulate(ops.begin(), ops.end(), 0, [cpy](auto && total, auto & op) {
- return total += cpy(op.offset, op.response.get());
- });
+ BackgroundFuncs fs;
+ BackgroundOps ops;
+ blockSizeIterate(
+ blockSizes, [&o, &remote, blockOffset = 0U, &ops, &fs, &cpy](safe<size_t> blockSize) mutable {
+ const auto p = ++ops;
+ fs.push_back(remote->readAsync(
+ o, blockSize,
+ [cpy, p, blockOffset](auto && resultBuf) {
+ p->set_value(cpy(blockOffset, std::forward<Ice::ByteSeq>(resultBuf)));
+ },
+ [p](auto ex) {
+ p->set_exception(ex);
+ }));
+ o += blockSize;
+ blockOffset += blockSize;
+ });
+ return collateTotal(ops);
}
else {
return cpy(0, remote->read(o, safe {s}));
@@ -231,20 +264,20 @@ namespace NetFS {
if (fcr->Async) {
blockSizeIterate(of->blockSizes(s), [&bytes, &o, &remote, &of](safe<size_t> blockSize) {
waitOnWriteRangeAndThen(blockSize, o, of, [o, blockSize, bytes, &of, remote](const auto & key) {
- auto p = std::make_shared<OpenFile::WriteState>();
- of->bg.insert({key, p});
- remote->writeAsync(
- o, blockSize, std::make_pair(bytes, bytes + blockSize),
- [p, of, key]() {
- p->promise.set_value();
- ScopeLock(of->mutex) {
- of->bg.erase(key);
- }
- },
- [p, of](auto e) {
- p->promise.set_exception(e);
- });
- });
+ auto p = std::make_shared<OpenFile::WriteState>();
+ of->bg.insert({key, p});
+ remote->writeAsync(
+ o, blockSize, std::make_pair(bytes, bytes + blockSize),
+ [p, of, key]() {
+ p->promise.set_value();
+ ScopeLock(of->mutex) {
+ of->bg.erase(key);
+ }
+ },
+ [p, of](auto e) {
+ p->promise.set_exception(e);
+ });
+ });
bytes += blockSize;
o += blockSize;
});