-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy paththreadpool.h
148 lines (125 loc) · 5.2 KB
/
threadpool.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
#ifndef THREADPOOL_H
#define THREADPOOL_H
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <future>
#include <list>
#include <memory>
#include <mutex>
#include <thread>
#include <unordered_set>
#include <utility>
#include <vector>
/**
 * Scheduling metadata stored next to every callable queued in the pool.
 */
struct TaskProfile {
    /** Timing category of a task. */
    enum TTiming {
        kImmediate = 0,
        kAfter = 1,
        kPeriodic = 2,
    };

    TaskProfile(TTiming _timing, int _serial_tag, int _after, int _period);

    /**
     * Issues the next sequence number; the first value handed out is 1,
     * so kInvalidSeq (0) is not produced before the counter wraps around.
     *
     * The counter is a plain function-local static with no synchronization
     * of its own.
     */
    static uint64_t __MakeSeq() {
        static uint64_t last_issued = 0;
        last_issued += 1;
        return last_issued;
    }

    TTiming type;
    int serial_tag;
    int after;
    int period;
    // For kAfter: the creation timestamp. For kPeriodic: the timestamp of the last run.
    uint64_t record;
    uint64_t seq;

    // Sentinel sequence value.
    static const uint64_t kInvalidSeq = 0;
};
class ThreadPool {
public:
void operator=(ThreadPool const &) = delete;
ThreadPool(ThreadPool const &) = delete;
~ThreadPool();
static ThreadPool &Instance() {
static ThreadPool instance;
return instance;
}
/**
* Initializes ThreadPool singleton in advance,
* instead of lazy loading when first used.
*/
void Init();
template<class F, class... Args>
std::future<typename std::result_of<F(Args...)>::type>
Execute(F&& _f, Args&&... _args) {
return __AddTask(TaskProfile::TTiming::kImmediate, kNoSerialTag, 0, 0, _f, _args...);
}
/**
* @param _serial_tag: tasks with the same _serial_tag(>0) execute serially.
*/
template<class F, class... Args>
std::future<typename std::result_of<F(Args...)>::type>
Execute(int _serial_tag, F&& _f, Args&&... _args) {
return __AddTask(TaskProfile::TTiming::kImmediate, _serial_tag, 0, 0, _f, _args...);
}
template<class F, class... Args>
std::future<typename std::result_of<F(Args...)>::type>
ExecuteAfter(int _after_millis, F&& _f, Args&&... _args) {
return __AddTask(TaskProfile::TTiming::kAfter, kNoSerialTag, _after_millis, 0, _f, _args...);
}
template<class F, class... Args>
void ExecutePeriodic(int _period_millis, F&& _f, Args&&... _args) {
{
LockGuard lock(mutex_);
tasks_.push_back(new std::pair<TaskProfile, std::function<void()>>(
TaskProfile(TaskProfile::TTiming::kPeriodic, kNoSerialTag,
0, _period_millis), [=] { _f(_args...); }));
}
cv_.notify_one();
}
private:
using UniqueLock = std::unique_lock<std::mutex>;
using LockGuard = std::unique_lock<std::mutex>;
using TaskPairPtr = std::pair<TaskProfile, std::function<void()>> *;
explicit ThreadPool(size_t _n_threads = 4);
/**
*
* A task is considered Faster than another,
* only if the task meets either of the following conditions:
* 1. The task is kImmediate and the other is not, or
* 2. The task is kAfter or kPeriodic and expires earlier than the other.
*
* If a faster task is found, it will be picked out from the task queue,
* while the {@param _old} will be push back to the queue if it is not NULL.
*
* @param _old: Such task will be compared to the others in the task queue.
* If it is NULL, any task is faster than the given {@param _old}.
* @return: if {@param _old} is kImmediate, return NULL, because no task is faster than a kImmediate one,
* else return pointer of the kImmediate task if exists,
* else return the pointer of the task with the minimum time to wait
* until its (next) execution if exists,
* else return NULL, indicating that there is no task faster.
*/
TaskPairPtr __PickOutTaskFasterThan(TaskPairPtr _old = nullptr);
void __CreateWorkerThread();
/**
* @param _now: if not given, it will be updated inside the function.
*/
uint64_t __ComputeWaitTime(TaskProfile *_profile, uint64_t _now = 0);
template<class F, class... Args>
std::future<typename std::result_of<F(Args...)>::type>
__AddTask(TaskProfile::TTiming _timing, int _serial_tag, int _after, int _period, F&& _f, Args&&... _args) {
using return_t = typename std::result_of<F(Args...)>::type;
using pack_task_t = std::packaged_task<return_t(void)>;
auto task = std::make_shared<pack_task_t>(std::bind(_f, _args...));
std::future<return_t> ret = task->get_future();
{
LockGuard lock(mutex_);
tasks_.push_back(new std::pair<TaskProfile, std::function<void()>>(
TaskProfile(_timing, _serial_tag, _after, _period), [=] { (*task)(); }));
}
cv_.notify_one();
return ret;
}
private:
std::list<std::pair<TaskProfile, std::function<void()>>*> tasks_;
std::vector<std::thread> workers_;
std::unordered_set<int> running_serial_tags_;
const static int kNoSerialTag;
std::mutex mutex_;
std::condition_variable cv_;
bool stop_;
};
#endif //THREADPOOL_H