summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDan Goodliffe <dan@randomdan.homeip.net>2017-12-16 20:33:35 +0000
committerDan Goodliffe <dan@randomdan.homeip.net>2017-12-16 20:51:55 +0000
commit402f944965db85f900458633ebb05dd6496b75f1 (patch)
treedd896347010b954550987492fb08005c2645f275
parentUse std::shared_ptr and variadic calls to make_shared in shared components (diff)
downloadnetfs-402f944965db85f900458633ebb05dd6496b75f1.tar.bz2
netfs-402f944965db85f900458633ebb05dd6496b75f1.tar.xz
netfs-402f944965db85f900458633ebb05dd6496b75f1.zip
Use an interval map for tracking in-progress writes
-rw-r--r--netfs/fuse/fuseApp.h6
-rw-r--r--netfs/fuse/fuseFiles.cpp59
-rw-r--r--netfs/unittests/Jamfile.jam1
-rw-r--r--netfs/unittests/testCore.cpp51
4 files changed, 92 insertions, 25 deletions
diff --git a/netfs/fuse/fuseApp.h b/netfs/fuse/fuseApp.h
index 3f4d40d..9486e41 100644
--- a/netfs/fuse/fuseApp.h
+++ b/netfs/fuse/fuseApp.h
@@ -13,6 +13,7 @@
#include "fuseConfig.h"
#include "cache.h"
#include <visibility.h>
+#include <boost/icl/interval_map.hpp>
namespace NetFS {
class DLL_PUBLIC FuseApp : public FuseAppBase {
@@ -33,13 +34,14 @@ namespace NetFS {
OpenFile(FilePrx remote, const std::string & path, int flags);
void flush();
- void nonBlockingFlush();
void wait() const;
FilePrx remote;
const std::string path;
const int flags;
- std::list<Ice::AsyncResultPtr> bg;
+ typedef boost::icl::interval_map<Ice::Long, Ice::AsyncResultPtr> BGs;
+ static BGs::interval_type range(off_t, size_t);
+ BGs bg;
mutable boost::shared_mutex _lock;
};
typedef std::shared_ptr<OpenFile> OpenFilePtr;
diff --git a/netfs/fuse/fuseFiles.cpp b/netfs/fuse/fuseFiles.cpp
index 4c93d56..eb80531 100644
--- a/netfs/fuse/fuseFiles.cpp
+++ b/netfs/fuse/fuseFiles.cpp
@@ -23,7 +23,7 @@ FuseApp::OpenFile::wait() const
{
SharedLock(_lock);
for (const auto & w : bg) {
- w->waitForCompleted();
+ w.second->waitForCompleted();
}
}
@@ -32,29 +32,18 @@ FuseApp::OpenFile::flush()
{
Lock(_lock);
while (!bg.empty()) {
- auto w = bg.front();
- bg.pop_front();
+ auto wi = bg.begin();
+ auto w = wi->second;
+ bg.erase(w);
// bg operations are void, so no need to actually get the return value
w->throwLocalException();
}
}
-void
-FuseApp::OpenFile::nonBlockingFlush()
+FuseApp::OpenFile::BGs::interval_type
+FuseApp::OpenFile::range(off_t o, size_t s)
{
- boost::unique_lock<boost::shared_mutex> lock(_lock, boost::try_to_lock);
- if (lock.owns_lock()) {
- for (auto wi = bg.begin(); wi != bg.end(); ) {
- auto w = *wi;
- if (w->isCompleted()) {
- wi = bg.erase(wi);
- w->throwLocalException();
- }
- else {
- wi++;
- }
- }
- }
+ return OpenFile::BGs::interval_type::right_open(o, o + s);
}
int
@@ -138,15 +127,39 @@ FuseApp::write(const char *, const char * buf, size_t s, off_t o, struct fuse_fi
try {
auto of = getProxy<OpenFilePtr>(fi->fh);
auto remote = of->remote;
- auto r = remote->begin_write(o, s, Buffer(buf, buf + s));
if (fcr->Async) {
- ScopeLock(of->_lock) {
- of->bg.push_back(r);
+ const auto key = OpenFile::range(s, o);
+ while (true) {
+ ScopeLock(of->_lock) {
+ // Acquire operations to wait for
+ std::vector<Ice::AsyncResultPtr> overlap;
+ auto R = of->bg.equal_range(key);
+ if (R.first == R.second) {
+ // Begin write and store operation
+ auto r = remote->begin_write(o, s, Buffer(buf, buf + s), [of,key](const Ice::AsyncResultPtr &) {
+ ScopeLock(of->_lock) {
+ of->bg.erase(key);
+ }
+ });
+ of->bg.insert({key, r});
+ return s;
+ }
+ else {
+ // Wait for them whilst unlocked
+ of->_lock.unlock();
+ overlap.reserve(std::distance(R.first, R.second));
+ for (auto i = R.first; i != R.second; i++) {
+ overlap.push_back(i->second);
+ }
+ for (const auto & r : overlap) {
+ r->waitForCompleted();
+ }
+ }
+ }
}
- of->nonBlockingFlush();
}
else {
- r->throwLocalException();
+ remote->write(o, s, Buffer(buf, buf + s));
}
return s;
}
diff --git a/netfs/unittests/Jamfile.jam b/netfs/unittests/Jamfile.jam
index 88fa556..ab15b8d 100644
--- a/netfs/unittests/Jamfile.jam
+++ b/netfs/unittests/Jamfile.jam
@@ -26,6 +26,7 @@ lib testMocks :
<library>..//adhocutil
<library>boost_system
<library>boost_filesystem
+ <library>../fuse//netfs-client
<define>ROOT=\"$(me)\"
;
diff --git a/netfs/unittests/testCore.cpp b/netfs/unittests/testCore.cpp
index 76b3c4c..a8431ff 100644
--- a/netfs/unittests/testCore.cpp
+++ b/netfs/unittests/testCore.cpp
@@ -1,5 +1,6 @@
#define BOOST_TEST_MODULE TestNetFSCore
#include <boost/test/unit_test.hpp>
+#define private public
#include "mockDaemon.h"
#include "mockFuse.h"
#include <definedDirs.h>
@@ -11,6 +12,8 @@ const auto testExport = UniqueExport::get(getpid());
const std::string testEndpoint("tcp -h localhost -p 12012");
const std::string testUri("tcp://localhost:12012/testvol");
+BOOST_TEST_DONT_PRINT_LOG_VALUE(NetFS::FuseApp::OpenFile::BGs::iterator);
+
bool
operator==(const struct stat & a, const struct stat & b)
{
@@ -482,3 +485,51 @@ BOOST_AUTO_TEST_CASE( uriConnect )
BOOST_REQUIRE_EQUAL(0, fuse.fuse->statfs("/", &s));
}
+BOOST_AUTO_TEST_CASE( interval_map_works_how_i_think )
+{
+ // Used as proof that setting and selecting ranges given an
+ // attempt to write to [offset, offset + size) will find
+ // any currently executing writes that overlap that range.
+ Ice::CommunicatorPtr ic = Ice::initialize({});
+ NetFS::FilePrx fp = NetFS::FilePrx::uncheckedCast(ic->stringToProxy("x:default"));
+
+ auto r = NetFS::FuseApp::OpenFile::range;
+ NetFS::FuseApp::OpenFile::BGs map;
+ auto check = [&](off_t o, size_t s, int c) {
+ BOOST_TEST_CONTEXT("offset: " << o << ", size: " << s << ", count: " << c) {
+ const auto ol = map.equal_range(r(o, s));
+ BOOST_TEST_CONTEXT("range: " << ol.first->first.lower() << " to " << ol.first->first.upper()) {
+ BOOST_REQUIRE_EQUAL(std::distance(ol.first, ol.second), c);
+ }
+ }
+ };
+
+ BOOST_REQUIRE(map.empty());
+ // Pretend we have 3 writes in progress
+ map.insert({r(5, 10), fp->begin_write(0, 0, {}) }); // [5, 15)
+ map.insert({r(15, 15), fp->begin_write(0, 0, {}) }); // [15, 30)
+ map.insert({r(35, 10), fp->begin_write(0, 0, {}) }); // [35, 45)
+ // Then the following writes to [O, O+S) should wait on N pending writes
+ check(0, 1, 0); // Clear before
+ check(100, 10, 0); // Clear after
+ check(0, 100, 3); // Over all
+ check(0, 5, 0); // Before edge case
+ check(0, 6, 1); // One byte overlap on first
+ check(45, 5, 0); // After edge case
+ check(44, 5, 1); // One byte overlap on last
+ check(10, 15, 2); // Middle of first to middle of second
+ check(10, 30, 3); // Middle of first to middle of last
+ check(20, 30, 2); // Middle of first to after last
+ check(20, 20, 2); // Middle of first to middle of last
+ check(26, 2, 1); // Entirely within second
+ check(35, 10, 1); // Match last
+ check(34, 12, 1); // Surround last
+ // Write completes remove
+ map.erase(r(5, 20));
+ map.erase(r(10, 20));
+ map.erase(r(35, 10));
+ BOOST_REQUIRE(map.empty());
+
+ ic->destroy();
+}
+