From a4572b702f2bea0de9777147ed4c2f956f748428 Mon Sep 17 00:00:00 2001 From: Hao Hao Date: Fri, 21 Jun 2024 11:40:14 -0700 Subject: [PATCH] chore: refactor sync logic in ControllerRollout to be more clear Signed-off-by: Hao Hao --- .../numaflowcontrollerrollout_controller.go | 89 +++++++++++-------- 1 file changed, 53 insertions(+), 36 deletions(-) diff --git a/internal/controller/numaflowcontrollerrollout_controller.go b/internal/controller/numaflowcontrollerrollout_controller.go index dbaa462e..e0a1b290 100644 --- a/internal/controller/numaflowcontrollerrollout_controller.go +++ b/internal/controller/numaflowcontrollerrollout_controller.go @@ -238,42 +238,9 @@ func (r *NumaflowControllerRolloutReconciler) sync( } numaLogger.Debugf("found %d target objects associated with Numaflow Controller version %s; versions defined:%+v", len(targetObjs), version, definition) - var infoProvider kubeUtil.ResourceInfoProvider - clusterCache, err := r.stateCache.GetClusterCache() - infoProvider = clusterCache - if err != nil { - return gitopsSyncCommon.OperationError, err - } - liveObjByKey, err := r.stateCache.GetManagedLiveObjs(rollout.Name, rollout.Namespace, targetObjs) + reconciliationResult, diffResults, err := r.compareState(rollout, namespace, targetObjs, numaLogger) if err != nil { - return gitopsSyncCommon.OperationError, err - } - reconciliationResult := gitopsSync.Reconcile(targetObjs, liveObjByKey, namespace, infoProvider) - - // Ignore `status` field for all comparison. - // TODO: make it configurable - overrides := map[string]sync.ResourceOverride{ - "*/*": { - IgnoreDifferences: sync.OverrideIgnoreDiff{JSONPointers: []string{"/status"}}}, - } - - resourceOps, cleanup, err := r.getResourceOperations() - if err != nil { - return gitopsSyncCommon.OperationError, err - } - defer cleanup() - - diffOpts := []diff.Option{ - diff.WithLogr(*numaLogger.LogrLogger), - diff.WithServerSideDiff(true), - diff.WithServerSideDryRunner(diff.NewK8sServerSideDryRunner(resourceOps)), - diff.WithManager(common.SSAManager), - diff.WithGVKParser(clusterCache.GetGVKParser()), - } - - diffResults, err := sync.StateDiffs(reconciliationResult.Target, reconciliationResult.Live, overrides, diffOpts) - if err != nil { - numaLogger.Error(err, "Error on comparing git sync state") + numaLogger.Error(err, "Error on comparing live state") return gitopsSyncCommon.OperationError, err } @@ -288,9 +255,10 @@ func (r *NumaflowControllerRolloutReconciler) sync( gitopsSync.WithServerSideApplyManager(common.SSAManager), } + clusterCache, err := r.stateCache.GetClusterCache() if err != nil { numaLogger.Error(err, "Error on getting the cluster cache") - //return gitopsSyncCommon.OperationError, err.Error() + return gitopsSyncCommon.OperationError, err } openAPISchema := clusterCache.GetOpenAPISchema() @@ -316,6 +284,55 @@ func (r *NumaflowControllerRolloutReconciler) sync( return phase, nil } +// compareState compares with desired state of the objects with the live state in the cluster +// for the target objects. +func (r *NumaflowControllerRolloutReconciler) compareState( + rollout *apiv1.NumaflowControllerRollout, + namespace string, + targetObjs []*unstructured.Unstructured, + numaLogger *logger.NumaLogger, +) (gitopsSync.ReconciliationResult, *diff.DiffResultList, error) { + var infoProvider kubeUtil.ResourceInfoProvider + clusterCache, err := r.stateCache.GetClusterCache() + if err != nil { + return gitopsSync.ReconciliationResult{}, nil, err + } + infoProvider = clusterCache + liveObjByKey, err := r.stateCache.GetManagedLiveObjs(rollout.Name, namespace, targetObjs) + if err != nil { + return gitopsSync.ReconciliationResult{}, nil, err + } + reconciliationResult := gitopsSync.Reconcile(targetObjs, liveObjByKey, namespace, infoProvider) + + // Ignore `status` field for all comparison. + // TODO: make it configurable + overrides := map[string]sync.ResourceOverride{ + "*/*": { + IgnoreDifferences: sync.OverrideIgnoreDiff{JSONPointers: []string{"/status"}}}, + } + + resourceOps, cleanup, err := r.getResourceOperations() + if err != nil { + return gitopsSync.ReconciliationResult{}, nil, err + } + defer cleanup() + + diffOpts := []diff.Option{ + diff.WithLogr(*numaLogger.LogrLogger), + diff.WithServerSideDiff(true), + diff.WithServerSideDryRunner(diff.NewK8sServerSideDryRunner(resourceOps)), + diff.WithManager(common.SSAManager), + diff.WithGVKParser(clusterCache.GetGVKParser()), + } + + diffResults, err := sync.StateDiffs(reconciliationResult.Target, reconciliationResult.Live, overrides, diffOpts) + if err != nil { + return reconciliationResult, nil, err + } + + return reconciliationResult, diffResults, nil +} + // getResourceOperations will return the kubectl implementation of the ResourceOperations // interface that provides functionality to manage kubernetes resources. Returns a // cleanup function that must be called to remove the generated kube config for this