-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathroutine.go
186 lines (173 loc) · 4.22 KB
/
routine.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
package keyed
import (
"context"
"time"
"github.com/aperturerobotics/util/backoff"
cbackoff "github.com/aperturerobotics/util/backoff/cbackoff"
)
// runningRoutine tracks a running routine
//
// it holds the context of the current run, the outcome of the most recent
// run, and the timers for a pending retry or a pending delayed removal.
type runningRoutine[K comparable, V any] struct {
	// k is the keyed instance
	// its mtx guards the mutable fields below.
	k *Keyed[K, V]
	// key is the key for this routine
	// used to look this routine up in k.routines.
	key K

	// fields guarded by k.mtx

	// ctx is the context
	// replaced with a fresh cancelable context every time start launches a run.
	ctx context.Context
	// ctxCancel cancels the context
	// if nil, not running
	ctxCancel context.CancelFunc
	// exitedCh is closed when the routine running with ctx exits
	// may be nil if ctx == nil
	// execute sets it back to nil once it has recorded the exit.
	exitedCh <-chan struct{}
	// routine is the routine callback
	// start does nothing if this is nil.
	routine Routine
	// data is the associated routine data
	// passed to the exited callbacks along with key, routine and err.
	data V
	// err is the error if any
	// written by execute when a run finishes, cleared by start.
	err error
	// success indicates the routine succeeded
	// i.e. the most recent run finished with a nil error.
	success bool
	// exited indicates the routine exited
	// set by execute, cleared by start.
	exited bool
	// deferRemove is set if we are waiting to remove this.
	// it is the timer scheduled by remove when a release delay applies.
	deferRemove *time.Timer
	// retryBo is the retry backoff if retrying is enabled.
	// nil if no backoff factory was given to newRunningRoutine.
	retryBo cbackoff.BackOff
	// deferRetry is set if we are waiting to retry this.
	// it is the timer scheduled by execute after a failed run.
	deferRetry *time.Timer
}
// newRunningRoutine builds the tracking state for the routine stored under key.
// the routine is not started here.
// if backoffFactory is non-nil, it is called with key to build the retry backoff.
func newRunningRoutine[K comparable, V any](
	k *Keyed[K, V],
	key K,
	routine Routine,
	data V,
	backoffFactory func(k K) cbackoff.BackOff,
) *runningRoutine[K, V] {
	rr := &runningRoutine[K, V]{
		k:       k,
		key:     key,
		routine: routine,
		data:    data,
	}
	// retryBo stays nil (retry disabled) when no factory is given.
	if backoffFactory != nil {
		rr.retryBo = backoffFactory(key)
	}
	return rr
}
// start launches the routine in a new goroutine, or relaunches it.
// expects k.mtx to be locked by caller
// does nothing if there is no routine callback.
// unless forceRestart is set, also does nothing if the routine already
// succeeded or is still running with a live context.
// if waitCh != nil, the new run waits for waitCh to be closed before calling the routine.
// any pending retry timer is stopped and the previous context is canceled.
func (r *runningRoutine[K, V]) start(ctx context.Context, waitCh <-chan struct{}, forceRestart bool) {
	if r.routine == nil {
		return
	}
	if !forceRestart {
		if r.success {
			// already completed successfully
			return
		}
		if r.ctx != nil && !r.exited && r.ctx.Err() == nil {
			// routine is still running
			return
		}
	}

	// this start supersedes any scheduled retry of this key
	if retryTimer := r.deferRetry; retryTimer != nil {
		_ = retryTimer.Stop()
		r.deferRetry = nil
	}

	// cancel the previous run, if there was one
	if cancelPrev := r.ctxCancel; cancelPrev != nil {
		cancelPrev()
	}

	// reset the result state and install the context for the new run
	doneCh := make(chan struct{})
	r.err, r.success, r.exited = nil, false, false
	r.exitedCh = doneCh
	runCtx, runCancel := context.WithCancel(ctx)
	r.ctx, r.ctxCancel = runCtx, runCancel

	go r.execute(runCtx, runCancel, doneCh, waitCh)
}
// execute executes the routine.
//
// it is launched by start in a new goroutine and acquires k.mtx itself once
// the routine has returned.
//
// ctx and cancel are the context pair created by start for this run.
// exitedCh is closed when the run finishes.
// if waitCh != nil, the routine is not called until waitCh is closed; if ctx
// is canceled first, the routine is skipped and the result is context.Canceled.
func (r *runningRoutine[K, V]) execute(
	ctx context.Context,
	cancel context.CancelFunc,
	exitedCh chan struct{},
	waitCh <-chan struct{},
) {
	var err error
	if waitCh != nil {
		// wait for the start signal, unless ctx is canceled first.
		select {
		case <-ctx.Done():
			err = context.Canceled
		case <-waitCh:
		}
	} else if err = ctx.Err(); err != nil {
		// ctx already ended: any context error is reported as context.Canceled.
		err = context.Canceled
	}

	// only call the routine if ctx was still live above.
	if err == nil {
		err = r.routine(ctx)
	}
	// release the context and signal the exit before taking the lock.
	cancel()
	close(exitedCh)

	r.k.mtx.Lock()
	// only record the result if ctx is still the current context of r,
	// i.e. start has not installed a newer one since this run began.
	if r.ctx == ctx {
		r.err = err
		r.success = err == nil
		r.exited = true
		r.exitedCh = nil
		if r.retryBo != nil {
			// drop any retry timer that is still pending.
			if r.deferRetry != nil {
				r.deferRetry.Stop()
				r.deferRetry = nil
			}
			if r.success {
				r.retryBo.Reset()
			} else if r.k.routines[r.key] == r {
				// failed while still registered under key: schedule a retry
				// unless the backoff says to stop.
				dur := r.retryBo.NextBackOff()
				if dur != backoff.Stop {
					r.deferRetry = time.AfterFunc(dur, func() {
						r.k.mtx.Lock()
						// re-check under the lock: only restart if the keyed ctx is set,
						// r is still registered and r is still in the exited state.
						// NOTE(review): r.exitedCh is presumably nil here, since it is
						// cleared whenever exited is set above; confirm.
						if r.k.ctx != nil && r.k.routines[r.key] == r && r.exited {
							r.start(r.k.ctx, r.exitedCh, true)
						}
						r.k.mtx.Unlock()
					})
				}
			}
		}
		// deferred calls run when execute returns, which is after the Unlock below.
		// deferring in reverse index order makes the callbacks run in slice order.
		// the callback arguments are evaluated here, while the lock is held.
		for i := len(r.k.exitedCbs) - 1; i >= 0; i-- {
			// run after unlocking mtx
			defer (r.k.exitedCbs[i])(r.key, r.routine, r.data, r.err)
		}
	}
	r.k.mtx.Unlock()
}
// remove cancels the routine and drops it from the keyed routines map.
// expects r.k.mtx to be locked
// if a release delay is configured and the routine has not exited with an
// error, the removal is deferred by that delay using a timer.
// does nothing if a deferred removal is already pending.
func (r *runningRoutine[K, V]) remove() {
	if r.deferRemove != nil {
		// a deferred removal is already scheduled
		return
	}

	// purge cancels the run, stops any pending retry and deletes the map entry.
	purge := func() {
		if cancelRun := r.ctxCancel; cancelRun != nil {
			cancelRun()
		}
		if retryTimer := r.deferRetry; retryTimer != nil {
			// cancel retrying this key
			_ = retryTimer.Stop()
			r.deferRetry = nil
		}
		delete(r.k.routines, r.key)
	}

	delay := r.k.releaseDelay
	failed := r.exited && !r.success
	if delay != 0 && !failed {
		// delayed path: remove later, if r is still registered and the
		// deferred removal has not been cleared in the meantime.
		r.deferRemove = time.AfterFunc(delay, func() {
			r.k.mtx.Lock()
			defer r.k.mtx.Unlock()

			if r.k.routines[r.key] != r || r.deferRemove == nil {
				return
			}
			_ = r.deferRemove.Stop()
			r.deferRemove = nil
			purge()
		})
		return
	}

	// no delay configured, or the routine already failed: remove immediately.
	purge()
}