Skip to content

Commit

Permalink
Merge pull request kubernetes-sigs#965 from k8s-infra-cherrypick-robo…
Browse files Browse the repository at this point in the history
…t/cherry-pick-962-to-release-0.5

[release-0.5] Fix delayed job unexpected cancellation
  • Loading branch information
k8s-ci-robot authored Feb 6, 2024
2 parents 58a4791 + 359d2b2 commit 53f06c7
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 30 deletions.
19 changes: 9 additions & 10 deletions pkg/kwok/controllers/node_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,12 +344,6 @@ func (c *NodeController) preprocess(ctx context.Context, node *corev1.Node) erro
)
return nil
}

if !c.delayQueue.Cancel(resourceJob) {
logger.Debug("Failed to cancel stage",
"stage", resourceJob.Stage.Name(),
)
}
}

data, err := expression.ToJSONStandard(node)
Expand Down Expand Up @@ -388,7 +382,7 @@ func (c *NodeController) preprocess(ctx context.Context, node *corev1.Node) erro
}
// we add a normal(fresh) stage job with weight 0,
// resulting in that it will always be processed with high priority compared to those retry ones
c.addStageJob(item, delay, 0)
c.addStageJob(ctx, item, delay, 0)
return nil
}

Expand Down Expand Up @@ -419,7 +413,7 @@ func (c *NodeController) playStageWorker(ctx context.Context) {
// for failed jobs, we re-push them into the queue with a lower weight
// and a backoff period to avoid blocking normal tasks
retryDelay := backoffDelayByStep(retryCount, c.backoff)
c.addStageJob(node, retryDelay, 1)
c.addStageJob(ctx, node, retryDelay, 1)
}
}
}
Expand Down Expand Up @@ -623,10 +617,15 @@ func (c *NodeController) funcNodePort() int {
}

// addStageJob adds a stage to be applied into the underlying weight delay queue and the associated helper map
func (c *NodeController) addStageJob(job resourceStageJob[*corev1.Node], delay time.Duration, weight int) {
func (c *NodeController) addStageJob(ctx context.Context, job resourceStageJob[*corev1.Node], delay time.Duration, weight int) {
old, loaded := c.delayQueueMapping.Swap(job.Key, job)
if loaded {
c.delayQueue.Cancel(old)
if !c.delayQueue.Cancel(old) {
logger := log.FromContext(ctx)
logger.Debug("Failed to cancel stage",
"stage", job.Stage.Name(),
)
}
}
c.delayQueue.AddWeightAfter(job, weight, delay)
}
19 changes: 9 additions & 10 deletions pkg/kwok/controllers/pod_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,12 +240,6 @@ func (c *PodController) preprocess(ctx context.Context, pod *corev1.Pod) error {
)
return nil
}

if !c.delayQueue.Cancel(resourceJob) {
logger.Debug("Failed to cancel stage",
"stage", resourceJob.Stage.Name(),
)
}
}

data, err := expression.ToJSONStandard(pod)
Expand Down Expand Up @@ -284,7 +278,7 @@ func (c *PodController) preprocess(ctx context.Context, pod *corev1.Pod) error {
}
// we add a normal(fresh) stage job with weight 0,
// resulting in that it will always be processed with high priority compared to those retry ones
c.addStageJob(item, delay, 0)
c.addStageJob(ctx, item, delay, 0)
return nil
}

Expand Down Expand Up @@ -315,7 +309,7 @@ func (c *PodController) playStageWorker(ctx context.Context) {
// for failed jobs, we re-push them into the queue with a lower weight
// and a backoff period to avoid blocking normal tasks
retryDelay := backoffDelayByStep(retryCount, c.backoff)
c.addStageJob(pod, retryDelay, 1)
c.addStageJob(ctx, pod, retryDelay, 1)
}
}
}
Expand Down Expand Up @@ -721,10 +715,15 @@ func (c *PodController) List(nodeName string) ([]log.ObjectRef, bool) {
}

// addStageJob adds a stage to be applied into the underlying weight delay queue and the associated helper map
func (c *PodController) addStageJob(job resourceStageJob[*corev1.Pod], delay time.Duration, weight int) {
func (c *PodController) addStageJob(ctx context.Context, job resourceStageJob[*corev1.Pod], delay time.Duration, weight int) {
old, loaded := c.delayQueueMapping.Swap(job.Key, job)
if loaded {
c.delayQueue.Cancel(old)
if !c.delayQueue.Cancel(old) {
logger := log.FromContext(ctx)
logger.Debug("Failed to cancel stage",
"stage", job.Stage.Name(),
)
}
}
c.delayQueue.AddWeightAfter(job, weight, delay)
}
19 changes: 9 additions & 10 deletions pkg/kwok/controllers/stage_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,12 +220,6 @@ func (c *StageController) preprocess(ctx context.Context, resource *unstructured
)
return nil
}

if !c.delayQueue.Cancel(resourceJob) {
logger.Debug("Failed to cancel stage",
"stage", resourceJob.Stage.Name(),
)
}
}

data, err := expression.ToJSONStandard(resource)
Expand Down Expand Up @@ -265,7 +259,7 @@ func (c *StageController) preprocess(ctx context.Context, resource *unstructured

// we add a normal(fresh) stage job with weight 0,
// resulting in that it will always be processed with high priority compared to those retry ones
c.addStageJob(item, delay, 0)
c.addStageJob(ctx, item, delay, 0)
return nil
}

Expand Down Expand Up @@ -296,7 +290,7 @@ func (c *StageController) playStageWorker(ctx context.Context) {
// for failed jobs, we re-push them into the queue with a lower weight
// and a backoff period to avoid blocking normal tasks
retryDelay := backoffDelayByStep(retryCount, c.backoff)
c.addStageJob(resource, retryDelay, 1)
c.addStageJob(ctx, resource, retryDelay, 1)
}
}
}
Expand Down Expand Up @@ -507,10 +501,15 @@ func (c *StageController) computePatch(resource *unstructured.Unstructured, tpl
}

// addStageJob adds a stage to be applied into the underlying weight delay queue and the associated helper map
func (c *StageController) addStageJob(job resourceStageJob[*unstructured.Unstructured], delay time.Duration, weight int) {
func (c *StageController) addStageJob(ctx context.Context, job resourceStageJob[*unstructured.Unstructured], delay time.Duration, weight int) {
old, loaded := c.delayQueueMapping.Swap(job.Key, job)
if loaded {
c.delayQueue.Cancel(old)
if !c.delayQueue.Cancel(old) {
logger := log.FromContext(ctx)
logger.Debug("Failed to cancel stage",
"stage", job.Stage.Name(),
)
}
}
c.delayQueue.AddWeightAfter(job, weight, delay)
}

0 comments on commit 53f06c7

Please sign in to comment.