Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[flyteadmin] add delete execution phase api #6267

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions flyteadmin/pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1959,3 +1959,37 @@
}
return filters, nil
}
func (m *ExecutionManager) DeleteExecutionPhase(ctx context.Context, req *admin.ExecutionPhaseDeleteRequest) (*admin.ExecutionPhaseDeleteResponse, error) {
executionPhase := req.GetPhase()
if executionPhase == core.WorkflowExecution_UNDEFINED {
return nil, fmt.Errorf("execution phase cannot be undefined")
}

Check warning on line 1966 in flyteadmin/pkg/manager/impl/execution_manager.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/manager/impl/execution_manager.go#L1965-L1966

Added lines #L1965 - L1966 were not covered by tests
Comment on lines +1964 to +1966
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider more thorough phase validation

Consider adding more specific validation for the execution phase. The current check only validates against UNDEFINED but there may be other invalid phases. Consider validating against a list of allowed phases.

Code suggestion
Check the AI-generated fix before applying
Suggested change
if executionPhase == core.WorkflowExecution_UNDEFINED {
return nil, fmt.Errorf("execution phase cannot be undefined")
}
validPhases := []core.WorkflowExecution_Phase{
core.WorkflowExecution_QUEUED,
core.WorkflowExecution_RUNNING,
core.WorkflowExecution_SUCCEEDING,
core.WorkflowExecution_SUCCEEDED,
core.WorkflowExecution_FAILING,
core.WorkflowExecution_FAILED,
core.WorkflowExecution_ABORTED,
core.WorkflowExecution_TIMED_OUT,
core.WorkflowExecution_ABORTING,
}
isValid := false
for _, phase := range validPhases {
if executionPhase == phase {
isValid = true
break
}
}
if !isValid {
return nil, fmt.Errorf("invalid execution phase: %s", executionPhase)
}

Code Review Run #26e38f


Should Bito avoid suggestions like this for future reviews? (Manage Rules)

  • Yes, avoid them


workflowExecutionID := req.GetId()
if workflowExecutionID == nil {
return nil, fmt.Errorf("workflow execution identifier cannot be nil")
}

Check warning on line 1971 in flyteadmin/pkg/manager/impl/execution_manager.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/manager/impl/execution_manager.go#L1970-L1971

Added lines #L1970 - L1971 were not covered by tests

if workflowExecutionID.GetProject() == "" || workflowExecutionID.GetDomain() == "" {
return nil, fmt.Errorf("workflow execution identifier must have project, domain")
}

// Wrap executionPhase in ExecutionPhaseDeleteInput
input := repositoryInterfaces.ExecutionPhaseDeleteInput{
Project: workflowExecutionID.GetProject(),
Domain: workflowExecutionID.GetDomain(),
ExecutionPhase: executionPhase,
}

err := m.db.ExecutionRepo().Delete(ctx, input)
if err != nil {
return nil, err
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding context to error return

Consider adding error wrapping with context when returning the database error. This would help with debugging by providing more context about what operation failed. Maybe something like: fmt.Errorf("failed to delete execution phase %s: %w", executionPhase, err)

Code suggestion
Check the AI-generated fix before applying
Suggested change
return nil, err
return nil, fmt.Errorf("failed to delete execution phase %s: %w", executionPhase, err)

Code Review Run #50b8fe


Should Bito avoid suggestions like this for future reviews? (Manage Rules)

  • Yes, avoid them

}

Check warning on line 1987 in flyteadmin/pkg/manager/impl/execution_manager.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/manager/impl/execution_manager.go#L1986-L1987

Added lines #L1986 - L1987 were not covered by tests

return &admin.ExecutionPhaseDeleteResponse{
Message: fmt.Sprintf("Execution phase %s for workflow %s/%s deleted successfully",
executionPhase.String(),
workflowExecutionID.GetProject(),
workflowExecutionID.GetDomain()),
}, nil
}
66 changes: 66 additions & 0 deletions flyteadmin/pkg/manager/impl/execution_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/flyteorg/flyte/flyteadmin/pkg/manager/mocks"
managerMocks "github.com/flyteorg/flyte/flyteadmin/pkg/manager/mocks"
"github.com/flyteorg/flyte/flyteadmin/pkg/repositories/interfaces"
repositoryInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/repositories/interfaces"
repositoryMocks "github.com/flyteorg/flyte/flyteadmin/pkg/repositories/mocks"
"github.com/flyteorg/flyte/flyteadmin/pkg/repositories/models"
"github.com/flyteorg/flyte/flyteadmin/pkg/repositories/transformers"
Expand Down Expand Up @@ -6029,3 +6030,68 @@ func TestQueryTemplate(t *testing.T) {
assert.Error(t, err)
})
}

func TestDeleteExecutionPhase(t *testing.T) {
t.Run("successful deletion", func(t *testing.T) {
repository := repositoryMocks.NewMockRepository()

deleteExecutionPhaseFunc := func(ctx context.Context, input repositoryInterfaces.ExecutionPhaseDeleteInput) error {
assert.Equal(t, "project", input.Project)
assert.Equal(t, "domain", input.Domain)
assert.Equal(t, core.WorkflowExecution_SUCCEEDED, input.ExecutionPhase)
return nil
}

mockExecutionRepo := repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo)
mockExecutionRepo.SetDeleteCallback(deleteExecutionPhaseFunc)

r := plugins.NewRegistry()
r.RegisterDefault(plugins.PluginIDWorkflowExecutor, &defaultTestExecutor)
execManager := NewExecutionManager(repository, r, getMockExecutionsConfigProvider(),
getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(),
mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil, nil,
&eventWriterMocks.WorkflowExecutionEventWriter{})

resp, err := execManager.DeleteExecutionPhase(context.Background(), &admin.ExecutionPhaseDeleteRequest{
Id: &core.WorkflowExecutionIdentifier{
Project: "project",
Domain: "domain",
},
Phase: core.WorkflowExecution_SUCCEEDED,
})

assert.NoError(t, err)
assert.NotNil(t, resp)
})

t.Run("failed deletion", func(t *testing.T) {
repository := repositoryMocks.NewMockRepository()

deleteExecutionPhaseFunc := func(ctx context.Context, input repositoryInterfaces.ExecutionPhaseDeleteInput) error {
assert.Equal(t, "project", input.Project)
assert.Equal(t, core.WorkflowExecution_SUCCEEDED, input.ExecutionPhase)
return nil
}

mockExecutionRepo := repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo)
mockExecutionRepo.SetDeleteCallback(deleteExecutionPhaseFunc)

r := plugins.NewRegistry()
r.RegisterDefault(plugins.PluginIDWorkflowExecutor, &defaultTestExecutor)
execManager := NewExecutionManager(repository, r, getMockExecutionsConfigProvider(),
getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(),
mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil, nil,
&eventWriterMocks.WorkflowExecutionEventWriter{})

resp, err := execManager.DeleteExecutionPhase(context.Background(), &admin.ExecutionPhaseDeleteRequest{
Id: &core.WorkflowExecutionIdentifier{
Project: "project",
},
Phase: core.WorkflowExecution_SUCCEEDED,
})

assert.Error(t, err)
assert.Nil(t, resp)
assert.Contains(t, err.Error(), "workflow execution identifier must have project, domain")
})
}
1 change: 1 addition & 0 deletions flyteadmin/pkg/manager/interfaces/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,5 @@ type ExecutionInterface interface {
ListExecutions(ctx context.Context, request *admin.ResourceListRequest) (*admin.ExecutionList, error)
TerminateExecution(
ctx context.Context, request *admin.ExecutionTerminateRequest) (*admin.ExecutionTerminateResponse, error)
DeleteExecutionPhase(ctx context.Context, request *admin.ExecutionPhaseDeleteRequest) (*admin.ExecutionPhaseDeleteResponse, error)
}
59 changes: 59 additions & 0 deletions flyteadmin/pkg/manager/mocks/execution_interface.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions flyteadmin/pkg/repositories/gormimpl/execution_repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,3 +170,17 @@
metrics: metrics,
}
}

func (r *ExecutionRepo) Delete(ctx context.Context, input interfaces.ExecutionPhaseDeleteInput) error {
result := r.db.Delete(&models.Execution{},
"execution_project = ? AND execution_domain = ? AND phase = ?",
input.Project,
input.Domain,
input.ExecutionPhase.String())

if result.Error != nil {
return result.Error
}

Check warning on line 183 in flyteadmin/pkg/repositories/gormimpl/execution_repo.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/repositories/gormimpl/execution_repo.go#L174-L183

Added lines #L174 - L183 were not covered by tests

return nil

Check warning on line 185 in flyteadmin/pkg/repositories/gormimpl/execution_repo.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/repositories/gormimpl/execution_repo.go#L185

Added line #L185 was not covered by tests
}
7 changes: 7 additions & 0 deletions flyteadmin/pkg/repositories/interfaces/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package interfaces

import (
"github.com/flyteorg/flyte/flyteadmin/pkg/common"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
)

// Parameters for getting an individual resource.
Expand Down Expand Up @@ -44,3 +45,9 @@ type CountResourceInput struct {
// the count query. This enables filtering on non-primary entity attributes.
JoinTableEntities map[common.Entity]bool
}

type ExecutionPhaseDeleteInput struct {
Project string
Domain string
ExecutionPhase core.WorkflowExecution_Phase
}
1 change: 1 addition & 0 deletions flyteadmin/pkg/repositories/interfaces/execution_repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type ExecutionRepoInterface interface {
List(ctx context.Context, input ListResourceInput) (ExecutionCollectionOutput, error)
// Returns count of executions matching query parameters.
Count(ctx context.Context, input CountResourceInput) (int64, error)
Delete(ctx context.Context, input ExecutionPhaseDeleteInput) error
}

// Response format for a query on workflows.
Expand Down
13 changes: 13 additions & 0 deletions flyteadmin/pkg/repositories/mocks/execution_repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@ type GetExecutionFunc func(ctx context.Context, input interfaces.Identifier) (mo
type ListExecutionFunc func(ctx context.Context, input interfaces.ListResourceInput) (
interfaces.ExecutionCollectionOutput, error)
type CountExecutionFunc func(ctx context.Context, input interfaces.CountResourceInput) (int64, error)
type DeleteExecutionFunc func(ctx context.Context, input interfaces.ExecutionPhaseDeleteInput) error

type MockExecutionRepo struct {
createFunction CreateExecutionFunc
updateFunction UpdateExecutionFunc
getFunction GetExecutionFunc
listFunction ListExecutionFunc
countFunction CountExecutionFunc
deleteFunction DeleteExecutionFunc
}

func (r *MockExecutionRepo) Create(ctx context.Context, input models.Execution, executionTagModel []*models.ExecutionTag) error {
Expand Down Expand Up @@ -81,3 +83,14 @@ func (r *MockExecutionRepo) SetCountCallback(countFunction CountExecutionFunc) {
func NewMockExecutionRepo() interfaces.ExecutionRepoInterface {
return &MockExecutionRepo{}
}

func (r *MockExecutionRepo) Delete(ctx context.Context, input interfaces.ExecutionPhaseDeleteInput) error {
if r.deleteFunction != nil {
return r.deleteFunction(ctx, input)
}
return nil
}

func (r *MockExecutionRepo) SetDeleteCallback(deleteFunction func(context.Context, interfaces.ExecutionPhaseDeleteInput) error) {
r.deleteFunction = deleteFunction
}
14 changes: 14 additions & 0 deletions flyteadmin/pkg/rpc/adminservice/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,3 +152,17 @@
m.Metrics.executionEndpointMetrics.terminate.Success()
return response, nil
}

func (m *AdminService) DeleteExecutionPhase(
ctx context.Context, request *admin.ExecutionPhaseDeleteRequest) (*admin.ExecutionPhaseDeleteResponse, error) {
var response *admin.ExecutionPhaseDeleteResponse
var err error
m.Metrics.executionEndpointMetrics.terminate.Time(func() {
response, err = m.ExecutionManager.DeleteExecutionPhase(ctx, request)
Comment on lines +160 to +161
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider using specific metric for DeleteExecutionPhase

Consider using a more specific metric name for DeleteExecutionPhase. Currently using terminate metric which seems to be shared with termination functionality. This could lead to misleading metrics.

Code suggestion
Check the AI-generated fix before applying
 	terminate   util.RequestMetrics
 +	deletePhase util.RequestMetrics
 @@ -160,9 +160,9 @@
 -	m.Metrics.executionEndpointMetrics.terminate.Time(func() {
 -		response, err = m.ExecutionManager.DeleteExecutionPhase(ctx, request)
 -	})
 -	if err != nil {
 -		return nil, util.TransformAndRecordError(err, &m.Metrics.executionEndpointMetrics.terminate)
 -	}
 -	m.Metrics.executionEndpointMetrics.terminate.Success()
 +	m.Metrics.executionEndpointMetrics.deletePhase.Time(func() {
 +		response, err = m.ExecutionManager.DeleteExecutionPhase(ctx, request)
 +	})
 +	if err != nil {
 +		return nil, util.TransformAndRecordError(err, &m.Metrics.executionEndpointMetrics.deletePhase)
 +	}
 +	m.Metrics.executionEndpointMetrics.deletePhase.Success()

Code Review Run #50b8fe


Should Bito avoid suggestions like this for future reviews? (Manage Rules)

  • Yes, avoid them

Comment on lines +155 to +161
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you know where to see the metrics?
is it stored in database or a metric server something like that?

Copy link
Contributor Author

@taieeuu taieeuu Mar 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The data is stored in Prometheus, but 'flytectl demo start' does not launch Prometheus.

})
if err != nil {
return nil, util.TransformAndRecordError(err, &m.Metrics.executionEndpointMetrics.terminate)
}
m.Metrics.executionEndpointMetrics.terminate.Success()
return response, nil

Check warning on line 167 in flyteadmin/pkg/rpc/adminservice/execution.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/rpc/adminservice/execution.go#L157-L167

Added lines #L157 - L167 were not covered by tests
}
48 changes: 48 additions & 0 deletions flyteidl/clients/go/admin/mocks/AdminServiceClient.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading