Skip to content

Commit

Permalink
decision engine
Browse files Browse the repository at this point in the history
Signed-off-by: Julie Vogelman <[email protected]>
  • Loading branch information
juliev0 committed Sep 18, 2024
1 parent 752c161 commit f6fda53
Show file tree
Hide file tree
Showing 3 changed files with 1 addition and 45 deletions.
3 changes: 0 additions & 3 deletions internal/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,4 @@ var (

// default requeue time used by Reconcilers
DefaultDelayedRequeue = ctrl.Result{RequeueAfter: 20 * time.Second}

// DataLossPrevention is a feature flag used to turn on/off the automatic pause feature for pipelines based on how it's set in the Config
//DataLossPrevention bool
)
3 changes: 1 addition & 2 deletions internal/controller/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,7 @@ func (*ConfigManager) GetControllerDefinitionsMgr() *NumaflowControllerDefinitio
type GlobalConfig struct {
LogLevel int `json:"logLevel" mapstructure:"logLevel"`
IncludedResources string `json:"includedResources" mapstructure:"includedResources"`
// Feature flag - if enabled causes pipeline(s) to be paused when pipeline, numaflow controller, or ISB Service gets updated
//DataLossPrevention bool `json:"dataLossPrevention" mapstructure:"dataLossPrevention"`
// If user's config doesn't exist or doesn't specify strategy, this is the default
// TODO: should we put this here or in usde config?
DefaultUpgradeStrategy USDEUserStrategy `json:"defaultUpgradeStrategy" mapstructure:"defaultUpgradeStrategy"`
// List of Numaflow Controller image names to look for
Expand Down
40 changes: 0 additions & 40 deletions internal/controller/pipelinerollout_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,6 @@ func (r *PipelineRolloutReconciler) processExistingPipeline(ctx context.Context,
}

// does the Resource need updating, and if so how?
//comparisonExcludedPaths := []string{"lifecycle.desiredPhase"}
pipelineNeedsToUpdate, upgradeStrategyType, err := usde.ResourceNeedsUpdating(ctx, newPipelineDef, existingPipelineDef)
if err != nil {
return err
Expand Down Expand Up @@ -695,16 +694,6 @@ func (r *PipelineRolloutReconciler) needPPND(ctx context.Context, pipelineRollou
return &needPPND, nil
}

/*
func pipelineNeedsUpdating(ctx context.Context, newPipelineDef *kubernetes.GenericObject, existingPipelineDef *kubernetes.GenericObject) (bool, error) {
// Does pipeline spec need to be updated?
pipelineSpecsEqual, err := pipelineSpecNeedsUpdating(ctx, existingPipelineDef, newPipelineDef)
if err != nil {
return false, err
}
return !pipelineSpecsEqual, nil
}*/

// return true if Pipeline (or its children) is still in the process of being reconciled
func pipelineIsUpdating(newPipelineDef *kubernetes.GenericObject, existingPipelineDef *kubernetes.GenericObject) (bool, error) {
existingPipelineStatus, err := kubernetes.ParseStatus(existingPipelineDef)
Expand Down Expand Up @@ -943,35 +932,6 @@ func pipelineWithoutLifecycle(obj *kubernetes.GenericObject) (map[string]interfa
comparisonExcludedPaths := []string{"lifecycle.desiredPhase"}
util.RemovePaths(pipelineAsMap, comparisonExcludedPaths, ".")
return pipelineAsMap, nil
/*unstruc, err := kubernetes.ObjectToUnstructured(obj)
if err != nil {
return nil, err
}
_, found, err := unstructured.NestedString(unstruc.Object, "spec", "lifecycle", "desiredPhase")
if err != nil {
return nil, err
}
if found {
unstrucNew := unstruc.DeepCopy()
specMapAsInterface, found := unstrucNew.Object["spec"]
if found {
specMap, ok := specMapAsInterface.(map[string]interface{})
if ok {
lifecycleMapAsInterface, found := specMap["lifecycle"]
if found {
lifecycleMap, ok := lifecycleMapAsInterface.(map[string]interface{})
if ok {
delete(lifecycleMap, "desiredPhase")
specMap["lifecycle"] = lifecycleMap
return specMap, nil
}
}
}
return nil, fmt.Errorf("failed to clear spec.lifecycle.desiredPhase from object: %+v", unstruc.Object)
}
}
return unstruc.Object["spec"].(map[string]interface{}), nil*/
}

func checkPipelineStatus(ctx context.Context, pipeline *kubernetes.GenericObject, phase numaflowv1.PipelinePhase) bool {
Expand Down

0 comments on commit f6fda53

Please sign in to comment.