Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: Don't fail suspended Kubeflow jobs #6295

Open
wants to merge 1 commit into
base: fg91/dep/upgrade-kubeflow-training-operator
Choose a base branch
from

Conversation

fg91
Copy link
Member

@fg91 fg91 commented Mar 2, 2025

Why are the changes needed?

The flyteplugins kubeflow plugins currently fail tasks if the underlying CRD object (PyTorchJob, TfJob, MpiJob) haven't been updated by the training operator for a certain timeout period (see here):

	if app.Status.StartTime == nil && app.CreationTimestamp.Add(common.GetConfig().Timeout.Duration).Before(time.Now()) {
		return pluginsCore.PhaseInfoUndefined, fmt.Errorf("kubeflow operator hasn't updated the pytorch custom resource since creation time %v", app.CreationTimestamp)
	}

However, the jobs provided by the kubeflow training operator can be in a so-called "suspended" state (e.g. when using them with an external queueing system like Kueue). Jobs in a suspended state are expected to not be updated by the training operator.

What changes were proposed in this pull request?

I propose to add a check for suspension to the error condition in the code snippet above so that the flyteplugin ignores unmodified kubeflow jobs if they are in a suspended state.

Please note that other flyteplugins like spark or ray don't have this "CRD has been updated" check at all so this change does not introduce inconsistency.

How was this patch tested?

Added unit tests. Ran flytepropeller image with this change in cluster.

Check all the applicable boxes

  • I updated the documentation accordingly.
  • All new and existing tests passed.
  • All commits are signed-off.

Related PRs

Summary by Bito

This PR updates Kubeflow plugins (MPI, PyTorch, TensorFlow) to properly handle suspended jobs by adding suspension checks to timeout failure conditions. It prevents premature task failures by verifying job suspension status before raising errors, ensuring suspended jobs are treated appropriately without affecting other plugins.

Unit tests added: False

Estimated effort to review (1-5, lower is better): 1

@flyte-bot
Copy link
Collaborator

flyte-bot commented Mar 2, 2025

Code Review Agent Run #ae6c12

Actionable Suggestions - 0
Review Details
  • Files reviewed - 6 · Commit Range: 9559d64..9559d64
    • flyteplugins/go/tasks/plugins/k8s/kfoperators/mpi/mpi.go
    • flyteplugins/go/tasks/plugins/k8s/kfoperators/mpi/mpi_test.go
    • flyteplugins/go/tasks/plugins/k8s/kfoperators/pytorch/pytorch.go
    • flyteplugins/go/tasks/plugins/k8s/kfoperators/pytorch/pytorch_test.go
    • flyteplugins/go/tasks/plugins/k8s/kfoperators/tensorflow/tensorflow.go
    • flyteplugins/go/tasks/plugins/k8s/kfoperators/tensorflow/tensorflow_test.go
  • Files skipped - 0
  • Tools
    • Golangci-lint (Linter) - ✖︎ Failed
    • Whispers (Secret Scanner) - ✔︎ Successful
    • Detect-secrets (Secret Scanner) - ✔︎ Successful

AI Code Review powered by Bito Logo

@flyte-bot
Copy link
Collaborator

flyte-bot commented Mar 2, 2025

Changelist by Bito

This pull request implements the following key changes.

Key Change Files Impacted
Bug Fix - Suspended Jobs Handling Update

mpi.go - Added suspension check to bypass timeout failure for suspended jobs.

pytorch.go - Incorporated suspension condition to prevent false error when job is suspended.

tensorflow.go - Updated timeout logic to include suspended state verification.

Testing - Suspended Job Test Cases

mpi_test.go - Introduced tests to validate behavior for non-updated and suspended MPI jobs.

pytorch_test.go - Added tests ensuring proper handling of suspended PyTorch jobs.

tensorflow_test.go - Enhanced test coverage to verify suspended state handling in TensorFlow job scenarios.

Copy link

codecov bot commented Mar 2, 2025

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 58.50%. Comparing base (5effd97) to head (ce5a5d8).

Additional details and impacted files
@@                               Coverage Diff                               @@
##           fg91/dep/upgrade-kubeflow-training-operator    #6295      +/-   ##
===============================================================================
+ Coverage                                        58.48%   58.50%   +0.02%     
===============================================================================
  Files                                              937      937              
  Lines                                            71088    71091       +3     
===============================================================================
+ Hits                                             41577    41594      +17     
+ Misses                                           26359    26348      -11     
+ Partials                                          3152     3149       -3     
Flag Coverage Δ
unittests-datacatalog 59.06% <ø> (ø)
unittests-flyteadmin 56.30% <ø> (+0.02%) ⬆️
unittests-flytecopilot 30.99% <ø> (ø)
unittests-flytectl 64.70% <ø> (ø)
unittests-flyteidl 76.12% <ø> (ø)
unittests-flyteplugins 61.07% <100.00%> (+0.07%) ⬆️
unittests-flytepropeller 54.79% <ø> (ø)
unittests-flytestdlib 64.02% <ø> (-0.02%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@fg91 fg91 marked this pull request as draft March 2, 2025 16:18
@fg91 fg91 force-pushed the fg91/dep/upgrade-kubeflow-training-operator branch from 72d0107 to 5effd97 Compare March 7, 2025 06:07
@fg91 fg91 added the fixed For any bug fixes label Mar 7, 2025
@fg91 fg91 force-pushed the fg91/fix/dont-fail-suspended-kubeflow-jobs branch from 9559d64 to ce5a5d8 Compare March 7, 2025 06:31
@fg91 fg91 marked this pull request as ready for review March 7, 2025 06:45
@fg91 fg91 self-assigned this Mar 7, 2025
@flyte-bot
Copy link
Collaborator

flyte-bot commented Mar 7, 2025

Code Review Agent Run #58db2d

Actionable Suggestions - 1
  • flyteplugins/go/tasks/plugins/k8s/kfoperators/pytorch/pytorch.go - 1
Review Details
  • Files reviewed - 6 · Commit Range: ce5a5d8..ce5a5d8
    • flyteplugins/go/tasks/plugins/k8s/kfoperators/mpi/mpi.go
    • flyteplugins/go/tasks/plugins/k8s/kfoperators/mpi/mpi_test.go
    • flyteplugins/go/tasks/plugins/k8s/kfoperators/pytorch/pytorch.go
    • flyteplugins/go/tasks/plugins/k8s/kfoperators/pytorch/pytorch_test.go
    • flyteplugins/go/tasks/plugins/k8s/kfoperators/tensorflow/tensorflow.go
    • flyteplugins/go/tasks/plugins/k8s/kfoperators/tensorflow/tensorflow_test.go
  • Files skipped - 0
  • Tools
    • Whispers (Secret Scanner) - ✔︎ Successful
    • Detect-secrets (Secret Scanner) - ✔︎ Successful

AI Code Review powered by Bito Logo

Comment on lines +230 to +231
isSuspended := app.Spec.RunPolicy.Suspend != nil && *app.Spec.RunPolicy.Suspend
if !isSuspended && app.Status.StartTime == nil && app.CreationTimestamp.Add(common.GetConfig().Timeout.Duration).Before(time.Now()) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potential nil pointer dereference check

Consider checking if app.Spec.RunPolicy is nil before accessing app.Spec.RunPolicy.Suspend. This would prevent a potential nil pointer dereference if RunPolicy is not initialized.

Code suggestion
Check the AI-generated fix before applying
Suggested change
isSuspended := app.Spec.RunPolicy.Suspend != nil && *app.Spec.RunPolicy.Suspend
if !isSuspended && app.Status.StartTime == nil && app.CreationTimestamp.Add(common.GetConfig().Timeout.Duration).Before(time.Now()) {
isSuspended := false
if app.Spec.RunPolicy != nil && app.Spec.RunPolicy.Suspend != nil && *app.Spec.RunPolicy.Suspend {
isSuspended = true
}
if !isSuspended && app.Status.StartTime == nil && app.CreationTimestamp.Add(common.GetConfig().Timeout.Duration).Before(time.Now()) {

Code Review Run #58db2d


Should Bito avoid suggestions like this for future reviews? (Manage Rules)

  • Yes, avoid them

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
fixed For any bug fixes
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants