Commit

Signed-off-by: Derek Wang <[email protected]>
whynowy committed Feb 23, 2025
1 parent 2ef24ad commit ff307b4
Showing 8 changed files with 81 additions and 72 deletions.
2 changes: 1 addition & 1 deletion pkg/apis/numaflow/v1alpha1/mono_vertex_types.go
@@ -318,7 +318,7 @@ func (mv MonoVertex) simpleCopy() MonoVertex {
 
 func (mv MonoVertex) GetPodSpec(req GetMonoVertexPodSpecReq) (*corev1.PodSpec, error) {
 	copiedSpec := mv.simpleCopy()
-	copiedSpec.Spec.Scale = Scale{LookbackSeconds: mv.Spec.Scale.LookbackSeconds}
+	copiedSpec.Spec.Scale = Scale{LookbackSeconds: ptr.To[uint32](uint32(mv.Spec.Scale.GetLookbackSeconds()))}
 	monoVtxBytes, err := json.Marshal(copiedSpec)
 	if err != nil {
 		return nil, errors.New("failed to marshal mono vertex spec")
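
The one-line change above matters when scale.lookbackSeconds is left unset: the old code copied the possibly-nil pointer into the stripped-down copy, while the new code always materializes a concrete value through the getter's default. A tiny sketch of the difference (not part of the commit, assuming the v1alpha1 package is imported as dfv1):

package main

import (
	"fmt"

	dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
)

func main() {
	mv := dfv1.MonoVertex{}
	// The raw field is nil when the user never set it...
	fmt.Println(mv.Spec.Scale.LookbackSeconds == nil) // true
	// ...while the getter falls back to the package default, so consumers of the
	// copied spec now always see a concrete lookback value.
	fmt.Println(mv.Spec.Scale.GetLookbackSeconds())
}
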
54 changes: 40 additions & 14 deletions pkg/apis/numaflow/v1alpha1/vertex_types.go
@@ -211,14 +211,36 @@ func (v Vertex) sidecarEnvs() []corev1.EnvVar {
 	}
 }
 
-func (v Vertex) GetPodSpec(req GetVertexPodSpecReq) (*corev1.PodSpec, error) {
-	vertexCopy := &Vertex{
+func (v Vertex) simpleCopy() Vertex {
+	m := Vertex{
 		ObjectMeta: metav1.ObjectMeta{
 			Namespace: v.Namespace,
 			Name:      v.Name,
 		},
-		Spec: v.Spec.DeepCopyWithoutReplicas(),
+		Spec: v.Spec.DeepCopyWithoutReplicasAndLifecycle(),
 	}
+	if m.Spec.Limits == nil {
+		m.Spec.Limits = &VertexLimits{}
+	}
+	if m.Spec.Limits.ReadBatchSize == nil {
+		m.Spec.Limits.ReadBatchSize = ptr.To[uint64](DefaultReadBatchSize)
+	}
+	if m.Spec.Limits.ReadTimeout == nil {
+		m.Spec.Limits.ReadTimeout = &metav1.Duration{Duration: DefaultReadTimeout}
+	}
+	if m.Spec.Limits.BufferMaxLength == nil {
+		m.Spec.Limits.BufferMaxLength = ptr.To[uint64](DefaultBufferLength)
+	}
+	if m.Spec.Limits.BufferUsageLimit == nil {
+		m.Spec.Limits.BufferUsageLimit = ptr.To[uint32](100 * DefaultBufferUsageLimit)
+	}
+	m.Spec.UpdateStrategy = UpdateStrategy{}
+	return m
+}
+
+func (v Vertex) GetPodSpec(req GetVertexPodSpecReq) (*corev1.PodSpec, error) {
+	vertexCopy := v.simpleCopy()
+	vertexCopy.Spec.Scale = Scale{LookbackSeconds: ptr.To[uint32](uint32(v.Spec.Scale.GetLookbackSeconds()))}
 	vertexBytes, err := json.Marshal(vertexCopy)
 	if err != nil {
 		return nil, errors.New("failed to marshal vertex spec")
@@ -424,9 +446,10 @@ func (v Vertex) getInitContainers(req GetVertexPodSpecReq) []corev1.Container {
 	return append(initContainers, v.Spec.InitContainers...)
 }
 
-func (vs VertexSpec) DeepCopyWithoutReplicas() VertexSpec {
+func (vs VertexSpec) DeepCopyWithoutReplicasAndLifecycle() VertexSpec {
 	x := *vs.DeepCopy()
 	x.Replicas = ptr.To[int32](0)
+	x.Lifecycle = VertexLifecycle{}
 	return x
 }

@@ -474,19 +497,22 @@ func (v Vertex) GetToBuffers() []string {
 	return r
 }
 
-func (v Vertex) GetReplicas() int {
+func (v Vertex) getReplicas() int {
 	if v.IsReduceUDF() {
-		// Replicas will be 0 only when pausing a pipeline
-		if v.Spec.Replicas != nil && int(*v.Spec.Replicas) == 0 {
-			return 0
-		}
 		// Replica of a reduce vertex is determined by the partitions.
 		return v.GetPartitionCount()
 	}
-	desiredReplicas := 1
-	if x := v.Spec.Replicas; x != nil {
-		desiredReplicas = int(*x)
+	if v.Spec.Replicas == nil {
+		return 1
 	}
+	return int(*v.Spec.Replicas)
+}
+
+func (v Vertex) CalculateReplicas() int {
+	// If we are pausing the Pipeline/Vertex then we should have the desired replicas as 0
+	if v.Spec.Lifecycle.GetDesiredPhase() == VertexPhasePaused {
+		return 0
+	}
+	desiredReplicas := v.getReplicas()
 	// Don't allow replicas to be out of the range of min and max when auto scaling is enabled
 	if s := v.Spec.Scale; !s.Disabled {
 		max := int(s.GetMaxReplicas())
@@ -767,7 +793,7 @@ func (vs *VertexStatus) InitConditions() {
 
 // IsHealthy indicates whether the vertex is healthy or not
 func (vs *VertexStatus) IsHealthy() bool {
-	if vs.Phase != VertexPhaseRunning {
+	if vs.Phase != VertexPhaseRunning && vs.Phase != VertexPhasePaused {
 		return false
 	}
 	return vs.IsReady()
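
The net effect of the vertex_types.go changes: replica counting is now split into getReplicas() (spec-driven) and CalculateReplicas() (lifecycle-aware), and a vertex whose lifecycle requests the Paused phase resolves to 0 replicas before any autoscaling bounds are considered. A minimal sketch of the resulting behaviour (not part of the commit; it assumes the v1alpha1 package above is imported as dfv1):

package main

import (
	"fmt"

	"k8s.io/utils/ptr"

	dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
)

func main() {
	v := dfv1.Vertex{}
	v.Spec.Replicas = ptr.To[int32](10)
	v.Spec.Scale.Min = ptr.To[int32](2)
	v.Spec.Scale.Max = ptr.To[int32](4)

	// Autoscaling bounds still clamp the specified replicas: 10 -> 4.
	fmt.Println(v.CalculateReplicas()) // 4

	// A Paused desired phase short-circuits to 0, regardless of replicas or scale settings.
	v.Spec.Lifecycle.DesiredPhase = dfv1.VertexPhasePaused
	fmt.Println(v.CalculateReplicas()) // 0
}
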
26 changes: 18 additions & 8 deletions pkg/apis/numaflow/v1alpha1/vertex_types_test.go
@@ -121,8 +121,13 @@ func TestGetToBuffersSink(t *testing.T) {
 func TestWithoutReplicas(t *testing.T) {
 	s := &VertexSpec{
 		Replicas: ptr.To[int32](3),
+		Lifecycle: VertexLifecycle{
+			DesiredPhase: VertexPhasePaused,
+		},
 	}
-	assert.Equal(t, int32(0), *s.DeepCopyWithoutReplicas().Replicas)
+	dc := s.DeepCopyWithoutReplicasAndLifecycle()
+	assert.Equal(t, int32(0), *dc.Replicas)
+	assert.Equal(t, VertexLifecycle{}, dc.Lifecycle)
 }
 
 func TestGetVertexReplicas(t *testing.T) {
@@ -133,28 +138,33 @@ func TestGetVertexReplicas(t *testing.T) {
 			},
 		},
 	}
-	assert.Equal(t, 1, v.GetReplicas())
+	v.Spec.Lifecycle.DesiredPhase = VertexPhasePaused
+	assert.Equal(t, 0, v.CalculateReplicas())
+	v.Spec.Lifecycle.DesiredPhase = VertexPhaseRunning
+	assert.Equal(t, 1, v.CalculateReplicas())
+	v.Spec.Lifecycle = VertexLifecycle{}
+	assert.Equal(t, 1, v.CalculateReplicas())
 	v.Spec.Replicas = ptr.To[int32](3)
-	assert.Equal(t, 3, v.GetReplicas())
+	assert.Equal(t, 3, v.CalculateReplicas())
 	v.Spec.Replicas = ptr.To[int32](0)
-	assert.Equal(t, 0, v.GetReplicas())
+	assert.Equal(t, 0, v.CalculateReplicas())
 	v.Spec.UDF = &UDF{
 		GroupBy: &GroupBy{},
 	}
 	v.Spec.FromEdges = []CombinedEdge{
 		{Edge: Edge{From: "a", To: "b"}},
 	}
 	v.Spec.Replicas = ptr.To[int32](5)
-	assert.Equal(t, 1, v.GetReplicas())
+	assert.Equal(t, 1, v.CalculateReplicas())
 	v.Spec.Replicas = ptr.To[int32](1000)
-	assert.Equal(t, 1, v.GetReplicas())
+	assert.Equal(t, 1, v.CalculateReplicas())
 	v.Spec.UDF.GroupBy = nil
 	v.Spec.Scale.Max = ptr.To[int32](40)
 	v.Spec.Scale.Min = ptr.To[int32](20)
 	v.Spec.Replicas = ptr.To[int32](300)
-	assert.Equal(t, 40, v.GetReplicas())
+	assert.Equal(t, 40, v.CalculateReplicas())
 	v.Spec.Replicas = ptr.To[int32](10)
-	assert.Equal(t, 20, v.GetReplicas())
+	assert.Equal(t, 20, v.CalculateReplicas())
 }
 
 func TestGetHeadlessSvcSpec(t *testing.T) {
50 changes: 13 additions & 37 deletions pkg/reconciler/pipeline/controller.go
@@ -667,8 +667,11 @@ func buildVertices(pl *dfv1.Pipeline) map[string]dfv1.Vertex {
 			ToEdges:   toEdges,
 			Watermark: pl.Spec.Watermark,
 			Replicas:  &replicas,
+			Lifecycle: dfv1.VertexLifecycle{
+				DesiredPhase: dfv1.VertexPhase(pl.GetDesiredPhase()),
+			},
 		}
-		hash := sharedutil.MustHash(spec.DeepCopyWithoutReplicas())
+		hash := sharedutil.MustHash(spec.DeepCopyWithoutReplicasAndLifecycle())
 		obj := dfv1.Vertex{
 			ObjectMeta: metav1.ObjectMeta{
 				Namespace: pl.Namespace,
@@ -854,7 +857,7 @@ func (r *pipelineReconciler) resumePipeline(ctx context.Context, pl *dfv1.Pipeli
 			}
 		}
 	}
-	_, err := r.scaleUpAllVertices(ctx, pl)
+	_, err := r.updateVerticeDesiredPhase(ctx, pl, allVertexFilter, dfv1.VertexPhaseRunning)
 	if err != nil {
 		return false, err
 	}
@@ -874,7 +877,7 @@ func (r *pipelineReconciler) pausePipeline(ctx context.Context, pl *dfv1.Pipelin
 	pl.Status.MarkPhasePausing()
 
 	if pl.GetAnnotations() == nil || pl.GetAnnotations()[dfv1.KeyPauseTimestamp] == "" {
-		_, err := r.scaleDownSourceVertices(ctx, pl)
+		_, err := r.updateVerticeDesiredPhase(ctx, pl, sourceVertexFilter, dfv1.VertexPhasePaused)
 		if err != nil {
 			// If there's an error requeue the request
 			return true, err
@@ -915,7 +918,7 @@ func (r *pipelineReconciler) pausePipeline(ctx context.Context, pl *dfv1.Pipelin
 
 	// if drain is completed, or we have exceeded the pause deadline, mark pl as paused and scale down
 	if time.Now().After(pauseTimestamp.Add(time.Duration(pl.GetPauseGracePeriodSeconds())*time.Second)) || drainCompleted {
-		_, err = r.scaleDownAllVertices(ctx, pl)
+		_, err = r.updateVerticeDesiredPhase(ctx, pl, allVertexFilter, dfv1.VertexPhasePaused)
 		if err != nil {
 			return true, err
 		}
@@ -945,49 +948,22 @@ func (r *pipelineReconciler) noSourceVertexPodsRunning(ctx context.Context, pl *
 	return len(pods.Items) == 0, nil
 }
 
-func (r *pipelineReconciler) scaleDownSourceVertices(ctx context.Context, pl *dfv1.Pipeline) (bool, error) {
-	return r.scaleVertex(ctx, pl, sourceVertexFilter, 0)
-}
-
-func (r *pipelineReconciler) scaleDownAllVertices(ctx context.Context, pl *dfv1.Pipeline) (bool, error) {
-	return r.scaleVertex(ctx, pl, allVertexFilter, 0)
-}
-
-func (r *pipelineReconciler) scaleUpAllVertices(ctx context.Context, pl *dfv1.Pipeline) (bool, error) {
-	return r.scaleVertex(ctx, pl, allVertexFilter, 1)
-}
-
-func (r *pipelineReconciler) scaleVertex(ctx context.Context, pl *dfv1.Pipeline, filter vertexFilterFunc, replicas int32) (bool, error) {
+func (r *pipelineReconciler) updateVerticeDesiredPhase(ctx context.Context, pl *dfv1.Pipeline, filter vertexFilterFunc, desiredPhase dfv1.VertexPhase) (bool, error) {
 	log := logging.FromContext(ctx)
 	existingVertices, err := r.findExistingVertices(ctx, pl)
 	if err != nil {
 		return false, err
 	}
 	isVertexPatched := false
 	for _, vertex := range existingVertices {
-		if origin := *vertex.Spec.Replicas; origin != replicas && filter(vertex) {
-			scaleTo := replicas
-			// if replicas equals to 1, it means we are resuming a paused pipeline
-			// in this case, if a vertex doesn't support auto-scaling, we scale up based on the vertex's configuration:
-			// for a reducer, we scale up to the partition count
-			// for a non-reducer, if min is set, we scale up to min
-			if replicas == 1 {
-				if vertex.IsReduceUDF() {
-					scaleTo = int32(vertex.GetPartitionCount())
-				} else {
-					scaleTo = vertex.Spec.Scale.GetMinReplicas()
-					if scaleTo < 1 {
-						scaleTo = 1
-					}
-				}
-			}
-			patchJson := fmt.Sprintf(`{"spec":{"replicas":%d}}`, scaleTo)
+		if originPhase := vertex.Spec.Lifecycle.GetDesiredPhase(); filter(vertex) && originPhase != desiredPhase {
+			patchJson := fmt.Sprintf(`{"spec":{"lifecycle":{"desiredPhase":"%s"}}}`, desiredPhase)
 			err = r.client.Patch(ctx, &vertex, client.RawPatch(types.MergePatchType, []byte(patchJson)))
 			if err != nil && !apierrors.IsNotFound(err) {
 				return false, err
 			}
-			log.Infow("Scaled vertex", zap.Int32("from", origin), zap.Int32("to", scaleTo), zap.String("vertex", vertex.Name))
-			r.recorder.Eventf(pl, corev1.EventTypeNormal, "ScalingVertex", "Scaled vertex %s from %d to %d replicas", vertex.Name, origin, scaleTo)
+			log.Infow("Updated vertex desired phase", zap.String("from", string(originPhase)), zap.String("to", string(desiredPhase)), zap.String("vertex", vertex.Name))
+			r.recorder.Eventf(pl, corev1.EventTypeNormal, "UpdateVertexDesiredPhase", "Updated vertex %q desired phase from %s to %s", vertex.Name, originPhase, desiredPhase)
 			isVertexPatched = true
 		}
 	}
@@ -997,7 +973,7 @@ func (r *pipelineReconciler) scaleVertex(ctx context.Context, pl *dfv1.Pipeline,
 func (r *pipelineReconciler) safeToDelete(ctx context.Context, pl *dfv1.Pipeline) (bool, error) {
 	// update the phase to deleting
 	pl.Status.MarkPhaseDeleting()
-	vertexPatched, err := r.scaleDownSourceVertices(ctx, pl)
+	vertexPatched, err := r.updateVerticeDesiredPhase(ctx, pl, sourceVertexFilter, dfv1.VertexPhasePaused)
 	if err != nil {
 		return false, err
 	}
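
Instead of patching spec.replicas with computed scale targets, the pipeline reconciler now sends a single JSON merge patch that flips each vertex's lifecycle. A small sketch of the patch body it produces (not part of the commit; the printed output assumes the VertexPhase constants stringify to "Paused" and "Running"):

package main

import (
	"fmt"

	dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
)

func main() {
	// Same Sprintf the reconciler uses, shown for both phases it patches.
	for _, phase := range []dfv1.VertexPhase{dfv1.VertexPhasePaused, dfv1.VertexPhaseRunning} {
		patchJson := fmt.Sprintf(`{"spec":{"lifecycle":{"desiredPhase":"%s"}}}`, phase)
		fmt.Println(patchJson)
	}
	// {"spec":{"lifecycle":{"desiredPhase":"Paused"}}}
	// {"spec":{"lifecycle":{"desiredPhase":"Running"}}}
}
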
2 changes: 1 addition & 1 deletion pkg/reconciler/pipeline/controller_test.go
@@ -509,7 +509,7 @@ func Test_pauseAndResumePipeline(t *testing.T) {
 	assert.NoError(t, err)
 	v, err := r.findExistingVertices(ctx, testObj)
 	assert.NoError(t, err)
-	assert.Equal(t, int32(0), *v[testObj.Name+"-"+testObj.Spec.Vertices[0].Name].Spec.Replicas)
+	assert.Equal(t, dfv1.VertexPhasePaused, v[testObj.Name+"-"+testObj.Spec.Vertices[0].Name].Spec.Lifecycle.GetDesiredPhase())
 	_, err = r.resumePipeline(ctx, testObj)
 	assert.NoError(t, err)
 	v, err = r.findExistingVertices(ctx, testObj)
14 changes: 4 additions & 10 deletions pkg/reconciler/vertex/controller.go
@@ -166,8 +166,8 @@ func (r *vertexReconciler) reconcile(ctx context.Context, vertex *dfv1.Vertex) (
 
 	vertex.Status.MarkDeployed()
 
-	// Mark it running before checking the status of the pods
-	vertex.Status.MarkPhaseRunning()
+	// Mark desired phase before checking the status of the pods
+	vertex.Status.MarkPhase(vertex.Spec.Lifecycle.GetDesiredPhase(), "", "")
 
 	// Check status of the pods
 	var podList corev1.PodList
@@ -177,10 +177,7 @@ func (r *vertexReconciler) reconcile(ctx context.Context, vertex *dfv1.Vertex) (
 		return ctrl.Result{}, fmt.Errorf("failed to get pods of a vertex: %w", err)
 	}
 	readyPods := reconciler.NumOfReadyPods(podList)
-	desiredReplicas := vertex.GetReplicas()
-	if pipeline.GetDesiredPhase() == dfv1.PipelinePhasePaused {
-		desiredReplicas = 0
-	}
+	desiredReplicas := vertex.CalculateReplicas()
 	if readyPods > desiredReplicas { // It might happen in some corner cases, such as during rollout
 		readyPods = desiredReplicas
 	}
@@ -197,10 +194,7 @@ func (r *vertexReconciler) reconcile(ctx context.Context, vertex *dfv1.Vertex) (
 
 func (r *vertexReconciler) orchestratePods(ctx context.Context, vertex *dfv1.Vertex, pipeline *dfv1.Pipeline, isbSvc *dfv1.InterStepBufferService) error {
 	log := logging.FromContext(ctx)
-	desiredReplicas := vertex.GetReplicas()
-	if pipeline.GetDesiredPhase() == dfv1.PipelinePhasePaused {
-		desiredReplicas = 0
-	}
+	desiredReplicas := vertex.CalculateReplicas()
 	vertex.Status.DesiredReplicas = uint32(desiredReplicas)
 
 	// Set metrics
3 changes: 3 additions & 0 deletions pkg/reconciler/vertex/scaling/scaling.go
@@ -172,6 +172,9 @@ func (s *Scaler) scaleOneVertex(ctx context.Context, key string, worker int) err
 		log.Infof("Vertex not in Running phase, skip scaling.")
 		return nil
 	}
+	if vertex.Spec.Lifecycle.GetDesiredPhase() != dfv1.VertexPhaseRunning {
+		log.Infof("Vertex desired phase is not Running, skip scaling.")
+	}
 	if vertex.Status.UpdateHash != vertex.Status.CurrentHash && vertex.Status.UpdateHash != "" {
 		log.Info("Vertex is updating, skip scaling.")
 		return nil
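
Worth noting in this hunk: unlike the two existing guards around it, the new desired-phase check only logs and then falls through, so scaleOneVertex still continues into the autoscaling logic. If the intent is to actually skip autoscaling for a paused vertex, the guard would presumably need an early return, sketched here as a fragment of scaleOneVertex (not part of the commit):

	if vertex.Spec.Lifecycle.GetDesiredPhase() != dfv1.VertexPhaseRunning {
		log.Infof("Vertex desired phase is not Running, skip scaling.")
		return nil // hypothetical early exit; the committed code omits this and keeps going
	}
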
2 changes: 1 addition & 1 deletion test/fixtures/util.go
@@ -308,7 +308,7 @@ func WaitForVertexPodRunning(kubeClient kubernetes.Interface, vertexClient flowp
 		if err != nil {
 			return fmt.Errorf("error getting vertex pod name: %w", err)
 		}
-		ok = ok && len(podList.Items) > 0 && len(podList.Items) == vertexList.Items[0].GetReplicas() // pod number should equal to desired replicas
+		ok = ok && len(podList.Items) > 0 && len(podList.Items) == vertexList.Items[0].CalculateReplicas() // pod number should equal to desired replicas
 		for _, p := range podList.Items {
 			ok = ok && isPodReady(p)
 		}
