Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Add workflow concurrency controller #6309

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

thomasjhuang
Copy link

@thomasjhuang thomasjhuang commented Mar 5, 2025

DRAFT

This work is currently in progress, some portion of this work is incomplete and still being tested.

Tracking issue

Flyte backend implementation of #5659.

Why are the changes needed?

This implements a flyte concurrency manager, which allows for granular control of concurrent workflows at a LaunchPlan version level.

What changes were proposed in this pull request?

How was this patch tested?

Labels

Please add one or more of the following labels to categorize your PR:

  • added: For new features.
  • changed: For changes in existing functionality.
  • deprecated: For soon-to-be-removed features.
  • removed: For features being removed.
  • fixed: For any bug fixed.
  • security: In case of vulnerabilities

This is important to improve the readability of release notes.

Setup process

Screenshots

Check all the applicable boxes

  • I updated the documentation accordingly.
  • All new and existing tests passed.
  • All commits are signed-off.

Related PRs

Docs link

Copy link

welcome bot commented Mar 5, 2025

Thank you for opening this pull request! 🙌

These tips will help get your PR across the finish line:

  • Most of the repos have a PR template; if not, fill it out to the best of your knowledge.
  • Sign off your commits (Reference: DCO Guide).

@thomasjhuang thomasjhuang changed the title Add workflow concurrency controller [WIP] Add workflow concurrency controller Mar 5, 2025
@flyte-bot
Copy link
Collaborator

flyte-bot commented Mar 5, 2025

Code Review Agent Run #5d27f8

Actionable Suggestions - 7
  • flyteidl/gen/pb-go/flyteidl/admin/schedule.pb.go - 1
  • flyteidl/gen/pb-js/flyteidl.d.ts - 2
  • flyteadmin/concurrency/concurrency_controller.go - 4
    • Missing utility functions for key handling · Line 158-162
    • Missing GetExecutionCounts function implementation · Line 304-305
    • Missing UntrackExecution function implementation · Line 339-339
    • Possible incorrect latency calculation · Line 508-508
Filtered by Review Rules

Bito filtered these suggestions based on rules created automatically for your feedback. Manage rules.

  • flyteadmin/concurrency/executor/workflow_executor.go - 1
    • Add input validation for execution model · Line 46-80
  • flyteadmin/concurrency/concurrency_controller.go - 1
  • flyteadmin/concurrency/informer/launch_plan_informer.go - 1
    • Consider handling initial refresh errors better · Line 59-62
Review Details
  • Files reviewed - 27 · Commit Range: bac25b5..bac25b5
    • flyteadmin/concurrency/concurrency_controller.go
    • flyteadmin/concurrency/config.go
    • flyteadmin/concurrency/core/controller.go
    • flyteadmin/concurrency/core/policies.go
    • flyteadmin/concurrency/executor/workflow_executor.go
    • flyteadmin/concurrency/informer/launch_plan_informer.go
    • flyteadmin/concurrency/repositories/gorm/concurrency_repo.go
    • flyteadmin/concurrency/repositories/interfaces/repository.go
    • flyteidl/gen/pb-es/flyteidl/admin/execution_pb.ts
    • flyteidl/gen/pb-es/flyteidl/admin/schedule_pb.ts
    • flyteidl/gen/pb-es/flyteidl/core/execution_pb.ts
    • flyteidl/gen/pb-go/flyteidl/admin/execution.pb.go
    • flyteidl/gen/pb-go/flyteidl/admin/schedule.pb.go
    • flyteidl/gen/pb-go/flyteidl/core/execution.pb.go
    • flyteidl/gen/pb-js/flyteidl.d.ts
    • flyteidl/gen/pb-js/flyteidl.js
    • flyteidl/gen/pb_python/flyteidl/admin/execution_pb2.py
    • flyteidl/gen/pb_python/flyteidl/admin/execution_pb2.pyi
    • flyteidl/gen/pb_python/flyteidl/admin/schedule_pb2.py
    • flyteidl/gen/pb_python/flyteidl/admin/schedule_pb2.pyi
    • flyteidl/gen/pb_python/flyteidl/core/execution_pb2.py
    • flyteidl/gen/pb_python/flyteidl/core/execution_pb2.pyi
    • flyteidl/gen/pb_rust/flyteidl.admin.rs
    • flyteidl/gen/pb_rust/flyteidl.core.rs
    • flyteidl/protos/flyteidl/admin/execution.proto
    • flyteidl/protos/flyteidl/admin/schedule.proto
    • flyteidl/protos/flyteidl/core/execution.proto
  • Files skipped - 2
    • flyteidl/clients/go/assets/admin.swagger.json - Reason: Filter setting
    • flyteidl/gen/pb-go/gateway/flyteidl/service/admin.swagger.json - Reason: Filter setting
  • Tools
    • Golangci-lint (Linter) - ✖︎ Failed
    • Whispers (Secret Scanner) - ✔︎ Successful
    • Detect-secrets (Secret Scanner) - ✔︎ Successful

AI Code Review powered by Bito Logo

@flyte-bot
Copy link
Collaborator

Changelist by Bito

This pull request implements the following key changes.

Key Change Files Impacted
New Feature - New Feature - Concurrency Controller Implementation

concurrency_controller.go - Introduces a comprehensive concurrency controller with methods to initialize, manage, and process workflow executions based on defined concurrency policies.

config.go - Adds new configuration settings to enable and customize the concurrency controller behavior.

controller.go - Defines core interfaces and methods essential for controlling execution concurrency, ensuring smooth operation under load.

policies.go - Implements concurrency policy definitions and a default policy evaluator to enforce rules such as WAIT, ABORT, and REPLACE.

workflow_executor.go - Creates a new workflow executor that integrates with the concurrency controller to handle execution creation and termination according to concurrency limits.

launch_plan_informer.go - Adds a new informer for caching and refreshing launch plan concurrency data periodically.

concurrency_repo.go - Implements a GORM-based repository for managing concurrency data including execution counts and policies.

repository.go - Defines the interface for concurrency repository, ensuring consistency across implementations.

New Feature - New Feature - Protocol Buffers Enhancements

schedule_pb2.py - Updated descriptor and serialization details; added scheduler policy serialization boundaries.

schedule_pb2.pyi - Enhanced type annotations and initialization signatures; introduced scheduler policy fields and extended enum definitions.

execution_pb2.py - Revised serialization endpoints and descriptor boundaries for workflow execution, accommodating new fields.

execution_pb2.pyi - Extended type signatures through slot updates and inclusion of the PENDING phase in execution enums.

flyteidl.admin.rs - Included a new description field for pending phase and added scheduler_policy to the Schedule struct.

flyteidl.core.rs - Incorporated a new Pending enum case and updated phase mappings to improve execution state representation.

execution.proto - Performed minor reordering of fields while maintaining execution metadata, with one hunk introducing a description field for execution state change details.

schedule.proto - Introduced scheduler_policy along with new enums, restructuring message definitions to support enhanced concurrency and scheduling information.

execution.proto - Added a new PENDING phase to the enum to clearly represent pending executions.

if x != nil {
return x.Max
}
return 0
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider validating Max concurrency value

The SchedulerPolicy struct is being added with a Max field for concurrency control, but there's no validation to ensure the Max value is greater than zero. Consider adding validation to prevent setting Max to zero, which would effectively block all executions.

Code suggestion
Check the AI-generated fix before applying
Suggested change
return 0
return 1 // Default to 1 to allow at least one execution to run

Code Review Run #5d27f8


Should Bito avoid suggestions like this for future reviews? (Manage Rules)

  • Yes, avoid them

Comment on lines +14905 to +14906
/** ExecutionStateChangeDetails description. */
public description: string;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Property type inconsistent with interface definition

The added description property is defined as a non-optional string, but in the interface definition at line 14884, it's defined as an optional property (description?: (string|null)). Consider making the implementation consistent with the interface by changing public description: string; to public description?: string; or public description: string | null; to match the interface definition.

Code suggestion
Check the AI-generated fix before applying
Suggested change
/** ExecutionStateChangeDetails description. */
public description: string;
/** ExecutionStateChangeDetails description. */
public description: string | null;

Code Review Run #5d27f8


Should Bito avoid suggestions like this for future reviews? (Manage Rules)

  • Yes, avoid them

Comment on lines +14905 to +14906
/** ExecutionStateChangeDetails description. */
public description: string;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inconsistent optional vs required field definition

The PR adds a new description field to the ExecutionStateChangeDetails interface. This looks good, but I noticed that the implementation in the class (line 14905-14906) declares it as a required field (public description: string), while the interface defines it as optional (description?: (string|null)). Consider making these consistent to avoid potential type errors.

Code suggestion
Check the AI-generated fix before applying
Suggested change
/** ExecutionStateChangeDetails description. */
public description: string;
/** ExecutionStateChangeDetails description. */
public description?: string;

Code Review Run #5d27f8


Should Bito avoid suggestions like this for future reviews? (Manage Rules)

  • Yes, avoid them

Comment on lines +158 to +162
lpKey := getLaunchPlanKey(lp.Project, lp.Domain, lp.Name)
executionMap := make(map[string]bool)

for _, exec := range executions {
executionKey := getExecutionKey(exec.Project, exec.Domain, exec.Name)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing utility functions for key handling

The code calls getLaunchPlanKey and getExecutionKey functions, and splitExecutionKey on line 267, but these functions aren't defined in the provided code. We need to ensure these utility functions exist and work correctly.

Code suggestion
Check the AI-generated fix before applying
 @@ -369,0 +370,25 @@
 +
 +// getLaunchPlanKey returns a string key for a launch plan
 +func getLaunchPlanKey(project, domain, name string) string {
 +	return fmt.Sprintf("%s/%s/%s", project, domain, name)
 +}
 +
 +// getExecutionKey returns a string key for an execution
 +func getExecutionKey(project, domain, name string) string {
 +	return fmt.Sprintf("%s/%s/%s", project, domain, name)
 +}
 +
 +// splitExecutionKey splits an execution key into project, domain, and name
 +func splitExecutionKey(key string) []string {
 +	return strings.Split(key, "/")
 +}

Code Review Run #5d27f8


Should Bito avoid suggestions like this for future reviews? (Manage Rules)

  • Yes, avoid them

Comment on lines +304 to +305
// Get current running executions count
runningCount, err := c.GetExecutionCounts(ctx, launchPlanID)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing GetExecutionCounts function implementation

The code calls c.GetExecutionCounts(ctx, launchPlanID) to get the current running executions count, but this function isn't defined in the provided code. We need to ensure this function exists and returns the correct count of running executions.

Code suggestion
Check the AI-generated fix before applying
 @@ -369,0 +370,19 @@
 +
 +// GetExecutionCounts returns the count of running executions for a launch plan
 +func (c *ConcurrencyController) GetExecutionCounts(ctx context.Context, launchPlanID idlCore.Identifier) (int, error) {
 +	lpKey := getLaunchPlanKey(launchPlanID.Project, launchPlanID.Domain, launchPlanID.Name)
 +
 +	c.runningExecutionsMutex.RLock()
 +	defer c.runningExecutionsMutex.RUnlock()
 +
 +	executions, ok := c.runningExecutions[lpKey]
 +	if !ok {
 +		return 0, nil
 +	}
 +
 +	return len(executions), nil
 +}

Code Review Run #5d27f8


Should Bito avoid suggestions like this for future reviews? (Manage Rules)

  • Yes, avoid them

c.metrics.executionsAbortedCount.Inc()

// Untrack the aborted execution
c.UntrackExecution(ctx, oldestExecID)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing UntrackExecution function implementation

The code calls c.UntrackExecution(ctx, oldestExecID) to untrack an aborted execution, but this function isn't defined in the provided code. We need to ensure this function exists and correctly removes the execution from the tracking map.

Code suggestion
Check the AI-generated fix before applying
 @@ -369,0 +370,19 @@
 +
 +// UntrackExecution removes an execution from the tracking map
 +func (c *ConcurrencyController) UntrackExecution(ctx context.Context, executionID idlCore.WorkflowExecutionIdentifier) {
 +	executionKey := getExecutionKey(executionID.Project, executionID.Domain, executionID.Name)
 +
 +	c.runningExecutionsMutex.Lock()
 +	defer c.runningExecutionsMutex.Unlock()
 +
 +	// Find the launch plan for this execution
 +	for lpKey, executions := range c.runningExecutions {
 +		if _, ok := executions[executionKey]; ok {
 +			// Remove the execution from the map
 +			delete(c.runningExecutions[lpKey], executionKey)
 +			
 +			// If no more executions for this launch plan, remove the launch plan entry
 +			if len(c.runningExecutions[lpKey]) == 0 {
 +				delete(c.runningExecutions, lpKey)
 +			}
 +			break
 +		}
 +	}
 +}

Code Review Run #5d27f8


Should Bito avoid suggestions like this for future reviews? (Manage Rules)

  • Yes, avoid them

func (c *ConcurrencyController) RecordLatency(operation string, startTime time.Time) {
switch operation {
case "processing":
c.metrics.processingLatency.Observe(startTime)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Possible incorrect latency calculation

The RecordLatency method is using startTime directly with Observe(), but typically latency metrics require calculating the duration between the start time and now. Consider checking if the Observe method expects a duration rather than a timestamp.

Code suggestion
Check the AI-generated fix before applying
Suggested change
c.metrics.processingLatency.Observe(startTime)
c.metrics.processingLatency.Observe(time.Since(startTime).Seconds())

Code Review Run #5d27f8


Should Bito avoid suggestions like this for future reviews? (Manage Rules)

  • Yes, avoid them

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants