From 1281577e2f2fdc7cad7745ad0e36afe266c7d167 Mon Sep 17 00:00:00 2001 From: Dan Goodliffe Date: Sun, 22 May 2022 14:00:10 +0100 Subject: Handle batch reads as they come back --- netfs/fuse/fuseFiles.cpp | 115 ++++++++++++++++++++++++++++++----------------- 1 file changed, 74 insertions(+), 41 deletions(-) diff --git a/netfs/fuse/fuseFiles.cpp b/netfs/fuse/fuseFiles.cpp index 7cc6413..82df245 100644 --- a/netfs/fuse/fuseFiles.cpp +++ b/netfs/fuse/fuseFiles.cpp @@ -2,11 +2,20 @@ #include "fuseApp.impl.h" #include "lockHelpers.h" #include +#include +#include #include #include +#include +#include +#include +#include +#include #include #include #include +#include +#include namespace NetFS { FuseApp::OpenFile::WriteState::WriteState() : future(promise.get_future().share()) { } @@ -136,8 +145,8 @@ namespace NetFS { return std::forward(v); }, [](auto && i) { - return i.second->future; - }); + return i.second->future; + }); // Wait for them whilst unlocked lock.release()->unlock(); try { @@ -169,48 +178,72 @@ namespace NetFS { } } + template + auto + operator++(std::vector> & v) + { + return v.emplace_back(std::make_unique()).get(); + } + int FuseApp::read(const char *, char * buf, size_t s, off_t o, struct fuse_file_info * fi) { try { - struct bgOp { - bgOp(int o, std::future r) : offset {o}, response {std::move(r)} { } - int offset; - std::future response; - }; + using BackgroundOps = std::vector>>; + using BackgroundFuncs = std::vector>; auto cpy = [buf](off_t blockOffset, const auto && data) -> int { std::copy(data.begin(), data.end(), buf + blockOffset); return safe {data.size()}; }; + auto collateTotal = [](auto && ops) { + return std::accumulate(ops.begin(), ops.end(), 0, [](auto && total, auto & op) { + return total += op->get_future().get(); + }); + }; auto of = getProxy(fi->fh); auto remote = of->remote; auto blockSizes = of->blockSizes(s); if (fcr->Async) { - std::vector ops; - blockSizeIterate( - blockSizes, [&ops, &o, &of, &remote, blockOffset = 0U](safe blockSize) mutable { - ops.emplace_back(blockOffset, - waitOnWriteRangeAndThen( - blockSize, o, of, [o, blockSize, &remote](const auto &) { - return remote->readAsync(o, blockSize); - })); + BackgroundFuncs fs; + BackgroundOps ops; + blockSizeIterate(blockSizes, + [&ops, &fs, &o, &of, &remote, blockOffset = 0U, &cpy](safe blockSize) mutable { + const auto p = ++ops; + fs.push_back(waitOnWriteRangeAndThen( + blockSize, o, of, [p, o, blockOffset, blockSize, &remote, &cpy](const auto &) { + return remote->readAsync( + o, blockSize, + [cpy, p, blockOffset](auto && resultBuf) { + p->set_value( + cpy(blockOffset, std::forward(resultBuf))); + }, + [p](auto ex) { + p->set_exception(ex); + }); + })); o += blockSize; blockOffset += blockSize; }); - return std::accumulate(ops.begin(), ops.end(), 0, [cpy](auto && total, auto & op) { - return total += cpy(op.offset, op.response.get()); - }); + return collateTotal(ops); } else if (of->bodyMaxSize < s) { - std::vector ops; - blockSizeIterate(blockSizes, [&ops, &o, &remote, blockOffset = 0U](safe blockSize) mutable { - ops.emplace_back(blockOffset, remote->readAsync(o, blockSize)); - o += blockSize; - blockOffset += blockSize; - }); - return std::accumulate(ops.begin(), ops.end(), 0, [cpy](auto && total, auto & op) { - return total += cpy(op.offset, op.response.get()); - }); + BackgroundFuncs fs; + BackgroundOps ops; + blockSizeIterate( + blockSizes, [&o, &remote, blockOffset = 0U, &ops, &fs, &cpy](safe blockSize) mutable { + const auto p = ++ops; + fs.push_back(remote->readAsync( + o, blockSize, + [cpy, p, blockOffset](auto && resultBuf) { + p->set_value(cpy(blockOffset, std::forward(resultBuf))); + }, + [p](auto ex) { + p->set_exception(ex); + })); + o += blockSize; + blockOffset += blockSize; + }); + return collateTotal(ops); } else { return cpy(0, remote->read(o, safe {s})); @@ -231,20 +264,20 @@ namespace NetFS { if (fcr->Async) { blockSizeIterate(of->blockSizes(s), [&bytes, &o, &remote, &of](safe blockSize) { waitOnWriteRangeAndThen(blockSize, o, of, [o, blockSize, bytes, &of, remote](const auto & key) { - auto p = std::make_shared(); - of->bg.insert({key, p}); - remote->writeAsync( - o, blockSize, std::make_pair(bytes, bytes + blockSize), - [p, of, key]() { - p->promise.set_value(); - ScopeLock(of->mutex) { - of->bg.erase(key); - } - }, - [p, of](auto e) { - p->promise.set_exception(e); - }); - }); + auto p = std::make_shared(); + of->bg.insert({key, p}); + remote->writeAsync( + o, blockSize, std::make_pair(bytes, bytes + blockSize), + [p, of, key]() { + p->promise.set_value(); + ScopeLock(of->mutex) { + of->bg.erase(key); + } + }, + [p, of](auto e) { + p->promise.set_exception(e); + }); + }); bytes += blockSize; o += blockSize; }); -- cgit v1.2.3