diff options
| author | Dan Goodliffe <dan@randomdan.homeip.net> | 2017-12-16 20:33:35 +0000 | 
|---|---|---|
| committer | Dan Goodliffe <dan@randomdan.homeip.net> | 2017-12-16 20:51:55 +0000 | 
| commit | 402f944965db85f900458633ebb05dd6496b75f1 (patch) | |
| tree | dd896347010b954550987492fb08005c2645f275 | |
| parent | Use std::shared_ptr and variadic calls to make_shared in shared components (diff) | |
| download | netfs-402f944965db85f900458633ebb05dd6496b75f1.tar.bz2 netfs-402f944965db85f900458633ebb05dd6496b75f1.tar.xz netfs-402f944965db85f900458633ebb05dd6496b75f1.zip  | |
Use an interval map for tracking in-progress writes
| -rw-r--r-- | netfs/fuse/fuseApp.h | 6 | ||||
| -rw-r--r-- | netfs/fuse/fuseFiles.cpp | 59 | ||||
| -rw-r--r-- | netfs/unittests/Jamfile.jam | 1 | ||||
| -rw-r--r-- | netfs/unittests/testCore.cpp | 51 | 
4 files changed, 92 insertions, 25 deletions
diff --git a/netfs/fuse/fuseApp.h b/netfs/fuse/fuseApp.h index 3f4d40d..9486e41 100644 --- a/netfs/fuse/fuseApp.h +++ b/netfs/fuse/fuseApp.h @@ -13,6 +13,7 @@  #include "fuseConfig.h"  #include "cache.h"  #include <visibility.h> +#include <boost/icl/interval_map.hpp>  namespace NetFS {  	class DLL_PUBLIC FuseApp : public FuseAppBase { @@ -33,13 +34,14 @@ namespace NetFS {  					OpenFile(FilePrx remote, const std::string & path, int flags);  					void flush(); -					void nonBlockingFlush();  					void wait() const;  					FilePrx remote;  					const std::string path;  					const int flags; -					std::list<Ice::AsyncResultPtr> bg; +					typedef boost::icl::interval_map<Ice::Long, Ice::AsyncResultPtr> BGs; +					static BGs::interval_type range(off_t, size_t); +					BGs bg;  					mutable boost::shared_mutex _lock;  			};  			typedef std::shared_ptr<OpenFile> OpenFilePtr; diff --git a/netfs/fuse/fuseFiles.cpp b/netfs/fuse/fuseFiles.cpp index 4c93d56..eb80531 100644 --- a/netfs/fuse/fuseFiles.cpp +++ b/netfs/fuse/fuseFiles.cpp @@ -23,7 +23,7 @@ FuseApp::OpenFile::wait() const  {  	SharedLock(_lock);  	for (const auto & w : bg) { -		w->waitForCompleted(); +		w.second->waitForCompleted();  	}  } @@ -32,29 +32,18 @@ FuseApp::OpenFile::flush()  {  	Lock(_lock);  	while (!bg.empty()) { -		auto w = bg.front(); -		bg.pop_front(); +		auto wi = bg.begin(); +		auto w = wi->second; +		bg.erase(w);  		// bg operations are void, so no need to actually get the return value  		w->throwLocalException();  	}  } -void -FuseApp::OpenFile::nonBlockingFlush() +FuseApp::OpenFile::BGs::interval_type +FuseApp::OpenFile::range(off_t o, size_t s)  { -	boost::unique_lock<boost::shared_mutex> lock(_lock, boost::try_to_lock); -	if (lock.owns_lock()) { -		for (auto wi = bg.begin(); wi != bg.end(); ) { -			auto w = *wi; -			if (w->isCompleted()) { -				wi = bg.erase(wi); -				w->throwLocalException(); -			} -			else { -				wi++; -			} -		} -	} +	return OpenFile::BGs::interval_type::right_open(o, o + s);  }  int @@ -138,15 +127,39 @@ FuseApp::write(const char *, const char * buf, size_t s, off_t o, struct fuse_fi  	try {  		auto of = getProxy<OpenFilePtr>(fi->fh);  		auto remote = of->remote; -		auto r = remote->begin_write(o, s, Buffer(buf, buf + s));  		if (fcr->Async) { -			ScopeLock(of->_lock) { -				of->bg.push_back(r); +			const auto key = OpenFile::range(s, o); +			while (true) { +				ScopeLock(of->_lock) { +					// Acquire operations to wait for +					std::vector<Ice::AsyncResultPtr> overlap; +					auto R = of->bg.equal_range(key); +					if (R.first == R.second) { +						// Begin write and store operation +						auto r = remote->begin_write(o, s, Buffer(buf, buf + s), [of,key](const Ice::AsyncResultPtr &) { +							ScopeLock(of->_lock) { +								of->bg.erase(key); +							} +						}); +						of->bg.insert({key, r}); +						return s; +					} +					else { +						// Wait for them whilst unlocked +						of->_lock.unlock(); +						overlap.reserve(std::distance(R.first, R.second)); +						for (auto i = R.first; i != R.second; i++) { +							overlap.push_back(i->second); +						} +						for (const auto & r : overlap) { +							r->waitForCompleted(); +						} +					} +				}  			} -			of->nonBlockingFlush();  		}  		else { -			r->throwLocalException(); +			remote->write(o, s, Buffer(buf, buf + s));  		}  		return s;  	} diff --git a/netfs/unittests/Jamfile.jam b/netfs/unittests/Jamfile.jam index 88fa556..ab15b8d 100644 --- a/netfs/unittests/Jamfile.jam +++ b/netfs/unittests/Jamfile.jam @@ -26,6 +26,7 @@ lib testMocks :  	<library>..//adhocutil  	<library>boost_system  	<library>boost_filesystem +	<library>../fuse//netfs-client  	<define>ROOT=\"$(me)\"  	; diff --git a/netfs/unittests/testCore.cpp b/netfs/unittests/testCore.cpp index 76b3c4c..a8431ff 100644 --- a/netfs/unittests/testCore.cpp +++ b/netfs/unittests/testCore.cpp @@ -1,5 +1,6 @@  #define BOOST_TEST_MODULE TestNetFSCore  #include <boost/test/unit_test.hpp> +#define private public  #include "mockDaemon.h"  #include "mockFuse.h"  #include <definedDirs.h> @@ -11,6 +12,8 @@ const auto testExport = UniqueExport::get(getpid());  const std::string testEndpoint("tcp -h localhost -p 12012");  const std::string testUri("tcp://localhost:12012/testvol"); +BOOST_TEST_DONT_PRINT_LOG_VALUE(NetFS::FuseApp::OpenFile::BGs::iterator); +  bool  operator==(const struct stat & a, const struct stat & b)  { @@ -482,3 +485,51 @@ BOOST_AUTO_TEST_CASE( uriConnect )  	BOOST_REQUIRE_EQUAL(0, fuse.fuse->statfs("/",  &s));  } +BOOST_AUTO_TEST_CASE( interval_map_works_how_i_think ) +{ +	// Used as proof that setting and selecting ranges given an +	// attempt to write to [offset, offset + size) will find +	// any currently executing writes that overlap that range. +	Ice::CommunicatorPtr ic = Ice::initialize({}); +	NetFS::FilePrx fp = NetFS::FilePrx::uncheckedCast(ic->stringToProxy("x:default")); + +	auto r = NetFS::FuseApp::OpenFile::range; +	NetFS::FuseApp::OpenFile::BGs map; +	auto check = [&](off_t o, size_t s, int c) { +		BOOST_TEST_CONTEXT("offset: " << o << ", size: " << s << ", count: " << c) { +			const auto ol = map.equal_range(r(o, s)); +			BOOST_TEST_CONTEXT("range: " << ol.first->first.lower() << " to " << ol.first->first.upper()) { +				BOOST_REQUIRE_EQUAL(std::distance(ol.first, ol.second), c); +			} +		} +	}; + +	BOOST_REQUIRE(map.empty()); +	// Pretend we have 3 writes in progress +	map.insert({r(5, 10), fp->begin_write(0, 0, {}) }); // [5, 15) +	map.insert({r(15, 15), fp->begin_write(0, 0, {}) }); // [15, 30) +	map.insert({r(35, 10), fp->begin_write(0, 0, {}) }); // [35, 45) +	// Then the following writes to [O, O+S) should wait on N pending writes +	check(0, 1, 0); // Clear before +	check(100, 10, 0); // Clear after +	check(0, 100, 3); // Over all +	check(0, 5, 0); // Before edge case +	check(0, 6, 1); // One byte overlap on first +	check(45, 5, 0); // After edge case +	check(44, 5, 1); // One byte overlap on last +	check(10, 15, 2); // Middle of first to middle of second +	check(10, 30, 3); // Middle of first to middle of last +	check(20, 30, 2); // Middle of first to after last +	check(20, 20, 2); // Middle of first to middle of last +	check(26, 2, 1); // Entirely within second +	check(35, 10, 1); // Match last +	check(34, 12, 1); // Surround last +	// Write completes remove +	map.erase(r(5, 20)); +	map.erase(r(10, 20)); +	map.erase(r(35, 10)); +	BOOST_REQUIRE(map.empty()); + +	ic->destroy(); +} +  | 
