Skip to content

Commit

Permalink
Fix: Don't fail suspended Kubeflow jobs
Browse files Browse the repository at this point in the history
Signed-off-by: Fabio Graetz <[email protected]>
  • Loading branch information
fg91 committed Mar 7, 2025
1 parent 5effd97 commit ce5a5d8
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 3 deletions.
4 changes: 3 additions & 1 deletion flyteplugins/go/tasks/plugins/k8s/kfoperators/mpi/mpi.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,9 @@ func (mpiOperatorResourceHandler) GetTaskPhase(ctx context.Context, pluginContex
if err != nil {
return pluginsCore.PhaseInfoUndefined, err
}
if app.Status.StartTime == nil && app.CreationTimestamp.Add(common.GetConfig().Timeout.Duration).Before(time.Now()) {

isSuspended := app.Spec.RunPolicy.Suspend != nil && *app.Spec.RunPolicy.Suspend
if !isSuspended && app.Status.StartTime == nil && app.CreationTimestamp.Add(common.GetConfig().Timeout.Duration).Before(time.Now()) {
return pluginsCore.PhaseInfoUndefined, fmt.Errorf("kubeflow operator hasn't updated the mpi custom resource since creation time %v", app.CreationTimestamp)
}
currentCondition, err := common.ExtractCurrentCondition(app.Status.Conditions)
Expand Down
19 changes: 19 additions & 0 deletions flyteplugins/go/tasks/plugins/k8s/kfoperators/mpi/mpi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,25 @@ func TestGetTaskPhase(t *testing.T) {
assert.Equal(t, pluginsCore.PhaseRunning, taskPhase.Phase())
assert.NotNil(t, taskPhase.Info())
assert.Nil(t, err)

// Training operator did not modify the job even though it is not suspended
mpiJob := dummyMPIJobResourceCreator(kubeflowv1.JobCreated)
mpiJob.CreationTimestamp = v1.Time{Time: time.Now().Add(-time.Hour)}
mpiJob.Status.StartTime = nil
taskPhase, err = mpiResourceHandler.GetTaskPhase(ctx, taskCtx, mpiJob)
assert.Error(t, err)
assert.Contains(t, err.Error(), "kubeflow operator hasn't updated")
assert.Equal(t, pluginsCore.PhaseInfoUndefined, taskPhase)

// Training operator did not modify the job because it is suspended
mpiJobSuspended := dummyMPIJobResourceCreator(kubeflowv1.JobCreated)
mpiJobSuspended.CreationTimestamp = v1.Time{Time: time.Now().Add(-time.Hour)}
mpiJobSuspended.Status.StartTime = nil
suspend := true
mpiJobSuspended.Spec.RunPolicy.Suspend = &suspend
taskPhase, err = mpiResourceHandler.GetTaskPhase(ctx, taskCtx, mpiJobSuspended)
assert.NoError(t, err)
assert.Equal(t, pluginsCore.PhaseQueued, taskPhase.Phase())
}

func TestGetTaskPhaseIncreasePhaseVersion(t *testing.T) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,8 @@ func (pytorchOperatorResourceHandler) GetTaskPhase(ctx context.Context, pluginCo
return pluginsCore.PhaseInfoUndefined, err
}

if app.Status.StartTime == nil && app.CreationTimestamp.Add(common.GetConfig().Timeout.Duration).Before(time.Now()) {
isSuspended := app.Spec.RunPolicy.Suspend != nil && *app.Spec.RunPolicy.Suspend
if !isSuspended && app.Status.StartTime == nil && app.CreationTimestamp.Add(common.GetConfig().Timeout.Duration).Before(time.Now()) {
return pluginsCore.PhaseInfoUndefined, fmt.Errorf("kubeflow operator hasn't updated the pytorch custom resource since creation time %v", app.CreationTimestamp)
}
currentCondition, err := common.ExtractCurrentCondition(app.Status.Conditions)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,25 @@ func TestGetTaskPhase(t *testing.T) {
assert.Equal(t, pluginsCore.PhaseRunning, taskPhase.Phase())
assert.NotNil(t, taskPhase.Info())
assert.Nil(t, err)

// Training operator did not modify the job even though it is not suspended
pytorchJob := dummyPytorchJobResourceCreator(kubeflowv1.JobCreated)
pytorchJob.CreationTimestamp = v1.Time{Time: time.Now().Add(-time.Hour)}
pytorchJob.Status.StartTime = nil
taskPhase, err = pytorchResourceHandler.GetTaskPhase(ctx, taskCtx, pytorchJob)
assert.Error(t, err)
assert.Contains(t, err.Error(), "kubeflow operator hasn't updated")
assert.Equal(t, pluginsCore.PhaseInfoUndefined, taskPhase)

// Training operator did not modify the job because it is suspended
pytorchJobSuspended := dummyPytorchJobResourceCreator(kubeflowv1.JobCreated)
pytorchJobSuspended.CreationTimestamp = v1.Time{Time: time.Now().Add(-time.Hour)}
pytorchJobSuspended.Status.StartTime = nil
suspend := true
pytorchJobSuspended.Spec.RunPolicy.Suspend = &suspend
taskPhase, err = pytorchResourceHandler.GetTaskPhase(ctx, taskCtx, pytorchJobSuspended)
assert.NoError(t, err)
assert.Equal(t, pluginsCore.PhaseQueued, taskPhase.Phase())
}

func TestGetTaskPhaseIncreasePhaseVersion(t *testing.T) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,8 @@ func (tensorflowOperatorResourceHandler) GetTaskPhase(ctx context.Context, plugi
return pluginsCore.PhaseInfoUndefined, err
}

if app.Status.StartTime == nil && app.CreationTimestamp.Add(common.GetConfig().Timeout.Duration).Before(time.Now()) {
isSuspended := app.Spec.RunPolicy.Suspend != nil && *app.Spec.RunPolicy.Suspend
if !isSuspended && app.Status.StartTime == nil && app.CreationTimestamp.Add(common.GetConfig().Timeout.Duration).Before(time.Now()) {
return pluginsCore.PhaseInfoUndefined, fmt.Errorf("kubeflow operator hasn't updated the tensorflow custom resource since creation time %v", app.CreationTimestamp)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,25 @@ func TestGetTaskPhase(t *testing.T) {
assert.Equal(t, pluginsCore.PhaseRunning, taskPhase.Phase())
assert.NotNil(t, taskPhase.Info())
assert.Nil(t, err)

// Training operator did not modify the job even though it is not suspended
tfJob := dummyTensorFlowJobResourceCreator(kubeflowv1.JobCreated)
tfJob.CreationTimestamp = v1.Time{Time: time.Now().Add(-time.Hour)}
tfJob.Status.StartTime = nil
taskPhase, err = tensorflowResourceHandler.GetTaskPhase(ctx, taskCtx, tfJob)
assert.Error(t, err)
assert.Contains(t, err.Error(), "kubeflow operator hasn't updated")
assert.Equal(t, pluginsCore.PhaseInfoUndefined, taskPhase)

// Training operator did not modify the job because it is suspended
tfJobSuspended := dummyTensorFlowJobResourceCreator(kubeflowv1.JobCreated)
tfJobSuspended.CreationTimestamp = v1.Time{Time: time.Now().Add(-time.Hour)}
tfJobSuspended.Status.StartTime = nil
suspend := true
tfJobSuspended.Spec.RunPolicy.Suspend = &suspend
taskPhase, err = tensorflowResourceHandler.GetTaskPhase(ctx, taskCtx, tfJobSuspended)
assert.NoError(t, err)
assert.Equal(t, pluginsCore.PhaseQueued, taskPhase.Phase())
}

func TestGetTaskPhaseIncreasePhaseVersion(t *testing.T) {
Expand Down

0 comments on commit ce5a5d8

Please sign in to comment.