cluster-autoscaler: standardize context usage #7664

Open · wants to merge 1 commit into master
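
The change is mechanical and repeated across every touched file: the cluster-autoscaler's own context package is now imported under an explicit ca_context alias, and parameters and locals of type *context.AutoscalingContext are renamed from context (or ctx) to autoscalingContext, presumably so they no longer shadow the standard library context package or the imported package name. A minimal sketch of the resulting convention, using a hypothetical myProcessor type and example package that are not part of this PR:

package example

import (
	stdcontext "context" // the standard library package, no longer shadowed

	apiv1 "k8s.io/api/core/v1"
	ca_context "k8s.io/autoscaler/cluster-autoscaler/context"
)

// myProcessor is a hypothetical pod list processor used only to illustrate
// the naming convention introduced by this PR.
type myProcessor struct{}

// Before the change the signature read:
//   Process(context *context.AutoscalingContext, pods []*apiv1.Pod) ([]*apiv1.Pod, error)
// where the parameter name shadowed the imported package, and the package
// name in turn collided with the standard library "context".
func (p *myProcessor) Process(autoscalingContext *ca_context.AutoscalingContext, pods []*apiv1.Pod) ([]*apiv1.Pod, error) {
	_ = stdcontext.Background()            // stdlib context stays addressable
	_ = autoscalingContext.ClusterSnapshot // CA context fields are reached via the descriptive name
	return pods, nil
}
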
@@ -19,7 +19,7 @@ package builder
import (
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/context"
ca_context "k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/client-go/informers"

klog "k8s.io/klog/v2"
@@ -34,7 +34,7 @@ func NewCloudProvider(opts config.AutoscalingOptions, informerFactory informers.
NodeGroupAutoDiscoverySpecs: opts.NodeGroupAutoDiscovery,
}

rl := context.NewResourceLimiterFromAutoscalingOptions(opts)
rl := ca_context.NewResourceLimiterFromAutoscalingOptions(opts)

if opts.CloudProviderName == "" {
// Ideally this would be an error, but several unit tests of the
6 changes: 3 additions & 3 deletions cluster-autoscaler/core/autoscaler.go
@@ -23,7 +23,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
cloudBuilder "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/builder"
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/context"
ca_context "k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/pdb"
"k8s.io/autoscaler/cluster-autoscaler/core/scaleup"
"k8s.io/autoscaler/cluster-autoscaler/debuggingsnapshot"
@@ -50,7 +50,7 @@ type AutoscalerOptions struct {
config.AutoscalingOptions
KubeClient kube_client.Interface
InformerFactory informers.SharedInformerFactory
AutoscalingKubeClients *context.AutoscalingKubeClients
AutoscalingKubeClients *ca_context.AutoscalingKubeClients
CloudProvider cloudprovider.CloudProvider
FrameworkHandle *framework.Handle
ClusterSnapshot clustersnapshot.ClusterSnapshot
@@ -117,7 +117,7 @@ func initializeDefaultOptions(opts *AutoscalerOptions, informerFactory informers
opts.LoopStartNotifier = loopstart.NewObserversList(nil)
}
if opts.AutoscalingKubeClients == nil {
opts.AutoscalingKubeClients = context.NewAutoscalingKubeClients(opts.AutoscalingOptions, opts.KubeClient, opts.InformerFactory)
opts.AutoscalingKubeClients = ca_context.NewAutoscalingKubeClients(opts.AutoscalingOptions, opts.KubeClient, opts.InformerFactory)
}
if opts.FrameworkHandle == nil {
fwHandle, err := framework.NewHandle(opts.InformerFactory, opts.SchedulerConfig, opts.DynamicResourceAllocationEnabled)
4 changes: 2 additions & 2 deletions cluster-autoscaler/core/podlistprocessor/clear_tpu_request.go
@@ -18,7 +18,7 @@ package podlistprocessor

import (
apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/context"
ca_context "k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/utils/tpu"
)

@@ -31,7 +31,7 @@ func NewClearTPURequestsPodListProcessor() *clearTpuRequests {
}

// Process removes pods' tpu requests
func (p *clearTpuRequests) Process(context *context.AutoscalingContext, pods []*apiv1.Pod) ([]*apiv1.Pod, error) {
func (p *clearTpuRequests) Process(autoscalingContext *ca_context.AutoscalingContext, pods []*apiv1.Pod) ([]*apiv1.Pod, error) {
return tpu.ClearTPURequests(pods), nil
}

@@ -18,7 +18,7 @@ package podlistprocessor

import (
apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/context"
ca_context "k8s.io/autoscaler/cluster-autoscaler/context"
pod_util "k8s.io/autoscaler/cluster-autoscaler/utils/pod"
"k8s.io/klog/v2"
)
@@ -33,19 +33,19 @@ func NewCurrentlyDrainedNodesPodListProcessor() *currentlyDrainedNodesPodListPro
}

// Process adds recreatable pods from currently drained nodes
func (p *currentlyDrainedNodesPodListProcessor) Process(context *context.AutoscalingContext, unschedulablePods []*apiv1.Pod) ([]*apiv1.Pod, error) {
recreatablePods := pod_util.FilterRecreatablePods(currentlyDrainedPods(context))
func (p *currentlyDrainedNodesPodListProcessor) Process(autoscalingContext *ca_context.AutoscalingContext, unschedulablePods []*apiv1.Pod) ([]*apiv1.Pod, error) {
recreatablePods := pod_util.FilterRecreatablePods(currentlyDrainedPods(autoscalingContext))
return append(unschedulablePods, pod_util.ClearPodNodeNames(recreatablePods)...), nil
}

func (p *currentlyDrainedNodesPodListProcessor) CleanUp() {
}

func currentlyDrainedPods(context *context.AutoscalingContext) []*apiv1.Pod {
func currentlyDrainedPods(autoscalingContext *ca_context.AutoscalingContext) []*apiv1.Pod {
var pods []*apiv1.Pod
_, nodeNames := context.ScaleDownActuator.CheckStatus().DeletionsInProgress()
_, nodeNames := autoscalingContext.ScaleDownActuator.CheckStatus().DeletionsInProgress()
for _, nodeName := range nodeNames {
nodeInfo, err := context.ClusterSnapshot.GetNodeInfo(nodeName)
nodeInfo, err := autoscalingContext.ClusterSnapshot.GetNodeInfo(nodeName)
if err != nil {
klog.Warningf("Couldn't get node %v info, assuming the node got deleted already: %v", nodeName, err)
continue
@@ -23,7 +23,7 @@ import (
"github.com/stretchr/testify/assert"

apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/context"
ca_context "k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/status"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
@@ -267,14 +267,14 @@ func TestCurrentlyDrainedNodesPodListProcessor(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ctx := context.AutoscalingContext{
autoscalingContext := ca_context.AutoscalingContext{
ScaleDownActuator: &mockActuator{&mockActuationStatus{tc.drainedNodes}},
ClusterSnapshot: testsnapshot.NewTestSnapshotOrDie(t),
}
clustersnapshot.InitializeClusterSnapshotOrDie(t, ctx.ClusterSnapshot, tc.nodes, tc.pods)
clustersnapshot.InitializeClusterSnapshotOrDie(t, autoscalingContext.ClusterSnapshot, tc.nodes, tc.pods)

processor := NewCurrentlyDrainedNodesPodListProcessor()
pods, err := processor.Process(&ctx, tc.unschedulablePods)
pods, err := processor.Process(&autoscalingContext, tc.unschedulablePods)
assert.NoError(t, err)
assert.ElementsMatch(t, tc.wantPods, pods)
})
@@ -18,7 +18,7 @@ package podlistprocessor

import (
apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/context"
ca_context "k8s.io/autoscaler/cluster-autoscaler/context"
podutils "k8s.io/autoscaler/cluster-autoscaler/utils/pod"
klog "k8s.io/klog/v2"
)
@@ -32,7 +32,7 @@ func NewFilterOutDaemonSetPodListProcessor() *filterOutDaemonSetPodListProcessor
}

// Process filters out pods which are daemon set pods.
func (p *filterOutDaemonSetPodListProcessor) Process(context *context.AutoscalingContext, unschedulablePods []*apiv1.Pod) ([]*apiv1.Pod, error) {
func (p *filterOutDaemonSetPodListProcessor) Process(autoscalingContext *ca_context.AutoscalingContext, unschedulablePods []*apiv1.Pod) ([]*apiv1.Pod, error) {
// Scale-up cannot help unschedulable Daemon Set pods, as those require a specific node
// for scheduling. To improve that we are filtering them here, as the CA won't be
// able to help them, so there is no point in passing them to scale-up logic.
@@ -20,7 +20,7 @@ import (
"fmt"

apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/context"
ca_context "k8s.io/autoscaler/cluster-autoscaler/context"
core_utils "k8s.io/autoscaler/cluster-autoscaler/core/utils"
caerrors "k8s.io/autoscaler/cluster-autoscaler/utils/errors"
"k8s.io/klog/v2"
@@ -35,15 +35,15 @@ func NewFilterOutExpendablePodListProcessor() *filterOutExpendable {
}

// Process filters out pods which are expendable and adds pods which are waiting for lower priority pods preemption to the cluster snapshot
func (p *filterOutExpendable) Process(context *context.AutoscalingContext, pods []*apiv1.Pod) ([]*apiv1.Pod, error) {
nodes, err := context.AllNodeLister().List()
func (p *filterOutExpendable) Process(autoscalingContext *ca_context.AutoscalingContext, pods []*apiv1.Pod) ([]*apiv1.Pod, error) {
nodes, err := autoscalingContext.AllNodeLister().List()
if err != nil {
return nil, fmt.Errorf("Failed to list all nodes while filtering expendable pods: %v", err)
}
expendablePodsPriorityCutoff := context.AutoscalingOptions.ExpendablePodsPriorityCutoff
expendablePodsPriorityCutoff := autoscalingContext.AutoscalingOptions.ExpendablePodsPriorityCutoff

unschedulablePods, waitingForLowerPriorityPreemption := core_utils.FilterOutExpendableAndSplit(pods, nodes, expendablePodsPriorityCutoff)
if err = p.addPreemptingPodsToSnapshot(waitingForLowerPriorityPreemption, context); err != nil {
if err = p.addPreemptingPodsToSnapshot(waitingForLowerPriorityPreemption, autoscalingContext); err != nil {
klog.Warningf("Failed to add preempting pods to snapshot: %v", err)
return nil, err
}
@@ -54,10 +54,10 @@ func (p *filterOutExpendable) Process(context *context.AutoscalingContext, pods
// addPreemptingPodsToSnapshot modifies the snapshot simulating scheduling of pods waiting for preemption.
// This is not strictly correct, as we are not simulating preemption itself, but it matches
// CA logic from before the migration to the scheduler framework, so let's keep it for now.
func (p *filterOutExpendable) addPreemptingPodsToSnapshot(pods []*apiv1.Pod, ctx *context.AutoscalingContext) error {
func (p *filterOutExpendable) addPreemptingPodsToSnapshot(pods []*apiv1.Pod, autoscalingContext *ca_context.AutoscalingContext) error {
for _, p := range pods {
// TODO(DRA): Figure out if/how to use the predicate-checking SchedulePod() here instead - otherwise this doesn't work with DRA pods.
if err := ctx.ClusterSnapshot.ForceAddPod(p, p.Status.NominatedNodeName); err != nil {
if err := autoscalingContext.ClusterSnapshot.ForceAddPod(p, p.Status.NominatedNodeName); err != nil {
klog.Errorf("Failed to update snapshot with pod %s/%s waiting for preemption: %v", p.Namespace, p.Name, err)
return caerrors.ToAutoscalerError(caerrors.InternalError, err)
}
@@ -24,7 +24,7 @@ import (

apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/context"
ca_context "k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/testsnapshot"
drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
@@ -114,12 +114,12 @@ func TestFilterOutExpendable(t *testing.T) {
err := snapshot.SetClusterState(tc.nodes, nil, drasnapshot.Snapshot{})
assert.NoError(t, err)

pods, err := processor.Process(&context.AutoscalingContext{
pods, err := processor.Process(&ca_context.AutoscalingContext{
ClusterSnapshot: snapshot,
AutoscalingOptions: config.AutoscalingOptions{
ExpendablePodsPriorityCutoff: tc.priorityCutoff,
},
AutoscalingKubeClients: context.AutoscalingKubeClients{
AutoscalingKubeClients: ca_context.AutoscalingKubeClients{
ListerRegistry: newMockListerRegistry(tc.nodes),
},
}, tc.pods)
@@ -22,7 +22,7 @@ import (

apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/autoscaler/cluster-autoscaler/context"
ca_context "k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/metrics"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
@@ -45,7 +45,7 @@ func NewFilterOutSchedulablePodListProcessor(nodeFilter func(*framework.NodeInfo
}

// Process filters out pods which are schedulable from the list of unschedulable pods.
func (p *filterOutSchedulablePodListProcessor) Process(context *context.AutoscalingContext, unschedulablePods []*apiv1.Pod) ([]*apiv1.Pod, error) {
func (p *filterOutSchedulablePodListProcessor) Process(autoscalingContext *ca_context.AutoscalingContext, unschedulablePods []*apiv1.Pod) ([]*apiv1.Pod, error) {
// We need to check whether pods marked as unschedulable are actually unschedulable.
// It's likely we added a new node and the scheduler just hasn't managed to put the
// pod on it yet. In this situation we don't want to trigger another scale-up.
@@ -65,7 +65,7 @@ func (p *filterOutSchedulablePodListProcessor) Process(context *context.Autoscal
klog.V(4).Infof("Filtering out schedulables")
filterOutSchedulableStart := time.Now()

unschedulablePodsToHelp, err := p.filterOutSchedulableByPacking(unschedulablePods, context.ClusterSnapshot)
unschedulablePodsToHelp, err := p.filterOutSchedulableByPacking(unschedulablePods, autoscalingContext.ClusterSnapshot)

if err != nil {
return nil, err
@@ -76,9 +76,9 @@ func (p *filterOutSchedulablePodListProcessor) Process(context *context.Autoscal
if len(unschedulablePodsToHelp) != len(unschedulablePods) {
klog.V(2).Info("Schedulable pods present")

if context.DebuggingSnapshotter.IsDataCollectionAllowed() {
if autoscalingContext.DebuggingSnapshotter.IsDataCollectionAllowed() {
schedulablePods := findSchedulablePods(unschedulablePods, unschedulablePodsToHelp)
context.DebuggingSnapshotter.SetUnscheduledPodsCanBeScheduled(schedulablePods)
autoscalingContext.DebuggingSnapshotter.SetUnscheduledPodsCanBeScheduled(schedulablePods)
}

} else {