-
Notifications
You must be signed in to change notification settings - Fork 1
/
workerpool.go
167 lines (150 loc) · 3.89 KB
/
workerpool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
package gpool
import (
"log"
"sync"
"time"
)
// WorkerPool define a worker pool
type WorkerPool struct {
maxWorkerNumber int //the max worker number in the pool
workerNumber int //the worker number now in the pool
workers []*Worker //the available worker queue
lock sync.Mutex //for queue thread-safe
maxIdleTime time.Duration //the recycle time. That means goroutine will be destroyed when it has not been used for maxIdleTime.
stop chan struct{} //stop
stopFlag bool //trick
objectPool *sync.Pool //gc-friendly
}
//Worker run as a goroutine
type Worker struct {
fn chan func()
lastUsedTime int64
}
// NewLimt creates a worker pool
//
// maxWorkerNum define the max worker number in the pool. When the worker
// number exceeds maxWorkerNum, we will ignore the job.
// recycleTime(minute) define the time to recycle goroutine. When a goroutine has
// not been used for recycleTime, it will be recycled.
func NewLimit(maxWorkerNum, recycleTime int) (*WorkerPool, error) {
wp := &WorkerPool{
maxWorkerNumber: maxWorkerNum,
maxIdleTime: time.Duration(recycleTime) * time.Minute,
objectPool: &sync.Pool{
New: func() interface{} {
return &Worker{
fn: make(chan func()),
}
},
},
}
wp.init()
return wp, nil
}
// NewUnlimit creates a unlimited-number worker pool.
func NewUnlimit(recycleTime int) (*WorkerPool, error) {
wp := &WorkerPool{
maxWorkerNumber: -1,
maxIdleTime: time.Duration(recycleTime) * time.Minute,
}
wp.init()
return wp, nil
}
// init initializes the workerpool.
//
// init func will be in charge of cleaning up goroutines and receiving the stop signal.
func (wp *WorkerPool) init() {
go func() {
tick := time.Tick(wp.maxIdleTime)
for {
select {
case <-tick:
wp.cleanup()
case <-wp.stop:
wp.stopPool()
break
}
}
}()
}
// Stop stop goroutine pool
func (wp *WorkerPool) Stop() {
wp.stop <- struct{}{}
}
// cleanup cleans up the available worker queue.
func (wp *WorkerPool) cleanup() {
i := 0
now := time.Now().Unix()
for i = 0; i < len(wp.workers); i++ {
if time.Duration(now-wp.workers[i].lastUsedTime)*time.Second < wp.maxIdleTime {
break
} else {
close(wp.workers[i].fn)
}
}
wp.lock.Lock()
wp.workers = wp.workers[i:]
wp.lock.Unlock()
}
// stopPool stops the worker pool.
func (wp *WorkerPool) stopPool() {
wp.stopFlag = true
wp.lock.Lock()
for _, w := range wp.workers {
w.fn <- nil
}
wp.lock.Unlock()
}
// Queue assigns a worker for job (fn func(), with closure we can define every job in this form)
//
// If the worker pool is limited-number and the worker number has reached the limit, we prefer to discard the job.
func (wp *WorkerPool) Queue(fn func()) {
worker := wp.getWorker()
if worker == nil {
log.Print("get worker Failed")
return
}
worker.fn <- fn
}
// GetWorker select a worker.
//
// If the available worker queue is empty, we will new a worker.
// else we will select the last worker, in this case, the worker queue
// is like a FILO queue, and the select algorithm is kind of like LRU.
func (wp *WorkerPool) getWorker() *Worker {
if len(wp.workers) == 0 {
wp.workerNumber++
if wp.maxWorkerNumber != -1 && wp.workerNumber > wp.maxWorkerNumber {
//log
log.Println("worker number excess max")
return nil
}
worker := &Worker{
fn: make(chan func()),
}
go wp.startWorker(worker)
return worker
}
wp.lock.Lock()
worker := wp.workers[len(wp.workers)-1]
wp.workers[len(wp.workers)-1] = nil
wp.workers = wp.workers[:len(wp.workers)-1]
wp.lock.Unlock()
return worker
}
// StartWorker starts a new goroutine.
func (wp *WorkerPool) startWorker(worker *Worker) {
for f := range worker.fn {
if f == nil {
break
}
f()
if wp.stopFlag == true {
break
}
worker.lastUsedTime = time.Now().Unix()
wp.lock.Lock()
wp.workers = append(wp.workers, worker)
wp.lock.Unlock()
}
}