diff options
author | Dan Goodliffe <dan@randomdan.homeip.net> | 2015-11-17 22:24:56 +0000 |
---|---|---|
committer | Dan Goodliffe <dan@randomdan.homeip.net> | 2015-11-17 22:24:56 +0000 |
commit | 66a29f972bf062b4345bac56a8161032638580e0 (patch) | |
tree | 162b51cbda4f36050ae1a2b7a836f760e5c87719 | |
parent | Add a portable semaphore class with timeout support (diff) | |
download | libadhocutil-66a29f972bf062b4345bac56a8161032638580e0.tar.bz2 libadhocutil-66a29f972bf062b4345bac56a8161032638580e0.tar.xz libadhocutil-66a29f972bf062b4345bac56a8161032638580e0.zip |
Add a template resource pool with limits and caching support.
-rw-r--r-- | libadhocutil/resourcePool.cpp | 28 | ||||
-rw-r--r-- | libadhocutil/resourcePool.h | 126 | ||||
-rw-r--r-- | libadhocutil/resourcePool.impl.h | 238 | ||||
-rw-r--r-- | libadhocutil/unittests/Jamfile.jam | 12 | ||||
-rw-r--r-- | libadhocutil/unittests/testResourcePool.cpp | 242 |
5 files changed, 646 insertions, 0 deletions
diff --git a/libadhocutil/resourcePool.cpp b/libadhocutil/resourcePool.cpp new file mode 100644 index 0000000..29e6cb9 --- /dev/null +++ b/libadhocutil/resourcePool.cpp @@ -0,0 +1,28 @@ +#include "resourcePool.h" +#include "buffer.h" + +namespace AdHoc { + TimeOutOnResourcePool::TimeOutOnResourcePool(const char * const n) : + name(n) + { + } + + std::string + TimeOutOnResourcePool::message() const throw() + { + return stringbf("Timeout getting a resource from pool of %s", name); + } + + NoCurrentResource::NoCurrentResource(const std::thread::id & id, const char * const n) : + threadId(id), + name(n) + { + } + + std::string + NoCurrentResource::message() const throw() + { + return stringbf("Thread %s has no current resource handle of type %s", threadId, name); + } +} + diff --git a/libadhocutil/resourcePool.h b/libadhocutil/resourcePool.h new file mode 100644 index 0000000..70d9ae2 --- /dev/null +++ b/libadhocutil/resourcePool.h @@ -0,0 +1,126 @@ +#ifndef ADHOCUTIL_RESOURCEPOOL_H +#define ADHOCUTIL_RESOURCEPOOL_H + +#include <boost/tuple/tuple.hpp> +#include <boost/thread/shared_mutex.hpp> +#include <atomic> +#include <thread> +#include <list> +#include <map> +#include "semaphore.h" +#include "exception.h" +#include "visibility.h" + +namespace AdHoc { + template <typename Resource> + class ResourcePool; + + /// A handle to a resource allocated from a ResourcePool. + template <typename Resource> + class DLL_PUBLIC ResourceHandle { + public: + /// Handle to an allocated resource, the pool it belongs to and a count of active references. + typedef boost::tuple<Resource *, ResourcePool<Resource> *, std::atomic<unsigned int>> Object; + + /// Create a reference to a new resource. + ResourceHandle(Object *); + ResourceHandle(const ResourceHandle &); + ~ResourceHandle(); + + void operator=(const ResourceHandle &); + Resource * operator->() const; + Resource * get() const; + + unsigned int handleCount() const; + + private: + DLL_PRIVATE void incRef() const; + DLL_PRIVATE void decRef(); + Object * resource; + }; + + /// A fully featured resource pool for sharing and reusing a finite set of + /// resources, possibly across multiple threads. + template <typename Resource> + class DLL_PUBLIC ResourcePool { + public: + friend class ResourceHandle<Resource>; + + ResourcePool(unsigned int maxSize, unsigned int keep); + virtual ~ResourcePool(); + + ResourceHandle<Resource> get(); + ResourceHandle<Resource> get(unsigned int); + ResourceHandle<Resource> getMine(); + void idle(); + + unsigned int inUseCount() const; + unsigned int availableCount() const; + + protected: + virtual Resource * createResource() const = 0; + virtual void destroyResource(Resource *) const; + virtual void testResource(const Resource *) const; + + private: + typedef std::list<Resource *> Available; + typedef std::multimap<std::thread::id, typename ResourceHandle<Resource>::Object *> InUse; + + void putBack(Resource *); + void discard(Resource *); + + DLL_PRIVATE static void removeFrom(Resource *, InUse &); + DLL_PRIVATE ResourceHandle<Resource> getOne(); + + mutable boost::upgrade_mutex lock; + Semaphore poolSize; + unsigned int keep; + Available available; + InUse inUse; + }; + + /// Represents a failure to acquire a new resource within the given timeout. + class DLL_PUBLIC TimeOutOnResourcePool : public AdHoc::StdException { + public: + TimeOutOnResourcePool(const char * const type); + + std::string message() const throw() override; + + private: + const char * const name; + }; + + /// Represents a failure to acquire a new resource of type R within the given timeout. + template <typename R> + class DLL_PUBLIC TimeOutOnResourcePoolT : public TimeOutOnResourcePool { + public: + TimeOutOnResourcePoolT(); + }; + + /// Represents a request for the current thread's previous allocated resource + /// when one has not been allocated. + class DLL_PUBLIC NoCurrentResource : public AdHoc::StdException { + public: + /// Construct for a specific thread and resource type. + NoCurrentResource(const std::thread::id &, const char * const type); + + std::string message() const throw() override; + + private: + const std::thread::id threadId; + const char * const name; + }; + + /// Represents a request for the current thread's previous allocated resource + /// of type R when one has not been allocated. + template <typename R> + class DLL_PUBLIC NoCurrentResourceT : public NoCurrentResource { + public: + /// Construct for a specific thread and resource type R. + NoCurrentResourceT(const std::thread::id &); + }; + +} + +#endif + diff --git a/libadhocutil/resourcePool.impl.h b/libadhocutil/resourcePool.impl.h new file mode 100644 index 0000000..d4fcfe4 --- /dev/null +++ b/libadhocutil/resourcePool.impl.h @@ -0,0 +1,238 @@ +#ifndef ADHOCUTIL_RESOURCEPOOL_IMPL_H +#define ADHOCUTIL_RESOURCEPOOL_IMPL_H + +#include "resourcePool.h" +#include "lockHelpers.h" +#include "safeMapFind.h" + +namespace AdHoc { + // + // ResourceHandle + // + + template <typename R> + ResourceHandle<R>::ResourceHandle(Object * o) : + resource(o) + { + incRef(); + } + + template <typename R> + ResourceHandle<R>::ResourceHandle(const ResourceHandle & rh) : + resource(rh.resource) + { + incRef(); + } + + template <typename R> + ResourceHandle<R>::~ResourceHandle() + { + decRef(); + } + + template <typename R> + unsigned int + ResourceHandle<R>::handleCount() const + { + return boost::get<2>(*resource); + } + + template <typename R> + R * + ResourceHandle<R>::get() const + { + return boost::get<0>(*resource); + } + + template <typename R> + R * + ResourceHandle<R>::operator->() const + { + return boost::get<0>(*resource); + } + + template <typename R> + void + ResourceHandle<R>::incRef() const + { + ++boost::get<2>(*resource); + } + + template <typename R> + void + ResourceHandle<R>::decRef() + { + if (!--boost::get<2>(*resource)) { + if (std::uncaught_exception()) { + boost::get<1>(*resource)->discard(boost::get<0>(*resource)); + } + else { + boost::get<1>(*resource)->putBack(boost::get<0>(*resource)); + } + delete resource; + } + resource = nullptr; + } + + // + // ResourcePool + // + + template <typename R> + ResourcePool<R>::ResourcePool(unsigned int max, unsigned int k) : + poolSize(max), + keep(k) + { + } + + template <typename R> + ResourcePool<R>::~ResourcePool() + { + for (auto & r : available) { + destroyResource(r); + } + for (auto & r : inUse) { + destroyResource(boost::get<0>(*r.second)); + } + } + + template <typename R> + void + ResourcePool<R>::destroyResource(R * r) const + { + delete r; + } + + template <typename R> + void + ResourcePool<R>::testResource(const R *) const + { + } + + template <typename R> + unsigned int + ResourcePool<R>::inUseCount() const + { + SharedLock(lock); + return inUse.size(); + } + + template <typename R> + unsigned int + ResourcePool<R>::availableCount() const + { + SharedLock(lock); + return available.size(); + } + + template <typename R> + ResourceHandle<R> + ResourcePool<R>::getMine() + { + Lock(lock); + return safeMapLookup<NoCurrentResourceT<R>>(inUse, std::this_thread::get_id()); + } + + template <typename R> + void + ResourcePool<R>::idle() + { + Lock(lock); + for (auto & r : available) { + destroyResource(r); + } + available.clear(); + } + + template <typename R> + ResourceHandle<R> + ResourcePool<R>::get() + { + poolSize.wait(); + return getOne(); + } + + template <typename R> + ResourceHandle<R> + ResourcePool<R>::get(unsigned int timeout) + { + if (!poolSize.wait(timeout)) { + throw TimeOutOnResourcePoolT<R>(); + } + return getOne(); + } + + template <typename R> + ResourceHandle<R> + ResourcePool<R>::getOne() + { + UpgradableLock(lock, ulock); + if (available.empty()) { + auto ro = new typename ResourceHandle<R>::Object(createResource(), this); + UpgradeLock(ulock); + inUse.insert({ std::this_thread::get_id(), ro }); + return ro; + } + else { + UpgradeLock(ulock); + auto ro = new typename ResourceHandle<R>::Object(available.front(), this); + available.pop_front(); + inUse.insert({ std::this_thread::get_id(), ro }); + return ro; + } + } + + template <typename R> + void + ResourcePool<R>::putBack(R * r) + { + Lock(lock); + removeFrom(r, inUse); + if (available.size() < keep) { + available.push_back(r); + } + else { + destroyResource(r); + } + poolSize.notify(); + } + + template <typename R> + void + ResourcePool<R>::discard(R * r) + { + Lock(lock); + removeFrom(r, inUse); + destroyResource(r); + poolSize.notify(); + } + + template <typename R> + void + ResourcePool<R>::removeFrom(R * r, InUse & inUse) + { + auto rs = inUse.equal_range(std::this_thread::get_id()); + for (auto & ri = rs.first; ri != rs.second; ri++) { + if (boost::get<0>(*ri->second) == r) { + inUse.erase(ri); + return; + } + } + } + + template <typename R> + TimeOutOnResourcePoolT<R>::TimeOutOnResourcePoolT() : + TimeOutOnResourcePool(typeid(R).name()) + { + } + + template <typename R> + NoCurrentResourceT<R>::NoCurrentResourceT(const std::thread::id & id) : + NoCurrentResource(id, typeid(R).name()) + { + } + +} + +#endif + diff --git a/libadhocutil/unittests/Jamfile.jam b/libadhocutil/unittests/Jamfile.jam index 334b47e..582b174 100644 --- a/libadhocutil/unittests/Jamfile.jam +++ b/libadhocutil/unittests/Jamfile.jam @@ -185,3 +185,15 @@ run testException ; +run + testResourcePool.cpp + : : : + <define>BOOST_TEST_DYN_LINK + <library>..//adhocutil + <library>boost_utf + <library>boost_thread + <library>boost_system + : + testResourcePool + ; + diff --git a/libadhocutil/unittests/testResourcePool.cpp b/libadhocutil/unittests/testResourcePool.cpp new file mode 100644 index 0000000..8b4161e --- /dev/null +++ b/libadhocutil/unittests/testResourcePool.cpp @@ -0,0 +1,242 @@ +#define BOOST_TEST_MODULE ResourcePool +#include <boost/test/unit_test.hpp> + +#include <resourcePool.impl.h> + +class MockResource { + public: + MockResource() { count += 1; } + ~MockResource() { count -= 1; } + + MockResource(const MockResource &) = delete; + void operator=(const MockResource &) = delete; + + static std::atomic<unsigned int> count; +}; + +std::atomic<unsigned int> MockResource::count; + +class TRP : public AdHoc::ResourcePool<MockResource> { + public: + TRP() : AdHoc::ResourcePool<MockResource>(10, 10) { } + protected: + MockResource * createResource() const override + { + return new MockResource(); + } +}; + +class TRPSmall : public AdHoc::ResourcePool<MockResource> { + public: + TRPSmall() : AdHoc::ResourcePool<MockResource>(3, 1) { } + protected: + MockResource * createResource() const override + { + return new MockResource(); + } +}; + +BOOST_AUTO_TEST_CASE ( get ) +{ + { + TRP pool; + BOOST_REQUIRE_EQUAL(0, pool.inUseCount()); + BOOST_REQUIRE_EQUAL(0, pool.availableCount()); + + { + auto r1 = pool.get(); + BOOST_REQUIRE_EQUAL(1, pool.inUseCount()); + BOOST_REQUIRE_EQUAL(0, pool.availableCount()); + BOOST_REQUIRE_EQUAL(1, r1.handleCount()); + BOOST_REQUIRE_EQUAL(1, MockResource::count); + BOOST_REQUIRE(r1.get()); + + auto r2(pool.get()); + BOOST_REQUIRE_EQUAL(2, pool.inUseCount()); + BOOST_REQUIRE_EQUAL(0, pool.availableCount()); + BOOST_REQUIRE_EQUAL(1, r2.handleCount()); + BOOST_REQUIRE_EQUAL(2, MockResource::count); + BOOST_REQUIRE(r2.get()); + + auto r1a = r1; + BOOST_REQUIRE_EQUAL(2, pool.inUseCount()); + BOOST_REQUIRE_EQUAL(0, pool.availableCount()); + BOOST_REQUIRE_EQUAL(2, r1.handleCount()); + BOOST_REQUIRE_EQUAL(2, r1a.handleCount()); + BOOST_REQUIRE_EQUAL(2, MockResource::count); + BOOST_REQUIRE(r1.get()); + BOOST_REQUIRE(r1a.get()); + BOOST_REQUIRE_EQUAL(r1.get(), r1a.get()); + } + + BOOST_REQUIRE_EQUAL(0, pool.inUseCount()); + BOOST_REQUIRE_EQUAL(2, pool.availableCount()); + } + BOOST_REQUIRE_EQUAL(0, MockResource::count); +} + +BOOST_AUTO_TEST_CASE ( getMine ) +{ + TRP pool; + auto r1 = pool.get(); + BOOST_REQUIRE(r1.get()); + auto r2 = pool.getMine(); + BOOST_REQUIRE_EQUAL(r1.get(), r2.get()); + BOOST_REQUIRE_EQUAL(2, r1.handleCount()); + BOOST_REQUIRE_EQUAL(2, r2.handleCount()); +} + +BOOST_AUTO_TEST_CASE( getMineNoCurrent ) +{ + TRP pool; + BOOST_REQUIRE_THROW(pool.getMine(), AdHoc::NoCurrentResourceT<MockResource>); + { + auto r1 = pool.get(); + auto r2 = pool.getMine(); + BOOST_REQUIRE_EQUAL(r1.get(), r2.get()); + } + BOOST_REQUIRE_THROW(pool.getMine(), AdHoc::NoCurrentResource); +} + +BOOST_AUTO_TEST_CASE( discard ) +{ + TRP pool; + try { + auto r1 = pool.get(); + BOOST_REQUIRE_EQUAL(0, pool.availableCount()); + BOOST_REQUIRE_EQUAL(1, pool.inUseCount()); + throw std::exception(); + } + catch (...) { + BOOST_REQUIRE_EQUAL(0, pool.availableCount()); + BOOST_REQUIRE_EQUAL(0, pool.inUseCount()); + } +} + +BOOST_AUTO_TEST_CASE( keepSome1 ) +{ + TRPSmall pool; + { + auto r1 = pool.get(); + { + auto r2 = pool.get(); + { + auto r3 = pool.get(); + BOOST_REQUIRE_EQUAL(3, pool.inUseCount()); + BOOST_REQUIRE_EQUAL(0, pool.availableCount()); + BOOST_REQUIRE_EQUAL(3, MockResource::count); + } + BOOST_REQUIRE_EQUAL(2, pool.inUseCount()); + BOOST_REQUIRE_EQUAL(1, pool.availableCount()); + BOOST_REQUIRE_EQUAL(3, MockResource::count); + } + BOOST_REQUIRE_EQUAL(1, pool.inUseCount()); + BOOST_REQUIRE_EQUAL(1, pool.availableCount()); + BOOST_REQUIRE_EQUAL(2, MockResource::count); + { + auto r2 = pool.get(); + BOOST_REQUIRE_EQUAL(2, pool.inUseCount()); + BOOST_REQUIRE_EQUAL(0, pool.availableCount()); + BOOST_REQUIRE_EQUAL(2, MockResource::count); + } + BOOST_REQUIRE_EQUAL(1, pool.inUseCount()); + BOOST_REQUIRE_EQUAL(1, pool.availableCount()); + BOOST_REQUIRE_EQUAL(2, MockResource::count); + } + BOOST_REQUIRE_EQUAL(0, pool.inUseCount()); + BOOST_REQUIRE_EQUAL(1, pool.availableCount()); + BOOST_REQUIRE_EQUAL(1, MockResource::count); +} + +BOOST_AUTO_TEST_CASE( keepSome2 ) +{ + TRPSmall pool; + { + auto r1 = pool.get(); + auto r2 = pool.get(); + auto r3 = pool.get(); + BOOST_REQUIRE_EQUAL(3, pool.inUseCount()); + BOOST_REQUIRE_EQUAL(0, pool.availableCount()); + BOOST_REQUIRE_EQUAL(3, MockResource::count); + } + BOOST_REQUIRE_EQUAL(0, pool.inUseCount()); + BOOST_REQUIRE_EQUAL(1, pool.availableCount()); + BOOST_REQUIRE_EQUAL(1, MockResource::count); +} + +BOOST_AUTO_TEST_CASE( idle ) +{ + TRP pool; + { + { + auto r1 = pool.get(); + auto r2 = pool.get(); + } + auto r3 = pool.get(); + BOOST_REQUIRE_EQUAL(1, pool.inUseCount()); + BOOST_REQUIRE_EQUAL(1, pool.availableCount()); + BOOST_REQUIRE_EQUAL(2, MockResource::count); + pool.idle(); + BOOST_REQUIRE_EQUAL(1, pool.inUseCount()); + BOOST_REQUIRE_EQUAL(0, pool.availableCount()); + BOOST_REQUIRE_EQUAL(1, MockResource::count); + } + BOOST_REQUIRE_EQUAL(0, pool.inUseCount()); + BOOST_REQUIRE_EQUAL(1, pool.availableCount()); + BOOST_REQUIRE_EQUAL(1, MockResource::count); + pool.idle(); + BOOST_REQUIRE_EQUAL(0, pool.inUseCount()); + BOOST_REQUIRE_EQUAL(0, pool.availableCount()); + BOOST_REQUIRE_EQUAL(0, MockResource::count); +} + +BOOST_AUTO_TEST_CASE( threading1 ) +{ + TRPSmall pool; + std::list<std::thread *> threads; + for (int x = 0; x < 100; x += 1) { + threads.push_back(new std::thread([&pool](){ + auto r = pool.get(); + usleep(50000); + })); + usleep(5000); + // pool size never exceeds 3 + BOOST_REQUIRE(pool.inUseCount() <= 3); + } + for(std::thread * thread : threads) { + thread->join(); + delete thread; + } + // pool keep returns to 1 + BOOST_REQUIRE_EQUAL(1, pool.availableCount()); +} + +static +void +acquireAndKeepFor1Second(TRPSmall * pool) +{ + auto r = pool->get(); + sleep(1); +} + +BOOST_AUTO_TEST_CASE( threading2 ) +{ + TRPSmall pool; + std::thread t1([&pool]() { acquireAndKeepFor1Second(&pool); }); + std::thread t2([&pool]() { acquireAndKeepFor1Second(&pool); }); + std::thread t3([&pool]() { acquireAndKeepFor1Second(&pool); }); + + BOOST_REQUIRE_THROW(pool.get(100), AdHoc::TimeOutOnResourcePoolT<MockResource>); + BOOST_REQUIRE_EQUAL(3, pool.inUseCount()); + + t1.join(); + { + auto r = pool.get(0); + t2.join(); + t3.join(); + } + + BOOST_REQUIRE_EQUAL(0, pool.inUseCount()); + BOOST_REQUIRE_EQUAL(1, pool.availableCount()); +} + |