From ff307b48733039cf71a3b1133177611e3c5da903 Mon Sep 17 00:00:00 2001
From: Derek Wang
Date: Thu, 20 Feb 2025 22:36:06 -0800
Subject: [PATCH] .

Signed-off-by: Derek Wang
---
 .../numaflow/v1alpha1/mono_vertex_types.go  |  2 +-
 pkg/apis/numaflow/v1alpha1/vertex_types.go  | 54 ++++++++++++++-----
 .../numaflow/v1alpha1/vertex_types_test.go  | 26 ++++++---
 pkg/reconciler/pipeline/controller.go       | 50 +++++------------
 pkg/reconciler/pipeline/controller_test.go  |  2 +-
 pkg/reconciler/vertex/controller.go         | 14 ++---
 pkg/reconciler/vertex/scaling/scaling.go    |  4 +++
 test/fixtures/util.go                       |  2 +-
 8 files changed, 82 insertions(+), 72 deletions(-)

diff --git a/pkg/apis/numaflow/v1alpha1/mono_vertex_types.go b/pkg/apis/numaflow/v1alpha1/mono_vertex_types.go
index b40494d795..8d96c948f7 100644
--- a/pkg/apis/numaflow/v1alpha1/mono_vertex_types.go
+++ b/pkg/apis/numaflow/v1alpha1/mono_vertex_types.go
@@ -318,7 +318,7 @@ func (mv MonoVertex) simpleCopy() MonoVertex {
 
 func (mv MonoVertex) GetPodSpec(req GetMonoVertexPodSpecReq) (*corev1.PodSpec, error) {
 	copiedSpec := mv.simpleCopy()
-	copiedSpec.Spec.Scale = Scale{LookbackSeconds: mv.Spec.Scale.LookbackSeconds}
+	copiedSpec.Spec.Scale = Scale{LookbackSeconds: ptr.To[uint32](uint32(mv.Spec.Scale.GetLookbackSeconds()))}
 	monoVtxBytes, err := json.Marshal(copiedSpec)
 	if err != nil {
 		return nil, errors.New("failed to marshal mono vertex spec")
diff --git a/pkg/apis/numaflow/v1alpha1/vertex_types.go b/pkg/apis/numaflow/v1alpha1/vertex_types.go
index 0821f8d0c8..8f7ab4ed27 100644
--- a/pkg/apis/numaflow/v1alpha1/vertex_types.go
+++ b/pkg/apis/numaflow/v1alpha1/vertex_types.go
@@ -211,14 +211,36 @@ func (v Vertex) sidecarEnvs() []corev1.EnvVar {
 	}
 }
 
-func (v Vertex) GetPodSpec(req GetVertexPodSpecReq) (*corev1.PodSpec, error) {
-	vertexCopy := &Vertex{
+func (v Vertex) simpleCopy() Vertex {
+	m := Vertex{
 		ObjectMeta: metav1.ObjectMeta{
 			Namespace: v.Namespace,
 			Name:      v.Name,
 		},
-		Spec: v.Spec.DeepCopyWithoutReplicas(),
+		Spec: v.Spec.DeepCopyWithoutReplicasAndLifecycle(),
+	}
+	if m.Spec.Limits == nil {
+		m.Spec.Limits = &VertexLimits{}
+	}
+	if m.Spec.Limits.ReadBatchSize == nil {
+		m.Spec.Limits.ReadBatchSize = ptr.To[uint64](DefaultReadBatchSize)
+	}
+	if m.Spec.Limits.ReadTimeout == nil {
+		m.Spec.Limits.ReadTimeout = &metav1.Duration{Duration: DefaultReadTimeout}
 	}
+	if m.Spec.Limits.BufferMaxLength == nil {
+		m.Spec.Limits.BufferMaxLength = ptr.To[uint64](DefaultBufferLength)
+	}
+	if m.Spec.Limits.BufferUsageLimit == nil {
+		m.Spec.Limits.BufferUsageLimit = ptr.To[uint32](100 * DefaultBufferUsageLimit)
+	}
+	m.Spec.UpdateStrategy = UpdateStrategy{}
+	return m
+}
+
+func (v Vertex) GetPodSpec(req GetVertexPodSpecReq) (*corev1.PodSpec, error) {
+	vertexCopy := v.simpleCopy()
+	vertexCopy.Spec.Scale = Scale{LookbackSeconds: ptr.To[uint32](uint32(v.Spec.Scale.GetLookbackSeconds()))}
 	vertexBytes, err := json.Marshal(vertexCopy)
 	if err != nil {
 		return nil, errors.New("failed to marshal vertex spec")
@@ -424,9 +446,10 @@ func (v Vertex) getInitContainers(req GetVertexPodSpecReq) []corev1.Container {
 	return append(initContainers, v.Spec.InitContainers...)
 }
 
-func (vs VertexSpec) DeepCopyWithoutReplicas() VertexSpec {
+func (vs VertexSpec) DeepCopyWithoutReplicasAndLifecycle() VertexSpec {
 	x := *vs.DeepCopy()
 	x.Replicas = ptr.To[int32](0)
+	x.Lifecycle = VertexLifecycle{}
 	return x
 }
 
@@ -474,19 +497,22 @@ func (v Vertex) GetToBuffers() []string {
 	return r
 }
 
-func (v Vertex) GetReplicas() int {
+func (v Vertex) getReplicas() int {
 	if v.IsReduceUDF() {
-		// Replicas will be 0 only when pausing a pipeline
-		if v.Spec.Replicas != nil && int(*v.Spec.Replicas) == 0 {
-			return 0
-		}
-		// Replica of a reduce vertex is determined by the partitions.
 		return v.GetPartitionCount()
 	}
-	desiredReplicas := 1
-	if x := v.Spec.Replicas; x != nil {
-		desiredReplicas = int(*x)
+	if v.Spec.Replicas == nil {
+		return 1
+	}
+	return int(*v.Spec.Replicas)
+}
+
+func (v Vertex) CalculateReplicas() int {
+	// If we are pausing the Pipeline/Vertex then we should have the desired replicas as 0
+	if v.Spec.Lifecycle.GetDesiredPhase() == VertexPhasePaused {
+		return 0
 	}
+	desiredReplicas := v.getReplicas()
 	// Don't allow replicas to be out of the range of min and max when auto scaling is enabled
 	if s := v.Spec.Scale; !s.Disabled {
 		max := int(s.GetMaxReplicas())
@@ -767,7 +793,7 @@ func (vs *VertexStatus) InitConditions() {
 
 // IsHealthy indicates whether the vertex is healthy or not
 func (vs *VertexStatus) IsHealthy() bool {
-	if vs.Phase != VertexPhaseRunning {
+	if vs.Phase != VertexPhaseRunning && vs.Phase != VertexPhasePaused {
 		return false
 	}
 	return vs.IsReady()
diff --git a/pkg/apis/numaflow/v1alpha1/vertex_types_test.go b/pkg/apis/numaflow/v1alpha1/vertex_types_test.go
index 1e838a0a81..7a063c31a3 100644
--- a/pkg/apis/numaflow/v1alpha1/vertex_types_test.go
+++ b/pkg/apis/numaflow/v1alpha1/vertex_types_test.go
@@ -121,8 +121,13 @@ func TestGetToBuffersSink(t *testing.T) {
 
 func TestWithoutReplicas(t *testing.T) {
 	s := &VertexSpec{
 		Replicas: ptr.To[int32](3),
+		Lifecycle: VertexLifecycle{
+			DesiredPhase: VertexPhasePaused,
+		},
 	}
-	assert.Equal(t, int32(0), *s.DeepCopyWithoutReplicas().Replicas)
+	dc := s.DeepCopyWithoutReplicasAndLifecycle()
+	assert.Equal(t, int32(0), *dc.Replicas)
+	assert.Equal(t, VertexLifecycle{}, dc.Lifecycle)
 }
 
@@ -133,11 +138,16 @@ func TestGetVertexReplicas(t *testing.T) {
 			},
 		},
 	}
-	assert.Equal(t, 1, v.GetReplicas())
+	v.Spec.Lifecycle.DesiredPhase = VertexPhasePaused
+	assert.Equal(t, 0, v.CalculateReplicas())
+	v.Spec.Lifecycle.DesiredPhase = VertexPhaseRunning
+	assert.Equal(t, 1, v.CalculateReplicas())
+	v.Spec.Lifecycle = VertexLifecycle{}
+	assert.Equal(t, 1, v.CalculateReplicas())
 	v.Spec.Replicas = ptr.To[int32](3)
-	assert.Equal(t, 3, v.GetReplicas())
+	assert.Equal(t, 3, v.CalculateReplicas())
 	v.Spec.Replicas = ptr.To[int32](0)
-	assert.Equal(t, 0, v.GetReplicas())
+	assert.Equal(t, 0, v.CalculateReplicas())
 	v.Spec.UDF = &UDF{
 		GroupBy: &GroupBy{},
 	}
@@ -145,16 +155,16 @@ func TestGetVertexReplicas(t *testing.T) {
 		{Edge: Edge{From: "a", To: "b"}},
 	}
 	v.Spec.Replicas = ptr.To[int32](5)
-	assert.Equal(t, 1, v.GetReplicas())
+	assert.Equal(t, 1, v.CalculateReplicas())
 	v.Spec.Replicas = ptr.To[int32](1000)
-	assert.Equal(t, 1, v.GetReplicas())
+	assert.Equal(t, 1, v.CalculateReplicas())
 	v.Spec.UDF.GroupBy = nil
 	v.Spec.Scale.Max = ptr.To[int32](40)
 	v.Spec.Scale.Min = ptr.To[int32](20)
 	v.Spec.Replicas = ptr.To[int32](300)
-	assert.Equal(t, 40, v.GetReplicas())
+	assert.Equal(t, 40, v.CalculateReplicas())
 	v.Spec.Replicas = ptr.To[int32](10)
-	assert.Equal(t, 20, v.GetReplicas())
+	assert.Equal(t, 20, v.CalculateReplicas())
 }
 
 func TestGetHeadlessSvcSpec(t *testing.T) {
diff --git a/pkg/reconciler/pipeline/controller.go b/pkg/reconciler/pipeline/controller.go
index c22b5702e0..27571834a8 100644
--- a/pkg/reconciler/pipeline/controller.go
+++ b/pkg/reconciler/pipeline/controller.go
@@ -667,8 +667,11 @@ func buildVertices(pl *dfv1.Pipeline) map[string]dfv1.Vertex {
 			ToEdges:   toEdges,
 			Watermark: pl.Spec.Watermark,
 			Replicas:  &replicas,
+			Lifecycle: dfv1.VertexLifecycle{
+				DesiredPhase: dfv1.VertexPhase(pl.GetDesiredPhase()),
+			},
 		}
-		hash := sharedutil.MustHash(spec.DeepCopyWithoutReplicas())
+		hash := sharedutil.MustHash(spec.DeepCopyWithoutReplicasAndLifecycle())
 		obj := dfv1.Vertex{
 			ObjectMeta: metav1.ObjectMeta{
 				Namespace: pl.Namespace,
@@ -854,7 +857,7 @@ func (r *pipelineReconciler) resumePipeline(ctx context.Context, pl *dfv1.Pipeli
 			}
 		}
 	}
-	_, err := r.scaleUpAllVertices(ctx, pl)
+	_, err := r.updateVerticeDesiredPhase(ctx, pl, allVertexFilter, dfv1.VertexPhaseRunning)
 	if err != nil {
 		return false, err
 	}
@@ -874,7 +877,7 @@ func (r *pipelineReconciler) pausePipeline(ctx context.Context, pl *dfv1.Pipelin
 	pl.Status.MarkPhasePausing()
 	if pl.GetAnnotations() == nil || pl.GetAnnotations()[dfv1.KeyPauseTimestamp] == "" {
-		_, err := r.scaleDownSourceVertices(ctx, pl)
+		_, err := r.updateVerticeDesiredPhase(ctx, pl, sourceVertexFilter, dfv1.VertexPhasePaused)
 		if err != nil {
 			// If there's an error requeue the request
 			return true, err
 		}
@@ -915,7 +918,7 @@ func (r *pipelineReconciler) pausePipeline(ctx context.Context, pl *dfv1.Pipelin
 
 	// if drain is completed, or we have exceeded the pause deadline, mark pl as paused and scale down
 	if time.Now().After(pauseTimestamp.Add(time.Duration(pl.GetPauseGracePeriodSeconds())*time.Second)) || drainCompleted {
-		_, err = r.scaleDownAllVertices(ctx, pl)
+		_, err = r.updateVerticeDesiredPhase(ctx, pl, allVertexFilter, dfv1.VertexPhasePaused)
 		if err != nil {
 			return true, err
 		}
@@ -945,19 +948,7 @@ func (r *pipelineReconciler) noSourceVertexPodsRunning(ctx context.Context, pl *
 	return len(pods.Items) == 0, nil
 }
 
-func (r *pipelineReconciler) scaleDownSourceVertices(ctx context.Context, pl *dfv1.Pipeline) (bool, error) {
-	return r.scaleVertex(ctx, pl, sourceVertexFilter, 0)
-}
-
-func (r *pipelineReconciler) scaleDownAllVertices(ctx context.Context, pl *dfv1.Pipeline) (bool, error) {
-	return r.scaleVertex(ctx, pl, allVertexFilter, 0)
-}
-
-func (r *pipelineReconciler) scaleUpAllVertices(ctx context.Context, pl *dfv1.Pipeline) (bool, error) {
-	return r.scaleVertex(ctx, pl, allVertexFilter, 1)
-}
-
-func (r *pipelineReconciler) scaleVertex(ctx context.Context, pl *dfv1.Pipeline, filter vertexFilterFunc, replicas int32) (bool, error) {
+func (r *pipelineReconciler) updateVerticeDesiredPhase(ctx context.Context, pl *dfv1.Pipeline, filter vertexFilterFunc, desiredPhase dfv1.VertexPhase) (bool, error) {
 	log := logging.FromContext(ctx)
 	existingVertices, err := r.findExistingVertices(ctx, pl)
 	if err != nil {
@@ -965,29 +956,14 @@ func (r *pipelineReconciler) scaleVertex(ctx context.Context, pl *dfv1.Pipeline,
 	}
 	isVertexPatched := false
 	for _, vertex := range existingVertices {
-		if origin := *vertex.Spec.Replicas; origin != replicas && filter(vertex) {
-			scaleTo := replicas
-			// if replicas equals to 1, it means we are resuming a paused pipeline
-			// in this case, if a vertex doesn't support auto-scaling, we scale up based on the vertex's configuration:
-			// for a reducer, we scale up to the partition count
-			// for a non-reducer, if min is set, we scale up to min
-			if replicas == 1 {
-				if vertex.IsReduceUDF() {
-					scaleTo = int32(vertex.GetPartitionCount())
-				} else {
-					scaleTo = vertex.Spec.Scale.GetMinReplicas()
-					if scaleTo < 1 {
-						scaleTo = 1
-					}
-				}
-			}
-			patchJson := fmt.Sprintf(`{"spec":{"replicas":%d}}`, scaleTo)
+		if originPhase := vertex.Spec.Lifecycle.GetDesiredPhase(); filter(vertex) && originPhase != desiredPhase {
+			patchJson := fmt.Sprintf(`{"spec":{"lifecycle":{"desiredPhase":"%s"}}}`, desiredPhase)
 			err = r.client.Patch(ctx, &vertex, client.RawPatch(types.MergePatchType, []byte(patchJson)))
 			if err != nil && !apierrors.IsNotFound(err) {
 				return false, err
 			}
-			log.Infow("Scaled vertex", zap.Int32("from", origin), zap.Int32("to", scaleTo), zap.String("vertex", vertex.Name))
-			r.recorder.Eventf(pl, corev1.EventTypeNormal, "ScalingVertex", "Scaled vertex %s from %d to %d replicas", vertex.Name, origin, scaleTo)
+			log.Infow("Updated vertex desired phase", zap.String("from", string(originPhase)), zap.String("to", string(desiredPhase)), zap.String("vertex", vertex.Name))
+			r.recorder.Eventf(pl, corev1.EventTypeNormal, "UpdateVertexDesiredPhase", "Updated vertex %q desired phase from %s to %s", vertex.Name, originPhase, desiredPhase)
 			isVertexPatched = true
 		}
 	}
@@ -997,7 +973,7 @@ func (r *pipelineReconciler) safeToDelete(ctx context.Context, pl *dfv1.Pipeline) (bool, error) {
 	// update the phase to deleting
 	pl.Status.MarkPhaseDeleting()
-	vertexPatched, err := r.scaleDownSourceVertices(ctx, pl)
+	vertexPatched, err := r.updateVerticeDesiredPhase(ctx, pl, sourceVertexFilter, dfv1.VertexPhasePaused)
 	if err != nil {
 		return false, err
 	}
diff --git a/pkg/reconciler/pipeline/controller_test.go b/pkg/reconciler/pipeline/controller_test.go
index 565e919e8c..50cb4718ef 100644
--- a/pkg/reconciler/pipeline/controller_test.go
+++ b/pkg/reconciler/pipeline/controller_test.go
@@ -509,7 +509,7 @@ func Test_pauseAndResumePipeline(t *testing.T) {
 		assert.NoError(t, err)
 		v, err := r.findExistingVertices(ctx, testObj)
 		assert.NoError(t, err)
-		assert.Equal(t, int32(0), *v[testObj.Name+"-"+testObj.Spec.Vertices[0].Name].Spec.Replicas)
+		assert.Equal(t, dfv1.VertexPhasePaused, v[testObj.Name+"-"+testObj.Spec.Vertices[0].Name].Spec.Lifecycle.GetDesiredPhase())
 		_, err = r.resumePipeline(ctx, testObj)
 		assert.NoError(t, err)
 		v, err = r.findExistingVertices(ctx, testObj)
diff --git a/pkg/reconciler/vertex/controller.go b/pkg/reconciler/vertex/controller.go
index e015d0d3a0..5a5b28c68f 100644
--- a/pkg/reconciler/vertex/controller.go
+++ b/pkg/reconciler/vertex/controller.go
@@ -166,8 +166,8 @@ func (r *vertexReconciler) reconcile(ctx context.Context, vertex *dfv1.Vertex) (
 
 	vertex.Status.MarkDeployed()
 
-	// Mark it running before checking the status of the pods
-	vertex.Status.MarkPhaseRunning()
+	// Mark desired phase before checking the status of the pods
+	vertex.Status.MarkPhase(vertex.Spec.Lifecycle.GetDesiredPhase(), "", "")
 
 	// Check status of the pods
 	var podList corev1.PodList
@@ -177,10 +177,7 @@ func (r *vertexReconciler) reconcile(ctx context.Context, vertex *dfv1.Vertex) (
 		return ctrl.Result{}, fmt.Errorf("failed to get pods of a vertex: %w", err)
 	}
 	readyPods := reconciler.NumOfReadyPods(podList)
-	desiredReplicas := vertex.GetReplicas()
-	if pipeline.GetDesiredPhase() == dfv1.PipelinePhasePaused {
-		desiredReplicas = 0
-	}
+	desiredReplicas := vertex.CalculateReplicas()
 	if readyPods > desiredReplicas { // It might happen in some corner cases, such as during rollout
 		readyPods = desiredReplicas
 	}
@@ -197,10 +194,7 @@ func (r *vertexReconciler) reconcile(ctx context.Context, vertex *dfv1.Vertex) (
 
 func (r *vertexReconciler) orchestratePods(ctx context.Context, vertex *dfv1.Vertex, pipeline *dfv1.Pipeline, isbSvc *dfv1.InterStepBufferService) error {
 	log := logging.FromContext(ctx)
-	desiredReplicas := vertex.GetReplicas()
-	if pipeline.GetDesiredPhase() == dfv1.PipelinePhasePaused {
-		desiredReplicas = 0
-	}
+	desiredReplicas := vertex.CalculateReplicas()
 	vertex.Status.DesiredReplicas = uint32(desiredReplicas)
 
 	// Set metrics
diff --git a/pkg/reconciler/vertex/scaling/scaling.go b/pkg/reconciler/vertex/scaling/scaling.go
index 455ce9906b..792f7b0784 100644
--- a/pkg/reconciler/vertex/scaling/scaling.go
+++ b/pkg/reconciler/vertex/scaling/scaling.go
@@ -172,6 +172,10 @@ func (s *Scaler) scaleOneVertex(ctx context.Context, key string, worker int) err
 		log.Infof("Vertex not in Running phase, skip scaling.")
 		return nil
 	}
+	if vertex.Spec.Lifecycle.GetDesiredPhase() != dfv1.VertexPhaseRunning {
+		log.Infof("Vertex desired phase is not Running, skip scaling.")
+		return nil
+	}
 	if vertex.Status.UpdateHash != vertex.Status.CurrentHash && vertex.Status.UpdateHash != "" {
 		log.Info("Vertex is updating, skip scaling.")
 		return nil
diff --git a/test/fixtures/util.go b/test/fixtures/util.go
index d2f27dc5f7..7808a031cb 100644
--- a/test/fixtures/util.go
+++ b/test/fixtures/util.go
@@ -308,7 +308,7 @@ func WaitForVertexPodRunning(kubeClient kubernetes.Interface, vertexClient flowp
 		if err != nil {
 			return fmt.Errorf("error getting vertex pod name: %w", err)
 		}
-		ok = ok && len(podList.Items) > 0 && len(podList.Items) == vertexList.Items[0].GetReplicas() // pod number should equal to desired replicas
+		ok = ok && len(podList.Items) > 0 && len(podList.Items) == vertexList.Items[0].CalculateReplicas() // pod number should equal to desired replicas
 		for _, p := range podList.Items {
 			ok = ok && isPodReady(p)
 		}
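
Note for reviewers: below is a minimal, hypothetical sketch (not part of the patch) of how the lifecycle-driven pause introduced above is expected to behave, using only the APIs shown in this diff. It assumes the usual module path github.com/numaproj/numaflow for the v1alpha1 types. Pausing is now expressed through spec.lifecycle.desiredPhase rather than by forcing spec.replicas to 0, and CalculateReplicas() resolves the effective replica count.

package main

import (
	"fmt"

	dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
	"k8s.io/utils/ptr"
)

func main() {
	v := dfv1.Vertex{
		Spec: dfv1.VertexSpec{
			Replicas: ptr.To[int32](5),
			Scale:    dfv1.Scale{Min: ptr.To[int32](2), Max: ptr.To[int32](4)},
		},
	}

	// Desired phase defaults to Running: replicas are clamped into [min, max].
	fmt.Println(v.CalculateReplicas()) // 4

	// Pausing no longer rewrites spec.replicas; the pipeline controller merge-patches
	// {"spec":{"lifecycle":{"desiredPhase":"Paused"}}} and CalculateReplicas() reports 0.
	v.Spec.Lifecycle.DesiredPhase = dfv1.VertexPhasePaused
	fmt.Println(v.CalculateReplicas()) // 0
}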