diff options
-rw-r--r-- | lib/worker.cpp | 21 | ||||
-rw-r--r-- | lib/worker.h | 26 | ||||
-rw-r--r-- | test/test-worker.cpp | 21 |
3 files changed, 64 insertions, 4 deletions
diff --git a/lib/worker.cpp b/lib/worker.cpp index 7e7f296..45fb6df 100644 --- a/lib/worker.cpp +++ b/lib/worker.cpp @@ -42,3 +42,24 @@ Worker::worker() j->doWork(); } } + +void +Worker::assist() +{ + auto job = [this]() { + using namespace std::chrono_literals; + if (todoLen.try_acquire_for(100us)) { + if (std::lock_guard<std::mutex> lck {todoMutex}; todo.size()) { + WorkPtr x = std::move(todo.front()); + if (x) { + todo.pop_front(); + } + return x; + } + } + return WorkPtr {}; + }; + if (auto j = job()) { + j->doWork(); + } +} diff --git a/lib/worker.h b/lib/worker.h index 1bc7c14..d9a5a6f 100644 --- a/lib/worker.h +++ b/lib/worker.h @@ -14,12 +14,20 @@ class Worker { public: class WorkItem { - public: - WorkItem() = default; + protected: + WorkItem(Worker * worker) : worker {worker} { } virtual ~WorkItem() = default; NO_MOVE(WorkItem); NO_COPY(WorkItem); + void + assist() const + { + worker->assist(); + } + Worker * worker; + + public: virtual void doWork() = 0; }; @@ -28,10 +36,16 @@ public: T get() { + using namespace std::chrono_literals; + while (future.wait_for(0s) == std::future_status::timeout) { + assist(); + } return future.get(); } protected: + WorkItemT(Worker * worker) : WorkItem {worker} { } + std::promise<T> promise; std::future<T> future {promise.get_future()}; friend Worker; @@ -48,7 +62,10 @@ public: private: template<typename T, typename... Params> class WorkItemTImpl : public WorkItemT<T> { public: - WorkItemTImpl(Params &&... params) : params {std::forward<Params>(params)...} { } + WorkItemTImpl(Worker * worker, Params &&... params) : + WorkItemT<T> {worker}, params {std::forward<Params>(params)...} + { + } private: void @@ -84,13 +101,14 @@ private: addWorkImpl(Params &&... params) { using T = decltype(std::invoke(std::forward<Params>(params)...)); - auto work = std::make_shared<WorkItemTImpl<T, Params...>>(std::forward<Params>(params)...); + auto work = std::make_shared<WorkItemTImpl<T, Params...>>(this, std::forward<Params>(params)...); addWorkPtr(work); return work; } void addWorkPtr(WorkPtr w); void worker(); + void assist(); using Threads = std::vector<std::jthread>; using ToDo = std::deque<WorkPtr>; diff --git a/test/test-worker.cpp b/test/test-worker.cpp index 319261b..76b7138 100644 --- a/test/test-worker.cpp +++ b/test/test-worker.cpp @@ -81,3 +81,24 @@ BOOST_AUTO_TEST_CASE(lambda_value) 1, 2) ->get()); } + +BOOST_AUTO_TEST_CASE(recursive, *boost::unit_test::timeout(5)) +{ + auto recurse = []() { + std::vector<Worker::WorkPtrT<uint32_t>> ps; + for (int i {}; i < 30; ++i) { + ps.push_back(Worker::addWork(workCounter)); + } + return std::accumulate(ps.begin(), ps.end(), 0U, [](auto && out, auto && p) { + return out += p->get(); + }); + }; + std::vector<Worker::WorkPtrT<uint32_t>> ps; + for (int i {}; i < 30; ++i) { + ps.push_back(Worker::addWork(recurse)); + } + std::set<uint32_t> out; + std::transform(ps.begin(), ps.end(), std::inserter(out, out.end()), [](auto && p) { + return p->get(); + }); +} |