diff options
author | Dan Goodliffe <dan@randomdan.homeip.net> | 2017-12-16 20:33:35 +0000 |
---|---|---|
committer | Dan Goodliffe <dan@randomdan.homeip.net> | 2017-12-16 20:51:55 +0000 |
commit | 402f944965db85f900458633ebb05dd6496b75f1 (patch) | |
tree | dd896347010b954550987492fb08005c2645f275 | |
parent | Use std::shared_ptr and variadic calls to make_shared in shared components (diff) | |
download | netfs-402f944965db85f900458633ebb05dd6496b75f1.tar.bz2 netfs-402f944965db85f900458633ebb05dd6496b75f1.tar.xz netfs-402f944965db85f900458633ebb05dd6496b75f1.zip |
Use an interval map for tracking in-progress writes
-rw-r--r-- | netfs/fuse/fuseApp.h | 6 | ||||
-rw-r--r-- | netfs/fuse/fuseFiles.cpp | 59 | ||||
-rw-r--r-- | netfs/unittests/Jamfile.jam | 1 | ||||
-rw-r--r-- | netfs/unittests/testCore.cpp | 51 |
4 files changed, 92 insertions, 25 deletions
diff --git a/netfs/fuse/fuseApp.h b/netfs/fuse/fuseApp.h index 3f4d40d..9486e41 100644 --- a/netfs/fuse/fuseApp.h +++ b/netfs/fuse/fuseApp.h @@ -13,6 +13,7 @@ #include "fuseConfig.h" #include "cache.h" #include <visibility.h> +#include <boost/icl/interval_map.hpp> namespace NetFS { class DLL_PUBLIC FuseApp : public FuseAppBase { @@ -33,13 +34,14 @@ namespace NetFS { OpenFile(FilePrx remote, const std::string & path, int flags); void flush(); - void nonBlockingFlush(); void wait() const; FilePrx remote; const std::string path; const int flags; - std::list<Ice::AsyncResultPtr> bg; + typedef boost::icl::interval_map<Ice::Long, Ice::AsyncResultPtr> BGs; + static BGs::interval_type range(off_t, size_t); + BGs bg; mutable boost::shared_mutex _lock; }; typedef std::shared_ptr<OpenFile> OpenFilePtr; diff --git a/netfs/fuse/fuseFiles.cpp b/netfs/fuse/fuseFiles.cpp index 4c93d56..eb80531 100644 --- a/netfs/fuse/fuseFiles.cpp +++ b/netfs/fuse/fuseFiles.cpp @@ -23,7 +23,7 @@ FuseApp::OpenFile::wait() const { SharedLock(_lock); for (const auto & w : bg) { - w->waitForCompleted(); + w.second->waitForCompleted(); } } @@ -32,29 +32,18 @@ FuseApp::OpenFile::flush() { Lock(_lock); while (!bg.empty()) { - auto w = bg.front(); - bg.pop_front(); + auto wi = bg.begin(); + auto w = wi->second; + bg.erase(w); // bg operations are void, so no need to actually get the return value w->throwLocalException(); } } -void -FuseApp::OpenFile::nonBlockingFlush() +FuseApp::OpenFile::BGs::interval_type +FuseApp::OpenFile::range(off_t o, size_t s) { - boost::unique_lock<boost::shared_mutex> lock(_lock, boost::try_to_lock); - if (lock.owns_lock()) { - for (auto wi = bg.begin(); wi != bg.end(); ) { - auto w = *wi; - if (w->isCompleted()) { - wi = bg.erase(wi); - w->throwLocalException(); - } - else { - wi++; - } - } - } + return OpenFile::BGs::interval_type::right_open(o, o + s); } int @@ -138,15 +127,39 @@ FuseApp::write(const char *, const char * buf, size_t s, off_t o, struct fuse_fi try { auto of = getProxy<OpenFilePtr>(fi->fh); auto remote = of->remote; - auto r = remote->begin_write(o, s, Buffer(buf, buf + s)); if (fcr->Async) { - ScopeLock(of->_lock) { - of->bg.push_back(r); + const auto key = OpenFile::range(s, o); + while (true) { + ScopeLock(of->_lock) { + // Acquire operations to wait for + std::vector<Ice::AsyncResultPtr> overlap; + auto R = of->bg.equal_range(key); + if (R.first == R.second) { + // Begin write and store operation + auto r = remote->begin_write(o, s, Buffer(buf, buf + s), [of,key](const Ice::AsyncResultPtr &) { + ScopeLock(of->_lock) { + of->bg.erase(key); + } + }); + of->bg.insert({key, r}); + return s; + } + else { + // Wait for them whilst unlocked + of->_lock.unlock(); + overlap.reserve(std::distance(R.first, R.second)); + for (auto i = R.first; i != R.second; i++) { + overlap.push_back(i->second); + } + for (const auto & r : overlap) { + r->waitForCompleted(); + } + } + } } - of->nonBlockingFlush(); } else { - r->throwLocalException(); + remote->write(o, s, Buffer(buf, buf + s)); } return s; } diff --git a/netfs/unittests/Jamfile.jam b/netfs/unittests/Jamfile.jam index 88fa556..ab15b8d 100644 --- a/netfs/unittests/Jamfile.jam +++ b/netfs/unittests/Jamfile.jam @@ -26,6 +26,7 @@ lib testMocks : <library>..//adhocutil <library>boost_system <library>boost_filesystem + <library>../fuse//netfs-client <define>ROOT=\"$(me)\" ; diff --git a/netfs/unittests/testCore.cpp b/netfs/unittests/testCore.cpp index 76b3c4c..a8431ff 100644 --- a/netfs/unittests/testCore.cpp +++ b/netfs/unittests/testCore.cpp @@ -1,5 +1,6 @@ #define BOOST_TEST_MODULE TestNetFSCore #include <boost/test/unit_test.hpp> +#define private public #include "mockDaemon.h" #include "mockFuse.h" #include <definedDirs.h> @@ -11,6 +12,8 @@ const auto testExport = UniqueExport::get(getpid()); const std::string testEndpoint("tcp -h localhost -p 12012"); const std::string testUri("tcp://localhost:12012/testvol"); +BOOST_TEST_DONT_PRINT_LOG_VALUE(NetFS::FuseApp::OpenFile::BGs::iterator); + bool operator==(const struct stat & a, const struct stat & b) { @@ -482,3 +485,51 @@ BOOST_AUTO_TEST_CASE( uriConnect ) BOOST_REQUIRE_EQUAL(0, fuse.fuse->statfs("/", &s)); } +BOOST_AUTO_TEST_CASE( interval_map_works_how_i_think ) +{ + // Used as proof that setting and selecting ranges given an + // attempt to write to [offset, offset + size) will find + // any currently executing writes that overlap that range. + Ice::CommunicatorPtr ic = Ice::initialize({}); + NetFS::FilePrx fp = NetFS::FilePrx::uncheckedCast(ic->stringToProxy("x:default")); + + auto r = NetFS::FuseApp::OpenFile::range; + NetFS::FuseApp::OpenFile::BGs map; + auto check = [&](off_t o, size_t s, int c) { + BOOST_TEST_CONTEXT("offset: " << o << ", size: " << s << ", count: " << c) { + const auto ol = map.equal_range(r(o, s)); + BOOST_TEST_CONTEXT("range: " << ol.first->first.lower() << " to " << ol.first->first.upper()) { + BOOST_REQUIRE_EQUAL(std::distance(ol.first, ol.second), c); + } + } + }; + + BOOST_REQUIRE(map.empty()); + // Pretend we have 3 writes in progress + map.insert({r(5, 10), fp->begin_write(0, 0, {}) }); // [5, 15) + map.insert({r(15, 15), fp->begin_write(0, 0, {}) }); // [15, 30) + map.insert({r(35, 10), fp->begin_write(0, 0, {}) }); // [35, 45) + // Then the following writes to [O, O+S) should wait on N pending writes + check(0, 1, 0); // Clear before + check(100, 10, 0); // Clear after + check(0, 100, 3); // Over all + check(0, 5, 0); // Before edge case + check(0, 6, 1); // One byte overlap on first + check(45, 5, 0); // After edge case + check(44, 5, 1); // One byte overlap on last + check(10, 15, 2); // Middle of first to middle of second + check(10, 30, 3); // Middle of first to middle of last + check(20, 30, 2); // Middle of first to after last + check(20, 20, 2); // Middle of first to middle of last + check(26, 2, 1); // Entirely within second + check(35, 10, 1); // Match last + check(34, 12, 1); // Surround last + // Write completes remove + map.erase(r(5, 20)); + map.erase(r(10, 20)); + map.erase(r(35, 10)); + BOOST_REQUIRE(map.empty()); + + ic->destroy(); +} + |