Skip to content

Commit 3ca530f

Browse files
committed
Fix delayed job unexpected cancellation
1 parent a7d2ec1 commit 3ca530f

File tree

3 files changed

+27
-30
lines changed

3 files changed

+27
-30
lines changed

pkg/kwok/controllers/node_controller.go

+9 -10
Original file line number | Diff line number | Diff line change
@@ -344,12 +344,6 @@ func (c *NodeController) preprocess(ctx context.Context, node *corev1.Node) erro
344344
)
345345
return nil
346346
}
347-
348-
if !c.delayQueue.Cancel(resourceJob) {
349-
logger.Debug("Failed to cancel stage",
350-
"stage", resourceJob.Stage.Name(),
351-
)
352-
}
353347
}
354348

355349
data, err := expression.ToJSONStandard(node)
@@ -388,7 +382,7 @@ func (c *NodeController) preprocess(ctx context.Context, node *corev1.Node) erro
388382
}
389383
// we add a normal(fresh) stage job with weight 0,
390384
// resulting in that it will always be processed with high priority compared to those retry ones
391-
c.addStageJob(item, delay, 0)
385+
c.addStageJob(ctx, item, delay, 0)
392386
return nil
393387
}
394388

@@ -419,7 +413,7 @@ func (c *NodeController) playStageWorker(ctx context.Context) {
419413
// for failed jobs, we re-push them into the queue with a lower weight
420414
// and a backoff period to avoid blocking normal tasks
421415
retryDelay := backoffDelayByStep(retryCount, c.backoff)
422-
c.addStageJob(node, retryDelay, 1)
416+
c.addStageJob(ctx, node, retryDelay, 1)
423417
}
424418
}
425419
}
@@ -623,10 +617,15 @@ func (c *NodeController) funcNodePort() int {
623617
}
624618

625619
// addStageJob adds a stage to be applied into the underlying weight delay queue and the associated helper map
626-
func (c *NodeController) addStageJob(job resourceStageJob[*corev1.Node], delay time.Duration, weight int) {
620+
func (c *NodeController) addStageJob(ctx context.Context, job resourceStageJob[*corev1.Node], delay time.Duration, weight int) {
627621
old, loaded := c.delayQueueMapping.Swap(job.Key, job)
628622
if loaded {
629-
c.delayQueue.Cancel(old)
623+
if !c.delayQueue.Cancel(old) {
624+
logger := log.FromContext(ctx)
625+
logger.Debug("Failed to cancel stage",
626+
"stage", job.Stage.Name(),
627+
)
628+
}
630629
}
631630
c.delayQueue.AddWeightAfter(job, weight, delay)
632631
}

pkg/kwok/controllers/pod_controller.go

+9 -10
Original file line number | Diff line number | Diff line change
@@ -240,12 +240,6 @@ func (c *PodController) preprocess(ctx context.Context, pod *corev1.Pod) error {
240240
)
241241
return nil
242242
}
243-
244-
if !c.delayQueue.Cancel(resourceJob) {
245-
logger.Debug("Failed to cancel stage",
246-
"stage", resourceJob.Stage.Name(),
247-
)
248-
}
249243
}
250244

251245
data, err := expression.ToJSONStandard(pod)
@@ -284,7 +278,7 @@ func (c *PodController) preprocess(ctx context.Context, pod *corev1.Pod) error {
284278
}
285279
// we add a normal(fresh) stage job with weight 0,
286280
// resulting in that it will always be processed with high priority compared to those retry ones
287-
c.addStageJob(item, delay, 0)
281+
c.addStageJob(ctx, item, delay, 0)
288282
return nil
289283
}
290284

@@ -315,7 +309,7 @@ func (c *PodController) playStageWorker(ctx context.Context) {
315309
// for failed jobs, we re-push them into the queue with a lower weight
316310
// and a backoff period to avoid blocking normal tasks
317311
retryDelay := backoffDelayByStep(retryCount, c.backoff)
318-
c.addStageJob(pod, retryDelay, 1)
312+
c.addStageJob(ctx, pod, retryDelay, 1)
319313
}
320314
}
321315
}
@@ -721,10 +715,15 @@ func (c *PodController) List(nodeName string) ([]log.ObjectRef, bool) {
721715
}
722716

723717
// addStageJob adds a stage to be applied into the underlying weight delay queue and the associated helper map
724-
func (c *PodController) addStageJob(job resourceStageJob[*corev1.Pod], delay time.Duration, weight int) {
718+
func (c *PodController) addStageJob(ctx context.Context, job resourceStageJob[*corev1.Pod], delay time.Duration, weight int) {
725719
old, loaded := c.delayQueueMapping.Swap(job.Key, job)
726720
if loaded {
727-
c.delayQueue.Cancel(old)
721+
if !c.delayQueue.Cancel(old) {
722+
logger := log.FromContext(ctx)
723+
logger.Debug("Failed to cancel stage",
724+
"stage", job.Stage.Name(),
725+
)
726+
}
728727
}
729728
c.delayQueue.AddWeightAfter(job, weight, delay)
730729
}

pkg/kwok/controllers/stage_controller.go

+9 -10
Original file line number | Diff line number | Diff line change
@@ -220,12 +220,6 @@ func (c *StageController) preprocess(ctx context.Context, resource *unstructured
220220
)
221221
return nil
222222
}
223-
224-
if !c.delayQueue.Cancel(resourceJob) {
225-
logger.Debug("Failed to cancel stage",
226-
"stage", resourceJob.Stage.Name(),
227-
)
228-
}
229223
}
230224

231225
data, err := expression.ToJSONStandard(resource)
@@ -265,7 +259,7 @@ func (c *StageController) preprocess(ctx context.Context, resource *unstructured
265259

266260
// we add a normal(fresh) stage job with weight 0,
267261
// resulting in that it will always be processed with high priority compared to those retry ones
268-
c.addStageJob(item, delay, 0)
262+
c.addStageJob(ctx, item, delay, 0)
269263
return nil
270264
}
271265

@@ -296,7 +290,7 @@ func (c *StageController) playStageWorker(ctx context.Context) {
296290
// for failed jobs, we re-push them into the queue with a lower weight
297291
// and a backoff period to avoid blocking normal tasks
298292
retryDelay := backoffDelayByStep(retryCount, c.backoff)
299-
c.addStageJob(resource, retryDelay, 1)
293+
c.addStageJob(ctx, resource, retryDelay, 1)
300294
}
301295
}
302296
}
@@ -507,10 +501,15 @@ func (c *StageController) computePatch(resource *unstructured.Unstructured, tpl
507501
}
508502

509503
// addStageJob adds a stage to be applied into the underlying weight delay queue and the associated helper map
510-
func (c *StageController) addStageJob(job resourceStageJob[*unstructured.Unstructured], delay time.Duration, weight int) {
504+
func (c *StageController) addStageJob(ctx context.Context, job resourceStageJob[*unstructured.Unstructured], delay time.Duration, weight int) {
511505
old, loaded := c.delayQueueMapping.Swap(job.Key, job)
512506
if loaded {
513-
c.delayQueue.Cancel(old)
507+
if !c.delayQueue.Cancel(old) {
508+
logger := log.FromContext(ctx)
509+
logger.Debug("Failed to cancel stage",
510+
"stage", job.Stage.Name(),
511+
)
512+
}
514513
}
515514
c.delayQueue.AddWeightAfter(job, weight, delay)
516515
}

0 commit comments

Comments
 (0)