-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathkeyed.go
390 lines (339 loc) · 9.41 KB
/
keyed.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
package keyed
import (
"context"
"sync"
"time"
cbackoff "github.com/aperturerobotics/util/backoff/cbackoff"
"github.com/sirupsen/logrus"
)
// Routine is a function called as a goroutine.
//
// Returning nil means the routine exited cleanly and permanently.
// Returning an error means the routine failed; it can be restarted later.
type Routine func(ctx context.Context) error
// Keyed manages a set of goroutines with associated Keys.
//
// K is the type of the key.
// V is the type of the value: the data returned by the constructor
// callback alongside the Routine for each key.
type Keyed[K comparable, V any] struct {
	// ctorCb is the constructor callback.
	// It is called with a key and returns the Routine and data for that key.
	ctorCb func(key K) (Routine, V)
	// exitedCbs is the set of exited callbacks.
	exitedCbs []func(key K, routine Routine, data V, err error)
	// releaseDelay is a delay before stopping a routine.
	releaseDelay time.Duration
	// backoffFactory is the backoff factory.
	// If nil, backoff is disabled.
	backoffFactory func(k K) cbackoff.BackOff
	// mtx guards the fields below.
	mtx sync.Mutex
	// ctx is the current root context.
	// While nil, routines are registered but not started.
	ctx context.Context
	// routines is the set of registered routines, indexed by key.
	routines map[K]*runningRoutine[K, V]
}
// NewKeyed constructs a new Keyed execution manager.
//
// ctorCb builds the Routine and data for a key. If it is nil, a constructor
// returning a nil Routine and the zero value of V is used instead.
// Nil entries in opts are skipped.
//
// Note: routines won't start until SetContext is called.
func NewKeyed[K comparable, V any](
	ctorCb func(key K) (Routine, V),
	opts ...Option[K, V],
) *Keyed[K, V] {
	keyed := &Keyed[K, V]{
		ctorCb:   ctorCb,
		routines: make(map[K]*runningRoutine[K, V], 1),
	}
	if keyed.ctorCb == nil {
		// fall back to a constructor that yields no routine and empty data.
		keyed.ctorCb = func(K) (Routine, V) {
			var zero V
			return nil, zero
		}
	}
	for i := range opts {
		if opt := opts[i]; opt != nil {
			opt.ApplyToKeyed(keyed)
		}
	}
	return keyed
}
// NewKeyedWithLogger constructs a new keyed instance with an exit logger.
// Logs when a controller exits without being removed from the Keys set.
//
// The exit logger option is applied first, followed by opts in order.
//
// Note: routines won't start until SetContext is called.
func NewKeyedWithLogger[K comparable, V any](
	ctorCb func(key K) (Routine, V),
	le *logrus.Entry,
	opts ...Option[K, V],
) *Keyed[K, V] {
	allOpts := make([]Option[K, V], 0, len(opts)+1)
	allOpts = append(allOpts, WithExitLogger[K, V](le))
	allOpts = append(allOpts, opts...)
	return NewKeyed(ctorCb, allOpts...)
}
// SetContext updates the root context, restarting all running routines.
//
// A nil context is valid and will shut down the routines.
// If restart is true, all errored routines also restart.
func (k *Keyed[K, V]) SetContext(ctx context.Context, restart bool) {
	k.mtx.Lock()
	// deferred, like the other locking methods of Keyed, so that mtx is
	// released even if starting a routine panics.
	defer k.mtx.Unlock()
	k.setContextLocked(ctx, restart)
}
// setContextLocked sets the context while mtx is locked.
//
// No-op when the context is unchanged and restart is false.
// Otherwise each routine is canceled and, if ctx is non-nil, started on ctx.
// With an unchanged context only errored routines are touched.
// Errored routines are started again only when restart is true.
func (k *Keyed[K, V]) setContextLocked(ctx context.Context, restart bool) {
	unchanged := k.ctx == ctx
	if unchanged && !restart {
		return
	}

	k.ctx = ctx
	for _, running := range k.routines {
		// same context: routines without an error keep running as-is.
		if unchanged && running.err == nil {
			continue
		}

		// detach the routine from its previous context.
		running.ctx = nil
		if running.ctxCancel != nil {
			running.ctxCancel()
			running.ctxCancel = nil
		}

		// a nil ctx means shutdown only: nothing is started.
		if ctx != nil && (restart || running.err == nil) {
			running.start(ctx, running.exitedCh, false)
		}
	}
}
// ClearContext clears the context and shuts down any running routines.
//
// Equivalent to SetContext(nil, false).
func (k *Keyed[K, V]) ClearContext() {
	// a nil root context is explicitly accepted by SetContext.
	k.SetContext(nil, false)
}
// GetKeys returns the list of keys registered with the Keyed instance.
//
// The result is a fresh slice; its order follows map iteration and is
// therefore unspecified.
func (k *Keyed[K, V]) GetKeys() []K {
	k.mtx.Lock()
	defer k.mtx.Unlock()

	out := make([]K, len(k.routines))
	idx := 0
	for key := range k.routines {
		out[idx] = key
		idx++
	}
	return out
}
// KeyWithData is a key with associated data.
//
// It is the element type returned by GetKeysWithData.
type KeyWithData[K comparable, V any] struct {
	// Key is the key.
	Key K
	// Data is the value stored for the key.
	Data V
}
// GetKeysWithData returns the keys and the data for the keys.
//
// The result is a fresh slice; its order follows map iteration and is
// therefore unspecified.
func (k *Keyed[K, V]) GetKeysWithData() []KeyWithData[K, V] {
	k.mtx.Lock()
	defer k.mtx.Unlock()

	entries := make([]KeyWithData[K, V], len(k.routines))
	next := 0
	for key, rr := range k.routines {
		entries[next] = KeyWithData[K, V]{Key: key, Data: rr.data}
		next++
	}
	return entries
}
// SetKey inserts the given key into the set, if it doesn't already exist.
// If start=true, restarts the routine from any stopped or failed state.
//
// For a key that already exists, any pending deferred removal or deferred
// retry is canceled. A routine is only started while a root context is set.
//
// Returns the data for the key and whether the key existed already.
func (k *Keyed[K, V]) SetKey(key K, start bool) (V, bool) {
	k.mtx.Lock()
	defer k.mtx.Unlock()

	rr, existed := k.routines[key]
	if existed {
		// the key is wanted again: cancel removing this key
		if rr.deferRemove != nil {
			_ = rr.deferRemove.Stop()
			rr.deferRemove = nil
		}
		// cancel retrying this key
		if rr.deferRetry != nil {
			_ = rr.deferRetry.Stop()
			rr.deferRetry = nil
		}
	} else {
		routine, data := k.ctorCb(key)
		rr = newRunningRoutine(k, key, routine, data, k.backoffFactory)
		k.routines[key] = rr
	}

	// new keys always start; existing keys only when asked to.
	if (start || !existed) && k.ctx != nil {
		rr.start(k.ctx, rr.exitedCh, false)
	}
	return rr.data, existed
}
// RemoveKey removes the given key from the set, if it exists.
// Returns if it existed.
func (k *Keyed[K, V]) RemoveKey(key K) bool {
	k.mtx.Lock()
	defer k.mtx.Unlock()

	rr, found := k.routines[key]
	if !found {
		return false
	}
	rr.remove()
	return true
}
// SyncKeys synchronizes the list of running routines with the given list.
// If restart=true, restarts any routines in the failed state.
//
// Keys in the list that are not yet registered are constructed and returned
// in added. Registered keys missing from the list are removed and returned
// in removed. Duplicate entries in keys are processed once.
//
// A listed key that is awaiting a deferred removal has that removal
// canceled, matching SetKey, so that re-listing a key keeps it alive.
// Routines are only started while a non-canceled root context is set.
func (k *Keyed[K, V]) SyncKeys(keys []K, restart bool) (added, removed []K) {
	k.mtx.Lock()
	defer k.mtx.Unlock()

	// treat a canceled root context as unset so nothing is started on it.
	if k.ctx != nil && k.ctx.Err() != nil {
		k.ctx = nil
	}

	wanted := make(map[K]struct{}, len(keys))
	for _, key := range keys {
		if _, seen := wanted[key]; seen {
			// already processed
			continue
		}
		wanted[key] = struct{}{}

		v, existed := k.routines[key]
		if existed {
			if v.deferRemove != nil {
				// cancel removing this key: it is wanted again.
				// Stop's result is ignored, as in SetKey.
				_ = v.deferRemove.Stop()
				v.deferRemove = nil
			}
		} else {
			routine, data := k.ctorCb(key)
			v = newRunningRoutine(k, key, routine, data, k.backoffFactory)
			k.routines[key] = v
			added = append(added, key)
		}

		if (!existed || restart) && k.ctx != nil {
			v.start(k.ctx, v.exitedCh, false)
		}
	}

	// remove every registered key that is no longer wanted.
	for key, rr := range k.routines {
		if _, ok := wanted[key]; !ok {
			removed = append(removed, key)
			rr.remove()
		}
	}
	return added, removed
}
// GetKey returns the value for the given key and existed.
//
// If the key is not registered, the zero value of V and false are returned.
func (k *Keyed[K, V]) GetKey(key K) (V, bool) {
	k.mtx.Lock()
	defer k.mtx.Unlock()

	if rr, found := k.routines[key]; found {
		return rr.data, true
	}
	var zero V
	return zero, false
}
// ResetRoutine resets the given routine after checking the condition functions.
// If any of the conds functions return true, resets the instance.
//
// Resetting the instance constructs a new Routine and data with the constructor.
// Note: this will overwrite the existing Data, if present!
// In most cases RestartRoutine is actually what you want.
//
// If len(conds) == 0, always resets the given key.
//
// existed reports whether the key is registered; reset reports whether the
// instance was replaced.
func (k *Keyed[K, V]) ResetRoutine(key K, conds ...func(K, V) bool) (existed bool, reset bool) {
	k.mtx.Lock()
	defer k.mtx.Unlock()

	existed, reset = k.resetRoutineLocked(key, conds...)
	return existed, reset
}
// ResetAllRoutines resets all routines after checking the condition functions.
// If any of the conds functions return true for an instance, resets the instance.
//
// Resetting the instance constructs a new Routine and data with the constructor.
// Note: this will overwrite the existing Data, if present!
// In most cases RestartAllRoutines is actually what you want.
//
// If len(conds) == 0, always resets the keys.
//
// resetCount is the number of instances that were reset; totalCount is the
// number of registered keys.
func (k *Keyed[K, V]) ResetAllRoutines(conds ...func(K, V) bool) (resetCount, totalCount int) {
	k.mtx.Lock()
	defer k.mtx.Unlock()

	totalCount = len(k.routines)
	for key := range k.routines {
		found, didReset := k.resetRoutineLocked(key, conds...)
		if !found || !didReset {
			continue
		}
		resetCount++
	}
	return resetCount, totalCount
}
// resetRoutineLocked resets the given routine while mtx is locked.
//
// Returns (false, false) if the key is not registered and (true, false) if
// conds were given but none matched. Otherwise the current instance's context
// is canceled, a new instance is built with the constructor and stored under
// the key, and (true, true) is returned. The new instance is started only if
// a non-canceled root context is set.
//
// NOTE(review): deferRemove / deferRetry timers of the replaced instance are
// not stopped here - confirm the timer callbacks tolerate a replaced instance.
func (k *Keyed[K, V]) resetRoutineLocked(key K, conds ...func(K, V) bool) (existed bool, reset bool) {
	// treat a canceled root context as unset so nothing is started on it.
	if rootCtx := k.ctx; rootCtx != nil && rootCtx.Err() != nil {
		k.ctx = nil
	}

	prev, found := k.routines[key]
	if !found {
		return false, false
	}

	// with no conditions the reset is unconditional; nil conditions are skipped.
	shouldReset := len(conds) == 0
	for i := 0; i < len(conds) && !shouldReset; i++ {
		if cond := conds[i]; cond != nil {
			shouldReset = cond(key, prev.data)
		}
	}
	if !shouldReset {
		return true, false
	}

	// stop the instance being replaced.
	if prev.ctxCancel != nil {
		prev.ctxCancel()
	}

	// the replacement is started with the exited channel of the old instance.
	prevExitedCh := prev.exitedCh
	routine, data := k.ctorCb(key)
	next := newRunningRoutine(k, key, routine, data, k.backoffFactory)
	k.routines[key] = next
	if k.ctx != nil {
		next.start(k.ctx, prevExitedCh, false)
	}
	return true, true
}
// RestartRoutine restarts the given routine after checking the condition functions.
// If any return true, and the routine is running, restarts the instance.
//
// Unlike ResetRoutine, the existing Routine and data are kept.
// If no root context is set (or it was canceled), nothing is restarted.
//
// If len(conds) == 0, always restarts the given key.
//
// existed reports whether the key is registered; reset reports whether the
// instance was restarted.
func (k *Keyed[K, V]) RestartRoutine(key K, conds ...func(K, V) bool) (existed bool, reset bool) {
	k.mtx.Lock()
	defer k.mtx.Unlock()

	existed, reset = k.restartRoutineLocked(key, conds...)
	return existed, reset
}
// RestartAllRoutines restarts all routines after checking the condition functions.
// If any return true, and the routine is running, restarts the instance.
//
// If len(conds) == 0, always restarts the keys.
//
// restartedCount is the number of instances that were restarted; totalCount
// is the number of registered keys.
func (k *Keyed[K, V]) RestartAllRoutines(conds ...func(K, V) bool) (restartedCount, totalCount int) {
	k.mtx.Lock()
	defer k.mtx.Unlock()

	totalCount = len(k.routines)
	for key := range k.routines {
		found, didRestart := k.restartRoutineLocked(key, conds...)
		if !found || !didRestart {
			continue
		}
		restartedCount++
	}
	return restartedCount, totalCount
}
// restartRoutineLocked restarts the given routine while mtx is locked.
//
// Returns (false, false) if the key is not registered. Returns (true, false)
// if no non-canceled root context is set, or if conds were given but none
// matched. Otherwise the instance's current context is canceled, the same
// instance is started again on the root context, and (true, true) is returned.
func (k *Keyed[K, V]) restartRoutineLocked(key K, conds ...func(K, V) bool) (existed bool, reset bool) {
	// treat a canceled root context as unset so nothing is started on it.
	if k.ctx != nil && k.ctx.Err() != nil {
		k.ctx = nil
	}

	v, existed := k.routines[key]
	if !existed {
		return false, false
	}
	// without a root context there is nothing to restart onto.
	if k.ctx == nil {
		return true, false
	}

	// with no conditions the restart is unconditional; nil conditions are skipped.
	anyMatched := len(conds) == 0
	for _, cond := range conds {
		if cond != nil && cond(key, v.data) {
			anyMatched = true
			break
		}
	}
	if !anyMatched {
		return true, false
	}

	// stop the current run before starting again.
	if v.ctxCancel != nil {
		v.ctxCancel()
		v.ctxCancel = nil
	}
	// k.ctx is known to be non-nil here (checked above).
	v.start(k.ctx, v.exitedCh, true)
	return true, true
}