diff options
author | Dan Goodliffe <dan@randomdan.homeip.net> | 2022-05-22 14:00:10 +0100 |
---|---|---|
committer | Dan Goodliffe <dan@randomdan.homeip.net> | 2022-05-22 14:44:07 +0100 |
commit | 1281577e2f2fdc7cad7745ad0e36afe266c7d167 (patch) | |
tree | 87d6e022a519c0722db2d8549e8ad76f0c8006bd | |
parent | Std algo write overlap testing (diff) | |
download | netfs-1281577e2f2fdc7cad7745ad0e36afe266c7d167.tar.bz2 netfs-1281577e2f2fdc7cad7745ad0e36afe266c7d167.tar.xz netfs-1281577e2f2fdc7cad7745ad0e36afe266c7d167.zip |
Handle batch reads as they come back
-rw-r--r-- | netfs/fuse/fuseFiles.cpp | 115 |
1 files changed, 74 insertions, 41 deletions
diff --git a/netfs/fuse/fuseFiles.cpp b/netfs/fuse/fuseFiles.cpp index 7cc6413..82df245 100644 --- a/netfs/fuse/fuseFiles.cpp +++ b/netfs/fuse/fuseFiles.cpp @@ -2,11 +2,20 @@ #include "fuseApp.impl.h" #include "lockHelpers.h" #include <Ice/BuiltinSequences.h> +#include <algorithm> +#include <atomic> #include <cstring> #include <entCache.h> +#include <exception> +#include <functional> +#include <future> +#include <iterator> +#include <memory> #include <mutex> #include <numeric.h> #include <numeric> +#include <utility> +#include <vector> namespace NetFS { FuseApp::OpenFile::WriteState::WriteState() : future(promise.get_future().share()) { } @@ -136,8 +145,8 @@ namespace NetFS { return std::forward<decltype(v)>(v); }, [](auto && i) { - return i.second->future; - }); + return i.second->future; + }); // Wait for them whilst unlocked lock.release()->unlock(); try { @@ -169,48 +178,72 @@ namespace NetFS { } } + template<typename T> + auto + operator++(std::vector<std::unique_ptr<T>> & v) + { + return v.emplace_back(std::make_unique<T>()).get(); + } + int FuseApp::read(const char *, char * buf, size_t s, off_t o, struct fuse_file_info * fi) { try { - struct bgOp { - bgOp(int o, std::future<Ice::ByteSeq> r) : offset {o}, response {std::move(r)} { } - int offset; - std::future<Ice::ByteSeq> response; - }; + using BackgroundOps = std::vector<std::unique_ptr<std::promise<int>>>; + using BackgroundFuncs = std::vector<std::function<void()>>; auto cpy = [buf](off_t blockOffset, const auto && data) -> int { std::copy(data.begin(), data.end(), buf + blockOffset); return safe {data.size()}; }; + auto collateTotal = [](auto && ops) { + return std::accumulate(ops.begin(), ops.end(), 0, [](auto && total, auto & op) { + return total += op->get_future().get(); + }); + }; auto of = getProxy<OpenFilePtr>(fi->fh); auto remote = of->remote; auto blockSizes = of->blockSizes(s); if (fcr->Async) { - std::vector<bgOp> ops; - blockSizeIterate( - blockSizes, [&ops, &o, &of, &remote, blockOffset = 0U](safe<size_t> blockSize) mutable { - ops.emplace_back(blockOffset, - waitOnWriteRangeAndThen( - blockSize, o, of, [o, blockSize, &remote](const auto &) { - return remote->readAsync(o, blockSize); - })); + BackgroundFuncs fs; + BackgroundOps ops; + blockSizeIterate(blockSizes, + [&ops, &fs, &o, &of, &remote, blockOffset = 0U, &cpy](safe<size_t> blockSize) mutable { + const auto p = ++ops; + fs.push_back(waitOnWriteRangeAndThen( + blockSize, o, of, [p, o, blockOffset, blockSize, &remote, &cpy](const auto &) { + return remote->readAsync( + o, blockSize, + [cpy, p, blockOffset](auto && resultBuf) { + p->set_value( + cpy(blockOffset, std::forward<Ice::ByteSeq>(resultBuf))); + }, + [p](auto ex) { + p->set_exception(ex); + }); + })); o += blockSize; blockOffset += blockSize; }); - return std::accumulate(ops.begin(), ops.end(), 0, [cpy](auto && total, auto & op) { - return total += cpy(op.offset, op.response.get()); - }); + return collateTotal(ops); } else if (of->bodyMaxSize < s) { - std::vector<bgOp> ops; - blockSizeIterate(blockSizes, [&ops, &o, &remote, blockOffset = 0U](safe<size_t> blockSize) mutable { - ops.emplace_back(blockOffset, remote->readAsync(o, blockSize)); - o += blockSize; - blockOffset += blockSize; - }); - return std::accumulate(ops.begin(), ops.end(), 0, [cpy](auto && total, auto & op) { - return total += cpy(op.offset, op.response.get()); - }); + BackgroundFuncs fs; + BackgroundOps ops; + blockSizeIterate( + blockSizes, [&o, &remote, blockOffset = 0U, &ops, &fs, &cpy](safe<size_t> blockSize) mutable { + const auto p = ++ops; + fs.push_back(remote->readAsync( + o, blockSize, + [cpy, p, blockOffset](auto && resultBuf) { + p->set_value(cpy(blockOffset, std::forward<Ice::ByteSeq>(resultBuf))); + }, + [p](auto ex) { + p->set_exception(ex); + })); + o += blockSize; + blockOffset += blockSize; + }); + return collateTotal(ops); } else { return cpy(0, remote->read(o, safe {s})); @@ -231,20 +264,20 @@ namespace NetFS { if (fcr->Async) { blockSizeIterate(of->blockSizes(s), [&bytes, &o, &remote, &of](safe<size_t> blockSize) { waitOnWriteRangeAndThen(blockSize, o, of, [o, blockSize, bytes, &of, remote](const auto & key) { - auto p = std::make_shared<OpenFile::WriteState>(); - of->bg.insert({key, p}); - remote->writeAsync( - o, blockSize, std::make_pair(bytes, bytes + blockSize), - [p, of, key]() { - p->promise.set_value(); - ScopeLock(of->mutex) { - of->bg.erase(key); - } - }, - [p, of](auto e) { - p->promise.set_exception(e); - }); - }); + auto p = std::make_shared<OpenFile::WriteState>(); + of->bg.insert({key, p}); + remote->writeAsync( + o, blockSize, std::make_pair(bytes, bytes + blockSize), + [p, of, key]() { + p->promise.set_value(); + ScopeLock(of->mutex) { + of->bg.erase(key); + } + }, + [p, of](auto e) { + p->promise.set_exception(e); + }); + }); bytes += blockSize; o += blockSize; }); |