Skip to content

Commit

Permalink
Merge branch 'main' into async-serve
Browse files Browse the repository at this point in the history
  • Loading branch information
yhl25 authored Feb 21, 2025
2 parents 0809e7c + ef0cffd commit abd7cdd
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 7 deletions.
5 changes: 5 additions & 0 deletions pkg/reconciler/monovertex/scaling/scaling.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,11 @@ func (s *Scaler) scaleOneMonoVertex(ctx context.Context, key string, worker int)
return nil
}

if monoVtx.Spec.Scale.GetMaxReplicas() == monoVtx.Spec.Scale.GetMinReplicas() {
log.Infof("MonoVertex %s has same scale.min and scale.max, skip scaling.", monoVtx.Name)
return nil
}

if monoVtx.Status.Replicas == 0 { // Was scaled to 0
// Periodically wake them up from 0 replicas to 1, to peek for the incoming messages
if secondsSinceLastScale >= float64(monoVtx.Spec.Scale.GetZeroReplicaSleepSeconds()) {
Expand Down
12 changes: 8 additions & 4 deletions pkg/reconciler/pipeline/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,10 @@ func (r *pipelineReconciler) reconcileFixedResources(ctx context.Context, pl *df
// Keep the original replicas as much as possible
if originalReplicas >= newObj.Spec.Scale.GetMinReplicas() && originalReplicas <= newObj.Spec.Scale.GetMaxReplicas() {
oldObj.Spec.Replicas = &originalReplicas
} else if originalReplicas < newObj.Spec.Scale.GetMinReplicas() {
originalReplicas = newObj.Spec.Scale.GetMinReplicas()
} else {
originalReplicas = newObj.Spec.Scale.GetMaxReplicas()
}
oldObj.Annotations[dfv1.KeyHash] = newObj.GetAnnotations()[dfv1.KeyHash]
if err := r.client.Update(ctx, &oldObj); err != nil {
Expand Down Expand Up @@ -647,11 +651,11 @@ func buildVertices(pl *dfv1.Pipeline) map[string]dfv1.Vertex {
replicas = int32(partitions)
} else {
x := vCopy.Scale
if x.Min != nil && *x.Min > 1 && replicas < *x.Min {
replicas = *x.Min
if replicas < x.GetMinReplicas() {
replicas = x.GetMinReplicas()
}
if x.Max != nil && *x.Max > 1 && replicas > *x.Max {
replicas = *x.Max
if replicas > x.GetMaxReplicas() {
replicas = x.GetMaxReplicas()
}
}

Expand Down
9 changes: 7 additions & 2 deletions pkg/reconciler/vertex/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,6 @@ func (r *vertexReconciler) reconcile(ctx context.Context, vertex *dfv1.Vertex) (
vertex.Status.InitConditions()
vertex.Status.SetObservedGeneration(vertex.Generation)

desiredReplicas := vertex.GetReplicas()

isbSvc := &dfv1.InterStepBufferService{}
isbSvcName := dfv1.DefaultISBSvcName
if len(vertex.Spec.InterStepBufferServiceName) > 0 {
Expand Down Expand Up @@ -179,6 +177,10 @@ func (r *vertexReconciler) reconcile(ctx context.Context, vertex *dfv1.Vertex) (
return ctrl.Result{}, fmt.Errorf("failed to get pods of a vertex: %w", err)
}
readyPods := reconciler.NumOfReadyPods(podList)
desiredReplicas := vertex.GetReplicas()
if pipeline.GetDesiredPhase() == dfv1.PipelinePhasePaused {
desiredReplicas = 0
}
if readyPods > desiredReplicas { // It might happen in some corner cases, such as during rollout
readyPods = desiredReplicas
}
Expand All @@ -196,6 +198,9 @@ func (r *vertexReconciler) reconcile(ctx context.Context, vertex *dfv1.Vertex) (
func (r *vertexReconciler) orchestratePods(ctx context.Context, vertex *dfv1.Vertex, pipeline *dfv1.Pipeline, isbSvc *dfv1.InterStepBufferService) error {
log := logging.FromContext(ctx)
desiredReplicas := vertex.GetReplicas()
if pipeline.GetDesiredPhase() == dfv1.PipelinePhasePaused {
desiredReplicas = 0
}
vertex.Status.DesiredReplicas = uint32(desiredReplicas)

// Set metrics
Expand Down
7 changes: 6 additions & 1 deletion pkg/reconciler/vertex/scaling/scaling.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,11 +202,16 @@ func (s *Scaler) scaleOneVertex(ctx context.Context, key string, worker int) err
log.Info("Corresponding Pipeline not in Running state, skip scaling.")
return nil
}
if int(vertex.Status.Replicas) != vertex.GetReplicas() {
if vertex.Status.Replicas != vertex.Status.DesiredReplicas {
log.Infof("Vertex %s might be under processing, replicas mismatch, skip scaling.", vertex.Name)
return nil
}

if vertex.Spec.Scale.GetMaxReplicas() == vertex.Spec.Scale.GetMinReplicas() {
log.Infof("Vertex %s has same scale.min and scale.max, skip scaling.", vertex.Name)
return nil
}

var err error
daemonClient, _ := s.daemonClientsCache.Get(pl.GetDaemonServiceURL())
if daemonClient == nil {
Expand Down

0 comments on commit abd7cdd

Please sign in to comment.