From ef0cffdc67504ba7a97bbb779c97fdcfd4233056 Mon Sep 17 00:00:00 2001 From: Derek Wang Date: Thu, 20 Feb 2025 12:12:51 -0800 Subject: [PATCH] fix(controller): replicas calculation for paused pipeline (#2412) Signed-off-by: Derek Wang --- pkg/reconciler/monovertex/scaling/scaling.go | 5 +++++ pkg/reconciler/pipeline/controller.go | 12 ++++++++---- pkg/reconciler/vertex/controller.go | 9 +++++++-- pkg/reconciler/vertex/scaling/scaling.go | 7 ++++++- 4 files changed, 26 insertions(+), 7 deletions(-) diff --git a/pkg/reconciler/monovertex/scaling/scaling.go b/pkg/reconciler/monovertex/scaling/scaling.go index 7f9ee9ea84..7614cfd16f 100644 --- a/pkg/reconciler/monovertex/scaling/scaling.go +++ b/pkg/reconciler/monovertex/scaling/scaling.go @@ -181,6 +181,11 @@ func (s *Scaler) scaleOneMonoVertex(ctx context.Context, key string, worker int) return nil } + if monoVtx.Spec.Scale.GetMaxReplicas() == monoVtx.Spec.Scale.GetMinReplicas() { + log.Infof("MonoVertex %s has same scale.min and scale.max, skip scaling.", monoVtx.Name) + return nil + } + if monoVtx.Status.Replicas == 0 { // Was scaled to 0 // Periodically wake them up from 0 replicas to 1, to peek for the incoming messages if secondsSinceLastScale >= float64(monoVtx.Spec.Scale.GetZeroReplicaSleepSeconds()) { diff --git a/pkg/reconciler/pipeline/controller.go b/pkg/reconciler/pipeline/controller.go index 513f074c0a..c22b5702e0 100644 --- a/pkg/reconciler/pipeline/controller.go +++ b/pkg/reconciler/pipeline/controller.go @@ -365,6 +365,10 @@ func (r *pipelineReconciler) reconcileFixedResources(ctx context.Context, pl *df // Keep the original replicas as much as possible if originalReplicas >= newObj.Spec.Scale.GetMinReplicas() && originalReplicas <= newObj.Spec.Scale.GetMaxReplicas() { oldObj.Spec.Replicas = &originalReplicas + } else if originalReplicas < newObj.Spec.Scale.GetMinReplicas() { + originalReplicas = newObj.Spec.Scale.GetMinReplicas() + } else { + originalReplicas = newObj.Spec.Scale.GetMaxReplicas() } oldObj.Annotations[dfv1.KeyHash] = newObj.GetAnnotations()[dfv1.KeyHash] if err := r.client.Update(ctx, &oldObj); err != nil { @@ -647,11 +651,11 @@ func buildVertices(pl *dfv1.Pipeline) map[string]dfv1.Vertex { replicas = int32(partitions) } else { x := vCopy.Scale - if x.Min != nil && *x.Min > 1 && replicas < *x.Min { - replicas = *x.Min + if replicas < x.GetMinReplicas() { + replicas = x.GetMinReplicas() } - if x.Max != nil && *x.Max > 1 && replicas > *x.Max { - replicas = *x.Max + if replicas > x.GetMaxReplicas() { + replicas = x.GetMaxReplicas() } } diff --git a/pkg/reconciler/vertex/controller.go b/pkg/reconciler/vertex/controller.go index fa1c8f3180..e015d0d3a0 100644 --- a/pkg/reconciler/vertex/controller.go +++ b/pkg/reconciler/vertex/controller.go @@ -104,8 +104,6 @@ func (r *vertexReconciler) reconcile(ctx context.Context, vertex *dfv1.Vertex) ( vertex.Status.InitConditions() vertex.Status.SetObservedGeneration(vertex.Generation) - desiredReplicas := vertex.GetReplicas() - isbSvc := &dfv1.InterStepBufferService{} isbSvcName := dfv1.DefaultISBSvcName if len(vertex.Spec.InterStepBufferServiceName) > 0 { @@ -179,6 +177,10 @@ func (r *vertexReconciler) reconcile(ctx context.Context, vertex *dfv1.Vertex) ( return ctrl.Result{}, fmt.Errorf("failed to get pods of a vertex: %w", err) } readyPods := reconciler.NumOfReadyPods(podList) + desiredReplicas := vertex.GetReplicas() + if pipeline.GetDesiredPhase() == dfv1.PipelinePhasePaused { + desiredReplicas = 0 + } if readyPods > desiredReplicas { // It might happen in some corner cases, such as during rollout readyPods = desiredReplicas } @@ -196,6 +198,9 @@ func (r *vertexReconciler) reconcile(ctx context.Context, vertex *dfv1.Vertex) ( func (r *vertexReconciler) orchestratePods(ctx context.Context, vertex *dfv1.Vertex, pipeline *dfv1.Pipeline, isbSvc *dfv1.InterStepBufferService) error { log := logging.FromContext(ctx) desiredReplicas := vertex.GetReplicas() + if pipeline.GetDesiredPhase() == dfv1.PipelinePhasePaused { + desiredReplicas = 0 + } vertex.Status.DesiredReplicas = uint32(desiredReplicas) // Set metrics diff --git a/pkg/reconciler/vertex/scaling/scaling.go b/pkg/reconciler/vertex/scaling/scaling.go index c1fb96eb47..455ce9906b 100644 --- a/pkg/reconciler/vertex/scaling/scaling.go +++ b/pkg/reconciler/vertex/scaling/scaling.go @@ -202,11 +202,16 @@ func (s *Scaler) scaleOneVertex(ctx context.Context, key string, worker int) err log.Info("Corresponding Pipeline not in Running state, skip scaling.") return nil } - if int(vertex.Status.Replicas) != vertex.GetReplicas() { + if vertex.Status.Replicas != vertex.Status.DesiredReplicas { log.Infof("Vertex %s might be under processing, replicas mismatch, skip scaling.", vertex.Name) return nil } + if vertex.Spec.Scale.GetMaxReplicas() == vertex.Spec.Scale.GetMinReplicas() { + log.Infof("Vertex %s has same scale.min and scale.max, skip scaling.", vertex.Name) + return nil + } + var err error daemonClient, _ := s.daemonClientsCache.Get(pl.GetDaemonServiceURL()) if daemonClient == nil {