Skip to content

Commit 9db3d25

Browse files
authored
Merge pull request #8835 from norbertcyran/resource-quotas-scale-up
Integrate resource quotas with scale up
2 parents 3139a28 + f8d09a4 commit 9db3d25

27 files changed

+694
-176
lines changed

cluster-autoscaler/core/autoscaler.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,12 @@ import (
2424
ca_context "k8s.io/autoscaler/cluster-autoscaler/context"
2525
coreoptions "k8s.io/autoscaler/cluster-autoscaler/core/options"
2626
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/pdb"
27+
"k8s.io/autoscaler/cluster-autoscaler/core/utils"
2728
"k8s.io/autoscaler/cluster-autoscaler/estimator"
2829
"k8s.io/autoscaler/cluster-autoscaler/expander/factory"
2930
"k8s.io/autoscaler/cluster-autoscaler/observers/loopstart"
3031
ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
32+
"k8s.io/autoscaler/cluster-autoscaler/resourcequotas"
3133
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/predicate"
3234
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/store"
3335
"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules"
@@ -76,6 +78,7 @@ func NewAutoscaler(opts coreoptions.AutoscalerOptions, informerFactory informers
7678
opts.DeleteOptions,
7779
opts.DrainabilityRules,
7880
opts.DraProvider,
81+
opts.QuotasTrackerOptions,
7982
), nil
8083
}
8184

@@ -142,6 +145,17 @@ func initializeDefaultOptions(opts *coreoptions.AutoscalerOptions, informerFacto
142145
}
143146
opts.ExpanderStrategy = expanderStrategy
144147
}
148+
if opts.QuotasTrackerOptions.QuotaProvider == nil {
149+
cloudQuotasProvider := resourcequotas.NewCloudQuotasProvider(opts.CloudProvider)
150+
opts.QuotasTrackerOptions.QuotaProvider = resourcequotas.NewCombinedQuotasProvider([]resourcequotas.Provider{cloudQuotasProvider})
151+
}
152+
if opts.QuotasTrackerOptions.CustomResourcesProcessor == nil {
153+
opts.QuotasTrackerOptions.CustomResourcesProcessor = opts.Processors.CustomResourcesProcessor
154+
}
155+
if opts.QuotasTrackerOptions.NodeFilter == nil {
156+
virtualKubeletNodeFilter := utils.VirtualKubeletNodeFilter{}
157+
opts.QuotasTrackerOptions.NodeFilter = resourcequotas.NewCombinedNodeFilter([]resourcequotas.NodeFilter{virtualKubeletNodeFilter})
158+
}
145159

146160
return nil
147161
}

cluster-autoscaler/core/options/autoscaler.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"k8s.io/autoscaler/cluster-autoscaler/expander"
2828
"k8s.io/autoscaler/cluster-autoscaler/observers/loopstart"
2929
ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
30+
"k8s.io/autoscaler/cluster-autoscaler/resourcequotas"
3031
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
3132
"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules"
3233
draprovider "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/provider"
@@ -57,4 +58,5 @@ type AutoscalerOptions struct {
5758
DeleteOptions options.NodeDeleteOptions
5859
DrainabilityRules rules.Rules
5960
DraProvider *draprovider.Provider
61+
QuotasTrackerOptions resourcequotas.TrackerOptions
6062
}

cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go

Lines changed: 51 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222

2323
appsv1 "k8s.io/api/apps/v1"
2424
apiv1 "k8s.io/api/core/v1"
25+
"k8s.io/apimachinery/pkg/util/sets"
2526
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
2627
"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
2728
ca_context "k8s.io/autoscaler/cluster-autoscaler/context"
@@ -34,6 +35,7 @@ import (
3435
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroups"
3536
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupset"
3637
"k8s.io/autoscaler/cluster-autoscaler/processors/status"
38+
"k8s.io/autoscaler/cluster-autoscaler/resourcequotas"
3739
"k8s.io/autoscaler/cluster-autoscaler/simulator"
3840
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
3941
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
@@ -47,6 +49,7 @@ type ScaleUpOrchestrator struct {
4749
autoscalingCtx *ca_context.AutoscalingContext
4850
processors *ca_processors.AutoscalingProcessors
4951
resourceManager *resource.Manager
52+
quotasTrackerFactory *resourcequotas.TrackerFactory
5053
clusterStateRegistry *clusterstate.ClusterStateRegistry
5154
scaleUpExecutor *scaleUpExecutor
5255
estimatorBuilder estimator.EstimatorBuilder
@@ -68,6 +71,7 @@ func (o *ScaleUpOrchestrator) Initialize(
6871
clusterStateRegistry *clusterstate.ClusterStateRegistry,
6972
estimatorBuilder estimator.EstimatorBuilder,
7073
taintConfig taints.TaintConfig,
74+
quotasTrackerFactory *resourcequotas.TrackerFactory,
7175
) {
7276
o.autoscalingCtx = autoscalingCtx
7377
o.processors = processors
@@ -76,6 +80,7 @@ func (o *ScaleUpOrchestrator) Initialize(
7680
o.taintConfig = taintConfig
7781
o.resourceManager = resource.NewManager(processors.CustomResourcesProcessor)
7882
o.scaleUpExecutor = newScaleUpExecutor(autoscalingCtx, processors.ScaleStateNotifier, o.processors.AsyncNodeGroupStateChecker)
83+
o.quotasTrackerFactory = quotasTrackerFactory
7984
o.initialized = true
8085
}
8186

@@ -121,15 +126,15 @@ func (o *ScaleUpOrchestrator) ScaleUp(
121126
// Initialise binpacking limiter.
122127
o.processors.BinpackingLimiter.InitBinpacking(o.autoscalingCtx, nodeGroups)
123128

124-
resourcesLeft, aErr := o.resourceManager.ResourcesLeft(o.autoscalingCtx, nodeInfos, nodes)
125-
if aErr != nil {
126-
return status.UpdateScaleUpError(&status.ScaleUpStatus{}, aErr.AddPrefix("could not compute total resources: "))
129+
tracker, err := o.quotasTrackerFactory.NewQuotasTracker(o.autoscalingCtx, nodes)
130+
if err != nil {
131+
return status.UpdateScaleUpError(&status.ScaleUpStatus{}, errors.ToAutoscalerError(errors.InternalError, err).AddPrefix("could not create quotas tracker: "))
127132
}
128133

129134
now := time.Now()
130135

131136
// Filter out invalid node groups
132-
validNodeGroups, skippedNodeGroups := o.filterValidScaleUpNodeGroups(nodeGroups, nodeInfos, resourcesLeft, len(nodes)+len(upcomingNodes), now)
137+
validNodeGroups, skippedNodeGroups := o.filterValidScaleUpNodeGroups(nodeGroups, nodeInfos, tracker, len(nodes), now)
133138

134139
// Mark skipped node groups as processed.
135140
for nodegroupID := range skippedNodeGroups {
@@ -145,7 +150,7 @@ func (o *ScaleUpOrchestrator) ScaleUp(
145150
}
146151

147152
for _, nodeGroup := range validNodeGroups {
148-
option := o.ComputeExpansionOption(nodeGroup, schedulablePodGroups, nodeInfos, len(nodes)+len(upcomingNodes), now, allOrNothing)
153+
option := o.ComputeExpansionOption(nodeGroup, schedulablePodGroups, nodeInfos, len(nodes), now, allOrNothing)
149154
o.processors.BinpackingLimiter.MarkProcessed(o.autoscalingCtx, nodeGroup.Id())
150155

151156
if len(option.Pods) == 0 || option.NodeCount == 0 {
@@ -189,20 +194,20 @@ func (o *ScaleUpOrchestrator) ScaleUp(
189194
klog.V(1).Infof("Estimated %d nodes needed in %s", bestOption.NodeCount, bestOption.NodeGroup.Id())
190195

191196
// Cap new nodes to supported number of nodes in the cluster.
192-
newNodes, aErr := o.GetCappedNewNodeCount(bestOption.NodeCount, len(nodes)+len(upcomingNodes))
197+
newNodes, aErr := o.GetCappedNewNodeCount(bestOption.NodeCount, len(nodes))
193198
if aErr != nil {
194199
return status.UpdateScaleUpError(&status.ScaleUpStatus{PodsTriggeredScaleUp: bestOption.Pods}, aErr)
195200
}
196201

197-
newNodes, aErr = o.applyLimits(newNodes, resourcesLeft, bestOption.NodeGroup, nodeInfos)
202+
newNodes, aErr = o.applyLimits(newNodes, tracker, bestOption.NodeGroup, nodeInfos)
198203
if aErr != nil {
199204
return status.UpdateScaleUpError(
200205
&status.ScaleUpStatus{PodsTriggeredScaleUp: bestOption.Pods},
201206
aErr)
202207
}
203208

204209
if newNodes < bestOption.NodeCount {
205-
klog.V(1).Infof("Only %d nodes can be added to %s due to cluster-wide limits", newNodes, bestOption.NodeGroup.Id())
210+
klog.V(1).Infof("Only %d nodes can be added to %s due to resource quotas", newNodes, bestOption.NodeGroup.Id())
206211
if allOrNothing {
207212
// Can't execute a scale-up that will accommodate all pods, so nothing is considered schedulable.
208213
klog.V(1).Info("Not attempting scale-up due to all-or-nothing strategy: not all pods would be accommodated")
@@ -283,14 +288,18 @@ func (o *ScaleUpOrchestrator) ScaleUp(
283288
}, nil
284289
}
285290

286-
func (o *ScaleUpOrchestrator) applyLimits(newNodes int, resourcesLeft resource.Limits, nodeGroup cloudprovider.NodeGroup, nodeInfos map[string]*framework.NodeInfo) (int, errors.AutoscalerError) {
291+
func (o *ScaleUpOrchestrator) applyLimits(newNodes int, tracker *resourcequotas.Tracker, nodeGroup cloudprovider.NodeGroup, nodeInfos map[string]*framework.NodeInfo) (int, errors.AutoscalerError) {
287292
nodeInfo, found := nodeInfos[nodeGroup.Id()]
288293
if !found {
289294
// This should never happen, as we already should have retrieved nodeInfo for any considered nodegroup.
290295
klog.Errorf("No node info for: %s", nodeGroup.Id())
291296
return 0, errors.NewAutoscalerError(errors.CloudProviderError, "No node info for best expansion option!")
292297
}
293-
return o.resourceManager.ApplyLimits(o.autoscalingCtx, newNodes, resourcesLeft, nodeInfo, nodeGroup)
298+
checkResult, err := tracker.CheckDelta(o.autoscalingCtx, nodeGroup, nodeInfo.Node(), newNodes)
299+
if err != nil {
300+
return 0, errors.ToAutoscalerError(errors.InternalError, err).AddPrefix("failed to check resource quotas: ")
301+
}
302+
return checkResult.AllowedDelta, nil
294303
}
295304

296305
// ScaleUpToNodeGroupMinSize tries to scale up node groups that have less nodes
@@ -309,9 +318,9 @@ func (o *ScaleUpOrchestrator) ScaleUpToNodeGroupMinSize(
309318
nodeGroups := o.autoscalingCtx.CloudProvider.NodeGroups()
310319
scaleUpInfos := make([]nodegroupset.ScaleUpInfo, 0)
311320

312-
resourcesLeft, aErr := o.resourceManager.ResourcesLeft(o.autoscalingCtx, nodeInfos, nodes)
313-
if aErr != nil {
314-
return status.UpdateScaleUpError(&status.ScaleUpStatus{}, aErr.AddPrefix("could not compute total resources: "))
321+
tracker, err := o.quotasTrackerFactory.NewQuotasTracker(o.autoscalingCtx, nodes)
322+
if err != nil {
323+
return status.UpdateScaleUpError(&status.ScaleUpStatus{}, errors.ToAutoscalerError(errors.InternalError, err).AddPrefix("could not create quotas tracker: "))
315324
}
316325

317326
for _, ng := range nodeGroups {
@@ -342,17 +351,18 @@ func (o *ScaleUpOrchestrator) ScaleUpToNodeGroupMinSize(
342351
continue
343352
}
344353

345-
if skipReason := o.IsNodeGroupResourceExceeded(resourcesLeft, ng, nodeInfo, 1); skipReason != nil {
354+
if skipReason := o.IsNodeGroupResourceExceeded(tracker, ng, nodeInfo, 1); skipReason != nil {
346355
klog.Warningf("ScaleUpToNodeGroupMinSize: node group resource exceeded: %v", skipReason)
347356
continue
348357
}
349358

350359
newNodeCount := ng.MinSize() - targetSize
351-
newNodeCount, err = o.resourceManager.ApplyLimits(o.autoscalingCtx, newNodeCount, resourcesLeft, nodeInfo, ng)
360+
checkResult, err := tracker.CheckDelta(o.autoscalingCtx, ng, nodeInfo.Node(), newNodeCount)
352361
if err != nil {
353-
klog.Warningf("ScaleUpToNodeGroupMinSize: failed to apply resource limits: %v", err)
362+
klog.Warningf("ScaleUpToNodeGroupMinSize: failed to check resource quotas: %v", err)
354363
continue
355364
}
365+
newNodeCount = checkResult.AllowedDelta
356366

357367
newNodeCount, err = o.GetCappedNewNodeCount(newNodeCount, targetSize)
358368
if err != nil {
@@ -397,7 +407,7 @@ func (o *ScaleUpOrchestrator) ScaleUpToNodeGroupMinSize(
397407
func (o *ScaleUpOrchestrator) filterValidScaleUpNodeGroups(
398408
nodeGroups []cloudprovider.NodeGroup,
399409
nodeInfos map[string]*framework.NodeInfo,
400-
resourcesLeft resource.Limits,
410+
tracker *resourcequotas.Tracker,
401411
currentNodeCount int,
402412
now time.Time,
403413
) ([]cloudprovider.NodeGroup, map[string]status.Reasons) {
@@ -441,7 +451,7 @@ func (o *ScaleUpOrchestrator) filterValidScaleUpNodeGroups(
441451
skippedNodeGroups[nodeGroup.Id()] = NotReadyReason
442452
continue
443453
}
444-
if skipReason := o.IsNodeGroupResourceExceeded(resourcesLeft, nodeGroup, nodeInfo, numNodes); skipReason != nil {
454+
if skipReason := o.IsNodeGroupResourceExceeded(tracker, nodeGroup, nodeInfo, numNodes); skipReason != nil {
445455
skippedNodeGroups[nodeGroup.Id()] = skipReason
446456
continue
447457
}
@@ -664,31 +674,35 @@ func (o *ScaleUpOrchestrator) IsNodeGroupReadyToScaleUp(nodeGroup cloudprovider.
664674
}
665675

666676
// IsNodeGroupResourceExceeded returns nil if node group resource limits are not exceeded, otherwise a reason is provided.
667-
func (o *ScaleUpOrchestrator) IsNodeGroupResourceExceeded(resourcesLeft resource.Limits, nodeGroup cloudprovider.NodeGroup, nodeInfo *framework.NodeInfo, numNodes int) status.Reasons {
668-
resourcesDelta, err := o.resourceManager.DeltaForNode(o.autoscalingCtx, nodeInfo, nodeGroup)
677+
func (o *ScaleUpOrchestrator) IsNodeGroupResourceExceeded(tracker *resourcequotas.Tracker, nodeGroup cloudprovider.NodeGroup, nodeInfo *framework.NodeInfo, numNodes int) status.Reasons {
678+
checkResult, err := tracker.CheckDelta(o.autoscalingCtx, nodeGroup, nodeInfo.Node(), numNodes)
669679
if err != nil {
670-
klog.Errorf("Skipping node group %s; error getting node group resources: %v", nodeGroup.Id(), err)
680+
klog.Errorf("Skipping node group %s; error checking resource quotas: %v", nodeGroup.Id(), err)
671681
return NotReadyReason
672682
}
673683

674-
for resource, delta := range resourcesDelta {
675-
resourcesDelta[resource] = delta * int64(numNodes)
676-
}
677-
678-
checkResult := resource.CheckDeltaWithinLimits(resourcesLeft, resourcesDelta)
679-
if checkResult.Exceeded {
680-
klog.V(4).Infof("Skipping node group %s; maximal limit exceeded for %v", nodeGroup.Id(), checkResult.ExceededResources)
681-
for _, resource := range checkResult.ExceededResources {
682-
switch resource {
683-
case cloudprovider.ResourceNameCores:
684-
metrics.RegisterSkippedScaleUpCPU()
685-
case cloudprovider.ResourceNameMemory:
686-
metrics.RegisterSkippedScaleUpMemory()
687-
default:
688-
continue
684+
if checkResult.Exceeded() {
685+
resources := make(sets.Set[string])
686+
for _, quota := range checkResult.ExceededQuotas {
687+
klog.V(4).Infof(
688+
"Skipping node group %s; %q quota exceeded, resources: %v", nodeGroup.Id(), quota.ID, quota.ExceededResources,
689+
)
690+
for _, resource := range quota.ExceededResources {
691+
if resources.Has(resource) {
692+
continue
693+
}
694+
resources.Insert(resource)
695+
switch resource {
696+
case cloudprovider.ResourceNameCores:
697+
metrics.RegisterSkippedScaleUpCPU()
698+
case cloudprovider.ResourceNameMemory:
699+
metrics.RegisterSkippedScaleUpMemory()
700+
default:
701+
continue
702+
}
689703
}
690704
}
691-
return NewMaxResourceLimitReached(checkResult.ExceededResources)
705+
return NewMaxResourceLimitReached(checkResult.ExceededQuotas)
692706
}
693707
return nil
694708
}

0 commit comments

Comments
 (0)