Skip to content

Commit 216c529

Browse files
committed
WithLowPriorityWhenUnchanged: Set Priority for all add methods
This change makes `WithLowPriorityWhenUnchanged` set the priority for AddAfter, AddRatelimited and AddWithOpts (if not already set) as well.
1 parent 03c44f5 commit 216c529

File tree

2 files changed

+167
-41
lines changed

2 files changed

+167
-41
lines changed

pkg/handler/eventhandler.go

Lines changed: 32 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package handler
1919
import (
2020
"context"
2121
"reflect"
22+
"time"
2223

2324
"k8s.io/client-go/util/workqueue"
2425
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -126,20 +127,14 @@ func (h TypedFuncs[object, request]) Create(ctx context.Context, e event.TypedCr
126127
h.CreateFunc(ctx, e, q)
127128
return
128129
}
129-
wq := workqueueWithCustomAddFunc[request]{
130-
TypedRateLimitingInterface: q,
130+
131+
wq := workqueueWithDefaultPriority[request]{
131132
// We already know that we have a priority queue, that event.Object implements
132133
// client.Object and that its not nil
133-
addFunc: func(item request, q workqueue.TypedRateLimitingInterface[request]) {
134-
var priority int
135-
if e.IsInInitialList {
136-
priority = LowPriority
137-
}
138-
q.(priorityqueue.PriorityQueue[request]).AddWithOpts(
139-
priorityqueue.AddOpts{Priority: &priority},
140-
item,
141-
)
142-
},
134+
PriorityQueue: q.(priorityqueue.PriorityQueue[request]),
135+
}
136+
if e.IsInInitialList {
137+
wq.priority = LowPriority
143138
}
144139
h.CreateFunc(ctx, e, wq)
145140
}
@@ -160,20 +155,13 @@ func (h TypedFuncs[object, request]) Update(ctx context.Context, e event.TypedUp
160155
return
161156
}
162157

163-
wq := workqueueWithCustomAddFunc[request]{
164-
TypedRateLimitingInterface: q,
158+
wq := workqueueWithDefaultPriority[request]{
165159
// We already know that we have a priority queue, that event.ObjectOld and ObjectNew implement
166160
// client.Object and that they are not nil
167-
addFunc: func(item request, q workqueue.TypedRateLimitingInterface[request]) {
168-
var priority int
169-
if any(e.ObjectOld).(client.Object).GetResourceVersion() == any(e.ObjectNew).(client.Object).GetResourceVersion() {
170-
priority = LowPriority
171-
}
172-
q.(priorityqueue.PriorityQueue[request]).AddWithOpts(
173-
priorityqueue.AddOpts{Priority: &priority},
174-
item,
175-
)
176-
},
161+
PriorityQueue: q.(priorityqueue.PriorityQueue[request]),
162+
}
163+
if any(e.ObjectOld).(client.Object).GetResourceVersion() == any(e.ObjectNew).(client.Object).GetResourceVersion() {
164+
wq.priority = LowPriority
177165
}
178166
h.UpdateFunc(ctx, e, wq)
179167
}
@@ -201,13 +189,28 @@ func WithLowPriorityWhenUnchanged[object client.Object, request comparable](u Ty
201189
}
202190
}
203191

204-
type workqueueWithCustomAddFunc[request comparable] struct {
205-
workqueue.TypedRateLimitingInterface[request]
206-
addFunc func(item request, q workqueue.TypedRateLimitingInterface[request])
192+
type workqueueWithDefaultPriority[request comparable] struct {
193+
priorityqueue.PriorityQueue[request]
194+
priority int
195+
}
196+
197+
func (w workqueueWithDefaultPriority[request]) Add(item request) {
198+
w.PriorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: &w.priority}, item)
207199
}
208200

209-
func (w workqueueWithCustomAddFunc[request]) Add(item request) {
210-
w.addFunc(item, w.TypedRateLimitingInterface)
201+
func (w workqueueWithDefaultPriority[request]) AddAfter(item request, after time.Duration) {
202+
w.PriorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: &w.priority, After: after}, item)
203+
}
204+
205+
func (w workqueueWithDefaultPriority[request]) AddRateLimited(item request) {
206+
w.PriorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: &w.priority, RateLimited: true}, item)
207+
}
208+
209+
func (w workqueueWithDefaultPriority[request]) AddWithOpts(o priorityqueue.AddOpts, items ...request) {
210+
if o.Priority == nil {
211+
o.Priority = &w.priority
212+
}
213+
w.PriorityQueue.AddWithOpts(o, items...)
211214
}
212215

213216
// addToQueueCreate adds the reconcile.Request to the priorityqueue in the handler

pkg/handler/eventhandler_test.go

Lines changed: 135 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package handler_test
1818

1919
import (
2020
"context"
21+
"time"
2122

2223
. "github.com/onsi/ginkgo/v2"
2324
. "github.com/onsi/gomega"
@@ -776,8 +777,11 @@ var _ = Describe("Eventhandler", func() {
776777

777778
Describe("WithLowPriorityWhenUnchanged", func() {
778779
handlerPriorityTests := []struct {
779-
name string
780-
handler func() handler.EventHandler
780+
name string
781+
handler func() handler.EventHandler
782+
after time.Duration
783+
ratelimited bool
784+
overridePriority int
781785
}{
782786
{
783787
name: "WithLowPriorityWhenUnchanged wrapper",
@@ -837,6 +841,103 @@ var _ = Describe("Eventhandler", func() {
837841
})
838842
},
839843
},
844+
{
845+
name: "WithLowPriorityWhenUnchanged - Add",
846+
handler: func() handler.EventHandler {
847+
return handler.WithLowPriorityWhenUnchanged(
848+
handler.TypedFuncs[client.Object, reconcile.Request]{
849+
CreateFunc: func(ctx context.Context, tce event.TypedCreateEvent[client.Object], wq workqueue.TypedRateLimitingInterface[reconcile.Request]) {
850+
wq.Add(reconcile.Request{NamespacedName: types.NamespacedName{
851+
Namespace: tce.Object.GetNamespace(),
852+
Name: tce.Object.GetName(),
853+
}})
854+
},
855+
UpdateFunc: func(ctx context.Context, tue event.TypedUpdateEvent[client.Object], wq workqueue.TypedRateLimitingInterface[reconcile.Request]) {
856+
wq.Add(reconcile.Request{NamespacedName: types.NamespacedName{
857+
Namespace: tue.ObjectNew.GetNamespace(),
858+
Name: tue.ObjectNew.GetName(),
859+
}})
860+
},
861+
})
862+
},
863+
},
864+
{
865+
name: "WithLowPriorityWhenUnchanged - AddAfter",
866+
handler: func() handler.EventHandler {
867+
return handler.WithLowPriorityWhenUnchanged(
868+
handler.TypedFuncs[client.Object, reconcile.Request]{
869+
CreateFunc: func(ctx context.Context, tce event.TypedCreateEvent[client.Object], wq workqueue.TypedRateLimitingInterface[reconcile.Request]) {
870+
wq.AddAfter(reconcile.Request{NamespacedName: types.NamespacedName{
871+
Namespace: tce.Object.GetNamespace(),
872+
Name: tce.Object.GetName(),
873+
}}, time.Second)
874+
},
875+
UpdateFunc: func(ctx context.Context, tue event.TypedUpdateEvent[client.Object], wq workqueue.TypedRateLimitingInterface[reconcile.Request]) {
876+
wq.AddAfter(reconcile.Request{NamespacedName: types.NamespacedName{
877+
Namespace: tue.ObjectNew.GetNamespace(),
878+
Name: tue.ObjectNew.GetName(),
879+
}}, time.Second)
880+
},
881+
})
882+
},
883+
after: time.Second,
884+
},
885+
{
886+
name: "WithLowPriorityWhenUnchanged - AddRateLimited",
887+
handler: func() handler.EventHandler {
888+
return handler.WithLowPriorityWhenUnchanged(
889+
handler.TypedFuncs[client.Object, reconcile.Request]{
890+
CreateFunc: func(ctx context.Context, tce event.TypedCreateEvent[client.Object], wq workqueue.TypedRateLimitingInterface[reconcile.Request]) {
891+
wq.AddRateLimited(reconcile.Request{NamespacedName: types.NamespacedName{
892+
Namespace: tce.Object.GetNamespace(),
893+
Name: tce.Object.GetName(),
894+
}})
895+
},
896+
UpdateFunc: func(ctx context.Context, tue event.TypedUpdateEvent[client.Object], wq workqueue.TypedRateLimitingInterface[reconcile.Request]) {
897+
wq.AddRateLimited(reconcile.Request{NamespacedName: types.NamespacedName{
898+
Namespace: tue.ObjectNew.GetNamespace(),
899+
Name: tue.ObjectNew.GetName(),
900+
}})
901+
},
902+
})
903+
},
904+
ratelimited: true,
905+
},
906+
{
907+
name: "WithLowPriorityWhenUnchanged - AddWithOpts priority is retained",
908+
handler: func() handler.EventHandler {
909+
return handler.WithLowPriorityWhenUnchanged(
910+
handler.TypedFuncs[client.Object, reconcile.Request]{
911+
CreateFunc: func(ctx context.Context, tce event.TypedCreateEvent[client.Object], wq workqueue.TypedRateLimitingInterface[reconcile.Request]) {
912+
if pq, isPQ := wq.(priorityqueue.PriorityQueue[reconcile.Request]); isPQ {
913+
pq.AddWithOpts(priorityqueue.AddOpts{Priority: ptr.To(100)}, reconcile.Request{NamespacedName: types.NamespacedName{
914+
Namespace: tce.Object.GetNamespace(),
915+
Name: tce.Object.GetName(),
916+
}})
917+
return
918+
}
919+
wq.Add(reconcile.Request{NamespacedName: types.NamespacedName{
920+
Namespace: tce.Object.GetNamespace(),
921+
Name: tce.Object.GetName(),
922+
}})
923+
},
924+
UpdateFunc: func(ctx context.Context, tue event.TypedUpdateEvent[client.Object], wq workqueue.TypedRateLimitingInterface[reconcile.Request]) {
925+
if pq, isPQ := wq.(priorityqueue.PriorityQueue[reconcile.Request]); isPQ {
926+
pq.AddWithOpts(priorityqueue.AddOpts{Priority: ptr.To(100)}, reconcile.Request{NamespacedName: types.NamespacedName{
927+
Namespace: tue.ObjectNew.GetNamespace(),
928+
Name: tue.ObjectNew.GetName(),
929+
}})
930+
return
931+
}
932+
wq.Add(reconcile.Request{NamespacedName: types.NamespacedName{
933+
Namespace: tue.ObjectNew.GetNamespace(),
934+
Name: tue.ObjectNew.GetName(),
935+
}})
936+
},
937+
})
938+
},
939+
overridePriority: 100,
940+
},
840941
}
841942
for _, test := range handlerPriorityTests {
842943
When("handler is "+test.name, func() {
@@ -862,7 +963,16 @@ var _ = Describe("Eventhandler", func() {
862963
IsInInitialList: true,
863964
}, wq)
864965

865-
Expect(actualOpts).To(Equal(priorityqueue.AddOpts{Priority: ptr.To(handler.LowPriority)}))
966+
expected := handler.LowPriority
967+
if test.overridePriority != 0 {
968+
expected = test.overridePriority
969+
}
970+
971+
Expect(actualOpts).To(Equal(priorityqueue.AddOpts{
972+
Priority: ptr.To(expected),
973+
After: test.after,
974+
RateLimited: test.ratelimited,
975+
}))
866976
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
867977
})
868978

@@ -888,10 +998,14 @@ var _ = Describe("Eventhandler", func() {
888998
IsInInitialList: false,
889999
}, wq)
8901000

891-
Expect(actualOpts).To(Or(
892-
Equal(priorityqueue.AddOpts{}),
893-
Equal(priorityqueue.AddOpts{Priority: ptr.To(0)}),
894-
))
1001+
if test.overridePriority != 0 {
1002+
Expect(actualOpts).To(Equal(priorityqueue.AddOpts{After: test.after, RateLimited: test.ratelimited, Priority: ptr.To(test.overridePriority)}))
1003+
} else {
1004+
Expect(actualOpts).To(Or(
1005+
Equal(priorityqueue.AddOpts{After: test.after, RateLimited: test.ratelimited}),
1006+
Equal(priorityqueue.AddOpts{After: test.after, RateLimited: test.ratelimited, Priority: ptr.To(0)}),
1007+
))
1008+
}
8951009
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
8961010
})
8971011

@@ -922,7 +1036,12 @@ var _ = Describe("Eventhandler", func() {
9221036
}},
9231037
}, wq)
9241038

925-
Expect(actualOpts).To(Equal(priorityqueue.AddOpts{Priority: ptr.To(handler.LowPriority)}))
1039+
expectedPriority := handler.LowPriority
1040+
if test.overridePriority != 0 {
1041+
expectedPriority = test.overridePriority
1042+
}
1043+
1044+
Expect(actualOpts).To(Equal(priorityqueue.AddOpts{After: test.after, RateLimited: test.ratelimited, Priority: ptr.To(expectedPriority)}))
9261045
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
9271046
})
9281047

@@ -954,10 +1073,14 @@ var _ = Describe("Eventhandler", func() {
9541073
}},
9551074
}, wq)
9561075

957-
Expect(actualOpts).To(Or(
958-
Equal(priorityqueue.AddOpts{}),
959-
Equal(priorityqueue.AddOpts{Priority: ptr.To(0)}),
960-
))
1076+
if test.overridePriority != 0 {
1077+
Expect(actualOpts).To(Equal(priorityqueue.AddOpts{After: test.after, RateLimited: test.ratelimited, Priority: ptr.To(test.overridePriority)}))
1078+
} else {
1079+
Expect(actualOpts).To(Or(
1080+
Equal(priorityqueue.AddOpts{After: test.after, RateLimited: test.ratelimited}),
1081+
Equal(priorityqueue.AddOpts{After: test.after, RateLimited: test.ratelimited, Priority: ptr.To(0)}),
1082+
))
1083+
}
9611084
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
9621085
})
9631086

0 commit comments

Comments
 (0)