summaryrefslogtreecommitdiff
path: root/lib/worker.h
blob: 0d15ca27f02b8bf42efb2fddd297476de60fdff7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
#pragma once

#include <chrono>
#include <deque>
#include <exception>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <semaphore>
#include <special_members.h>
#include <thread>
#include <tuple>
#include <type_traits>
#include <utility>
#include <vector>

/// Shared work queue reached through the static addWork() entry point.
///
/// A single static instance exists; the class can be neither copied nor moved.
/// The constructor, destructor, addWorkPtr(), worker() and assist() are defined
/// out of line, so their exact behaviour is not visible from this header.
class Worker {
public:
	/// Abstract unit of work; concrete items implement doWork().
	class WorkItem {
	protected:
		/// @param worker the Worker whose assist() this item calls while waiting.
		explicit WorkItem(Worker * worker) : worker {worker} { }
		virtual ~WorkItem() = default;
		NO_MOVE(WorkItem);
		NO_COPY(WorkItem);

		/// Forwards to Worker::assist() on the owning worker.
		void
		assist() const
		{
			worker->assist();
		}

		Worker * worker;

	public:
		/// Performs the work represented by this item.
		virtual void doWork() = 0;
	};

	/// Work item whose result of type T (possibly void) is delivered through a
	/// std::promise / std::future pair.
	template<typename T> class WorkItemT : public WorkItem {
	public:
		/// Returns the result of the work, or rethrows the exception it stored.
		///
		/// While the result is not ready the calling thread repeatedly calls
		/// assist() instead of blocking on the future. The underlying std::future
		/// is consumed by the first call, so get() must be called at most once
		/// per item.
		T
		get()
		{
			using namespace std::chrono_literals;
			// Zero-length wait: only polls readiness, never sleeps.
			while (future.wait_for(0s) == std::future_status::timeout) {
				assist();
			}
			return future.get();
		}

	protected:
		/// @param worker the Worker whose assist() this item calls while waiting.
		explicit WorkItemT(Worker * worker) : WorkItem {worker} { }

		std::promise<T> promise;
		std::future<T> future {promise.get_future()};
		friend Worker;
	};

	/// Creates a work item from a callable followed by its arguments, hands it to
	/// the shared instance and returns a shared pointer to it; call get() on the
	/// returned item to obtain the result.
	///
	/// Arguments passed as lvalues are stored by reference (Params deduces to a
	/// reference type), so they must stay alive until the work has run.
	template<typename... Params>
	static auto
	addWork(Params &&... params)
	{
		return instance.addWorkImpl(std::forward<Params>(params)...);
	}
	template<typename T> using WorkPtrT = std::shared_ptr<WorkItemT<T>>;

private:
	/// Concrete work item: stores a callable and its arguments in a tuple and
	/// invokes them from doWork().
	template<typename T, typename... Params> class WorkItemTImpl : public WorkItemT<T> {
	public:
		WorkItemTImpl(Worker * worker, Params &&... params) :
			WorkItemT<T> {worker}, params {std::forward<Params>(params)...}
		{
		}

	private:
		/// Invokes the stored callable with the stored arguments, all passed as
		/// lvalues, and returns whatever the call returns.
		///
		/// A lambda is used instead of a pointer to std::invoke<...>: forming a
		/// pointer to a standard library function template is not permitted
		/// since C++20 unless the function is designated addressable.
		decltype(auto)
		invokeStored()
		{
			return std::apply(
					[](auto &... args) -> decltype(auto) {
						return std::invoke(args...);
					},
					params);
		}

		/// Runs the callable and publishes its result, or the exception it threw,
		/// through the promise.
		void
		doWork() override
		{
			try {
				if constexpr (std::is_void_v<T>) {
					invokeStored();
					WorkItemT<T>::promise.set_value();
				}
				else {
					WorkItemT<T>::promise.set_value(invokeStored());
				}
			}
			catch (...) {
				// Whatever was thrown is rethrown later to the caller of get().
				WorkItemT<T>::promise.set_exception(std::current_exception());
			}
		}

		std::tuple<Params...> params;
	};

	Worker();
	~Worker();

	NO_COPY(Worker);
	NO_MOVE(Worker);

	using WorkPtr = std::shared_ptr<WorkItem>;

	/// Builds the concrete work item for the given callable and arguments, passes
	/// it to addWorkPtr() and returns it. The item's result type is the type of
	/// invoking the callable with the forwarded arguments.
	template<typename... Params>
	auto
	addWorkImpl(Params &&... params)
	{
		using T = decltype(std::invoke(std::forward<Params>(params)...));
		auto work = std::make_shared<WorkItemTImpl<T, Params...>>(this, std::forward<Params>(params)...);
		addWorkPtr(work);
		return work;
	}

	void addWorkPtr(WorkPtr w);
	void worker();
	void assist();

	using Threads = std::vector<std::jthread>;
	using ToDo = std::deque<WorkPtr>;

	ToDo todo;
	// NOTE(review): presumably released once per queued item (see the out-of-line
	// definitions). The default maximum is used rather than a small fixed cap so
	// that release() stays within its precondition however many items are
	// pending, since the queue itself is unbounded.
	std::counting_semaphore<> todoLen;
	std::mutex todoMutex;
	Threads threads;

	static Worker instance;
};