-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathkeyed-refcount.go
176 lines (154 loc) · 5.15 KB
/
keyed-refcount.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
package keyed
import (
"context"
"sync"
"sync/atomic"
"github.com/sirupsen/logrus"
)
// KeyedRefCount wraps a Keyed controller and tracks the outstanding
// references (KeyedRef) held against each key.
//
// References are created with AddKeyRef. When the last reference to a key is
// released, or when RemoveKey is called, the key is removed from the
// underlying Keyed controller.
type KeyedRefCount[K comparable, V any] struct {
// keyed is the underlying keyed controller
keyed *Keyed[K, V]
// mtx guards below fields
mtx sync.Mutex
// refs maps each key to the references currently held against it.
// A key with no remaining references has no entry in the map.
refs map[K][]*KeyedRef[K, V]
}
// KeyedRef is a single reference to a key held in a KeyedRefCount.
//
// It is returned by AddKeyRef and is given up by calling Release.
type KeyedRef[K comparable, V any] struct {
// rc is the KeyedRefCount that issued this reference.
rc *KeyedRefCount[K, V]
// key is the key this reference is held against.
key K
// rel is set once the reference has been released, either by Release or
// by KeyedRefCount.RemoveKey. Once set, Release is a no-op.
rel atomic.Bool
}
// Release releases the reference.
//
// It is safe to call on a nil reference and safe to call more than once:
// only the first call on a given reference does any work. If this was the
// last reference held against the key, the key is removed from the
// underlying Keyed controller.
func (k *KeyedRef[K, V]) Release() {
	if k == nil {
		return
	}
	// Swap returns the previous value: true means the reference was already
	// released (by a prior Release or by RemoveKey), so there is nothing to do.
	if k.rel.Swap(true) {
		return
	}

	rc := k.rc
	rc.mtx.Lock()
	// Deferred so the mutex is released even if RemoveKey below panics.
	defer rc.mtx.Unlock()

	refs := rc.refs[k.key]
	for i, ref := range refs {
		if ref != k {
			continue
		}

		// Swap-remove: move the last element into slot i, clear the vacated
		// tail slot so it does not pin the reference, then shrink the slice.
		last := len(refs) - 1
		refs[i] = refs[last]
		refs[last] = nil
		refs = refs[:last]

		if len(refs) == 0 {
			// Last reference dropped: forget the key and remove it from the
			// controller. The "existed" result is not needed here.
			delete(rc.refs, k.key)
			_ = rc.keyed.RemoveKey(k.key)
		} else {
			rc.refs[k.key] = refs
		}
		return
	}
}
// NewKeyedRefCount builds a reference-counted wrapper around a new Keyed
// execution manager created from ctorCb and opts.
//
// Note: routines won't start until SetContext is called.
func NewKeyedRefCount[K comparable, V any](
	ctorCb func(key K) (Routine, V),
	opts ...Option[K, V],
) *KeyedRefCount[K, V] {
	rc := new(KeyedRefCount[K, V])
	rc.keyed = NewKeyed(ctorCb, opts...)
	// Start with an empty reference table; entries are added by AddKeyRef.
	rc.refs = make(map[K][]*KeyedRef[K, V])
	return rc
}
// NewKeyedRefCountWithLogger builds a reference-counted wrapper around a new
// Keyed execution manager created with NewKeyedWithLogger.
//
// The logger le is used to log when a controller exits without being removed
// from the Keys set.
//
// Note: routines won't start until SetContext is called.
func NewKeyedRefCountWithLogger[K comparable, V any](
	ctorCb func(key K) (Routine, V),
	le *logrus.Entry,
	opts ...Option[K, V],
) *KeyedRefCount[K, V] {
	rc := new(KeyedRefCount[K, V])
	rc.keyed = NewKeyedWithLogger(ctorCb, le, opts...)
	// Start with an empty reference table; entries are added by AddKeyRef.
	rc.refs = make(map[K][]*KeyedRef[K, V])
	return rc
}
// SetContext updates the root context on the underlying Keyed controller,
// restarting all running routines.
//
// If restart is true, routines that previously errored are restarted as well.
// A nil ctx is valid and shuts the routines down.
func (k *KeyedRefCount[K, V]) SetContext(ctx context.Context, restart bool) {
	inner := k.keyed
	inner.SetContext(ctx, restart)
}
// ClearContext clears the context on the underlying Keyed controller,
// shutting down all routines.
func (k *KeyedRefCount[K, V]) ClearContext() {
	inner := k.keyed
	inner.ClearContext()
}
// GetKeys returns the keys currently registered with the underlying Keyed
// instance.
func (k *KeyedRefCount[K, V]) GetKeys() []K {
	keys := k.keyed.GetKeys()
	return keys
}
// GetKeysWithData returns every registered key together with its data, as
// reported by the underlying Keyed instance.
func (k *KeyedRefCount[K, V]) GetKeysWithData() []KeyWithData[K, V] {
	entries := k.keyed.GetKeysWithData()
	return entries
}
// GetKey looks up key in the underlying Keyed instance and returns its value
// along with whether the key existed.
func (k *KeyedRefCount[K, V]) GetKey(key K) (V, bool) {
	val, found := k.keyed.GetKey(key)
	return val, found
}
// ResetRoutine resets the routine for key on the underlying Keyed instance
// after checking the condition functions: if any cond returns true, the
// instance is reset.
//
// If len(conds) == 0, the key is always reset.
//
// Returns whether the key existed and whether it was reset.
func (k *KeyedRefCount[K, V]) ResetRoutine(key K, conds ...func(K, V) bool) (existed bool, reset bool) {
	existed, reset = k.keyed.ResetRoutine(key, conds...)
	return existed, reset
}
// RestartRoutine restarts the routine for key on the underlying Keyed
// instance after checking the condition functions: if any cond returns true
// and the routine is running, the instance is restarted.
//
// If len(conds) == 0, the check is unconditional.
//
// Returns whether the key existed and whether it was restarted.
func (k *KeyedRefCount[K, V]) RestartRoutine(key K, conds ...func(K, V) bool) (existed bool, reset bool) {
	existed, reset = k.keyed.RestartRoutine(key, conds...)
	return existed, reset
}
// ResetAllRoutines resets every routine on the underlying Keyed instance for
// which at least one of the conds functions returns true.
//
// Resetting an instance constructs a new Routine and data with the
// constructor. Note: this overwrites the existing Data, if present!
// In most cases RestartRoutine is actually what you want.
//
// If len(conds) == 0, all keys are reset.
//
// Returns the number of routines reset and the total number of routines.
func (k *KeyedRefCount[K, V]) ResetAllRoutines(conds ...func(K, V) bool) (resetCount, totalCount int) {
	resetCount, totalCount = k.keyed.ResetAllRoutines(conds...)
	return resetCount, totalCount
}
// RestartAllRoutines restarts every running routine on the underlying Keyed
// instance for which at least one of the conds functions returns true.
//
// If len(conds) == 0, the check is unconditional for all keys.
//
// Returns the number of routines restarted and the total number of routines.
func (k *KeyedRefCount[K, V]) RestartAllRoutines(conds ...func(K, V) bool) (restartedCount, totalCount int) {
	restartedCount, totalCount = k.keyed.RestartAllRoutines(conds...)
	return restartedCount, totalCount
}
// RemoveKey drops every reference held against key and removes the key from
// the underlying Keyed instance.
//
// All outstanding KeyedRef values for the key are marked released, so calling
// Release on them afterwards is a no-op.
//
// Returns if the key existed.
func (k *KeyedRefCount[K, V]) RemoveKey(key K) bool {
	k.mtx.Lock()
	defer k.mtx.Unlock()

	if held, ok := k.refs[key]; ok {
		// Flag each outstanding reference as released before forgetting it.
		for i := range held {
			held[i].rel.Store(true)
		}
		delete(k.refs, key)
	}

	// The controller reports whether the key was present.
	return k.keyed.RemoveKey(key)
}
// AddKeyRef adds a reference to the given key, registering the key with the
// underlying Keyed instance via SetKey.
//
// Returns the new reference, the data SetKey reports for the key, and whether
// the key already existed. The caller should call Release on the returned
// reference once it is no longer needed.
func (k *KeyedRefCount[K, V]) AddKeyRef(key K) (ref *KeyedRef[K, V], data V, existed bool) {
	k.mtx.Lock()
	// Deferred so the mutex is released even if SetKey panics; otherwise
	// every later AddKeyRef, Release and RemoveKey call would deadlock.
	defer k.mtx.Unlock()

	// NOTE(review): the second argument is presumably the "start" flag;
	// confirm against Keyed.SetKey.
	data, existed = k.keyed.SetKey(key, true)

	ref = &KeyedRef[K, V]{rc: k, key: key}
	k.refs[key] = append(k.refs[key], ref)
	return ref, data, existed
}