summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lib/worker.cpp21
-rw-r--r--lib/worker.h26
-rw-r--r--test/test-worker.cpp21
3 files changed, 64 insertions, 4 deletions
diff --git a/lib/worker.cpp b/lib/worker.cpp
index 7e7f296..45fb6df 100644
--- a/lib/worker.cpp
+++ b/lib/worker.cpp
@@ -42,3 +42,24 @@ Worker::worker()
j->doWork();
}
}
+
+void
+Worker::assist()
+{
+ auto job = [this]() {
+ using namespace std::chrono_literals;
+ if (todoLen.try_acquire_for(100us)) {
+ if (std::lock_guard<std::mutex> lck {todoMutex}; todo.size()) {
+ WorkPtr x = std::move(todo.front());
+ if (x) {
+ todo.pop_front();
+ }
+ return x;
+ }
+ }
+ return WorkPtr {};
+ };
+ if (auto j = job()) {
+ j->doWork();
+ }
+}
diff --git a/lib/worker.h b/lib/worker.h
index 1bc7c14..d9a5a6f 100644
--- a/lib/worker.h
+++ b/lib/worker.h
@@ -14,12 +14,20 @@
class Worker {
public:
class WorkItem {
- public:
- WorkItem() = default;
+ protected:
+ WorkItem(Worker * worker) : worker {worker} { }
virtual ~WorkItem() = default;
NO_MOVE(WorkItem);
NO_COPY(WorkItem);
+ void
+ assist() const
+ {
+ worker->assist();
+ }
+ Worker * worker;
+
+ public:
virtual void doWork() = 0;
};
@@ -28,10 +36,16 @@ public:
T
get()
{
+ using namespace std::chrono_literals;
+ while (future.wait_for(0s) == std::future_status::timeout) {
+ assist();
+ }
return future.get();
}
protected:
+ WorkItemT(Worker * worker) : WorkItem {worker} { }
+
std::promise<T> promise;
std::future<T> future {promise.get_future()};
friend Worker;
@@ -48,7 +62,10 @@ public:
private:
template<typename T, typename... Params> class WorkItemTImpl : public WorkItemT<T> {
public:
- WorkItemTImpl(Params &&... params) : params {std::forward<Params>(params)...} { }
+ WorkItemTImpl(Worker * worker, Params &&... params) :
+ WorkItemT<T> {worker}, params {std::forward<Params>(params)...}
+ {
+ }
private:
void
@@ -84,13 +101,14 @@ private:
addWorkImpl(Params &&... params)
{
using T = decltype(std::invoke(std::forward<Params>(params)...));
- auto work = std::make_shared<WorkItemTImpl<T, Params...>>(std::forward<Params>(params)...);
+ auto work = std::make_shared<WorkItemTImpl<T, Params...>>(this, std::forward<Params>(params)...);
addWorkPtr(work);
return work;
}
void addWorkPtr(WorkPtr w);
void worker();
+ void assist();
using Threads = std::vector<std::jthread>;
using ToDo = std::deque<WorkPtr>;
diff --git a/test/test-worker.cpp b/test/test-worker.cpp
index 319261b..76b7138 100644
--- a/test/test-worker.cpp
+++ b/test/test-worker.cpp
@@ -81,3 +81,24 @@ BOOST_AUTO_TEST_CASE(lambda_value)
1, 2)
->get());
}
+
+BOOST_AUTO_TEST_CASE(recursive, *boost::unit_test::timeout(5))
+{
+ auto recurse = []() {
+ std::vector<Worker::WorkPtrT<uint32_t>> ps;
+ for (int i {}; i < 30; ++i) {
+ ps.push_back(Worker::addWork(workCounter));
+ }
+ return std::accumulate(ps.begin(), ps.end(), 0U, [](auto && out, auto && p) {
+ return out += p->get();
+ });
+ };
+ std::vector<Worker::WorkPtrT<uint32_t>> ps;
+ for (int i {}; i < 30; ++i) {
+ ps.push_back(Worker::addWork(recurse));
+ }
+ std::set<uint32_t> out;
+ std::transform(ps.begin(), ps.end(), std::inserter(out, out.end()), [](auto && p) {
+ return p->get();
+ });
+}