From 359d2b2e8de939a11054d31c2082892e3b516038 Mon Sep 17 00:00:00 2001
From: Shiming Zhang
Date: Sat, 3 Feb 2024 19:12:09 +0800
Subject: [PATCH] Fix delayed job unexpected cancellation

---
Review note: cancellation of a superseded delayed job is done only in
addStageJob, right after the mapping entry is swapped. When that cancel
fails, the debug log reports the stage of the job that could not be
cancelled (old), which is what preprocess logged before this change,
rather than the stage of the job being queued.

 pkg/kwok/controllers/node_controller.go  | 19 +++++++++----------
 pkg/kwok/controllers/pod_controller.go   | 19 +++++++++----------
 pkg/kwok/controllers/stage_controller.go | 19 +++++++++----------
 3 files changed, 27 insertions(+), 30 deletions(-)

diff --git a/pkg/kwok/controllers/node_controller.go b/pkg/kwok/controllers/node_controller.go
index 8faf31f2b..76fb4bd11 100644
--- a/pkg/kwok/controllers/node_controller.go
+++ b/pkg/kwok/controllers/node_controller.go
@@ -344,12 +344,6 @@ func (c *NodeController) preprocess(ctx context.Context, node *corev1.Node) erro
 			)
 			return nil
 		}
-
-		if !c.delayQueue.Cancel(resourceJob) {
-			logger.Debug("Failed to cancel stage",
-				"stage", resourceJob.Stage.Name(),
-			)
-		}
 	}
 
 	data, err := expression.ToJSONStandard(node)
@@ -388,7 +382,7 @@ func (c *NodeController) preprocess(ctx context.Context, node *corev1.Node) erro
 	}
 	// we add a normal(fresh) stage job with weight 0,
 	// resulting in that it will always be processed with high priority compared to those retry ones
-	c.addStageJob(item, delay, 0)
+	c.addStageJob(ctx, item, delay, 0)
 	return nil
 }
 
@@ -419,7 +413,7 @@ func (c *NodeController) playStageWorker(ctx context.Context) {
 			// for failed jobs, we re-push them into the queue with a lower weight
 			// and a backoff period to avoid blocking normal tasks
 			retryDelay := backoffDelayByStep(retryCount, c.backoff)
-			c.addStageJob(node, retryDelay, 1)
+			c.addStageJob(ctx, node, retryDelay, 1)
 		}
 	}
 }
@@ -623,10 +617,15 @@ func (c *NodeController) funcNodePort() int {
 }
 
 // addStageJob adds a stage to be applied into the underlying weight delay queue and the associated helper map
-func (c *NodeController) addStageJob(job resourceStageJob[*corev1.Node], delay time.Duration, weight int) {
+func (c *NodeController) addStageJob(ctx context.Context, job resourceStageJob[*corev1.Node], delay time.Duration, weight int) {
 	old, loaded := c.delayQueueMapping.Swap(job.Key, job)
 	if loaded {
-		c.delayQueue.Cancel(old)
+		if !c.delayQueue.Cancel(old) {
+			logger := log.FromContext(ctx)
+			logger.Debug("Failed to cancel stage",
+				"stage", old.Stage.Name(),
+			)
+		}
 	}
 	c.delayQueue.AddWeightAfter(job, weight, delay)
 }
diff --git a/pkg/kwok/controllers/pod_controller.go b/pkg/kwok/controllers/pod_controller.go
index 8d99bd923..8a97c62f2 100644
--- a/pkg/kwok/controllers/pod_controller.go
+++ b/pkg/kwok/controllers/pod_controller.go
@@ -240,12 +240,6 @@ func (c *PodController) preprocess(ctx context.Context, pod *corev1.Pod) error {
 			)
 			return nil
 		}
-
-		if !c.delayQueue.Cancel(resourceJob) {
-			logger.Debug("Failed to cancel stage",
-				"stage", resourceJob.Stage.Name(),
-			)
-		}
 	}
 
 	data, err := expression.ToJSONStandard(pod)
@@ -284,7 +278,7 @@ func (c *PodController) preprocess(ctx context.Context, pod *corev1.Pod) error {
 	}
 	// we add a normal(fresh) stage job with weight 0,
 	// resulting in that it will always be processed with high priority compared to those retry ones
-	c.addStageJob(item, delay, 0)
+	c.addStageJob(ctx, item, delay, 0)
 	return nil
 }
 
@@ -315,7 +309,7 @@ func (c *PodController) playStageWorker(ctx context.Context) {
 			// for failed jobs, we re-push them into the queue with a lower weight
 			// and a backoff period to avoid blocking normal tasks
 			retryDelay := backoffDelayByStep(retryCount, c.backoff)
-			c.addStageJob(pod, retryDelay, 1)
+			c.addStageJob(ctx, pod, retryDelay, 1)
 		}
 	}
 }
@@ -721,10 +715,15 @@ func (c *PodController) List(nodeName string) ([]log.ObjectRef, bool) {
 }
 
 // addStageJob adds a stage to be applied into the underlying weight delay queue and the associated helper map
-func (c *PodController) addStageJob(job resourceStageJob[*corev1.Pod], delay time.Duration, weight int) {
+func (c *PodController) addStageJob(ctx context.Context, job resourceStageJob[*corev1.Pod], delay time.Duration, weight int) {
 	old, loaded := c.delayQueueMapping.Swap(job.Key, job)
 	if loaded {
-		c.delayQueue.Cancel(old)
+		if !c.delayQueue.Cancel(old) {
+			logger := log.FromContext(ctx)
+			logger.Debug("Failed to cancel stage",
+				"stage", old.Stage.Name(),
+			)
+		}
 	}
 	c.delayQueue.AddWeightAfter(job, weight, delay)
 }
diff --git a/pkg/kwok/controllers/stage_controller.go b/pkg/kwok/controllers/stage_controller.go
index 43d8fd919..9ce3fb759 100644
--- a/pkg/kwok/controllers/stage_controller.go
+++ b/pkg/kwok/controllers/stage_controller.go
@@ -220,12 +220,6 @@ func (c *StageController) preprocess(ctx context.Context, resource *unstructured
 			)
 			return nil
 		}
-
-		if !c.delayQueue.Cancel(resourceJob) {
-			logger.Debug("Failed to cancel stage",
-				"stage", resourceJob.Stage.Name(),
-			)
-		}
 	}
 
 	data, err := expression.ToJSONStandard(resource)
@@ -265,7 +259,7 @@ func (c *StageController) preprocess(ctx context.Context, resource *unstructured
 
 	// we add a normal(fresh) stage job with weight 0,
 	// resulting in that it will always be processed with high priority compared to those retry ones
-	c.addStageJob(item, delay, 0)
+	c.addStageJob(ctx, item, delay, 0)
 	return nil
 }
 
@@ -296,7 +290,7 @@ func (c *StageController) playStageWorker(ctx context.Context) {
 			// for failed jobs, we re-push them into the queue with a lower weight
 			// and a backoff period to avoid blocking normal tasks
 			retryDelay := backoffDelayByStep(retryCount, c.backoff)
-			c.addStageJob(resource, retryDelay, 1)
+			c.addStageJob(ctx, resource, retryDelay, 1)
 		}
 	}
 }
@@ -507,10 +501,15 @@ func (c *StageController) computePatch(resource *unstructured.Unstructured, tpl
 }
 
 // addStageJob adds a stage to be applied into the underlying weight delay queue and the associated helper map
-func (c *StageController) addStageJob(job resourceStageJob[*unstructured.Unstructured], delay time.Duration, weight int) {
+func (c *StageController) addStageJob(ctx context.Context, job resourceStageJob[*unstructured.Unstructured], delay time.Duration, weight int) {
 	old, loaded := c.delayQueueMapping.Swap(job.Key, job)
 	if loaded {
-		c.delayQueue.Cancel(old)
+		if !c.delayQueue.Cancel(old) {
+			logger := log.FromContext(ctx)
+			logger.Debug("Failed to cancel stage",
+				"stage", old.Stage.Name(),
+			)
+		}
 	}
 	c.delayQueue.AddWeightAfter(job, weight, delay)
 }