@@ -26,7 +26,9 @@ import (
2626 "time"
2727
2828 "github.com/spf13/pflag"
29-
29+ "k8s.io/apimachinery/pkg/runtime"
30+ utilruntime "k8s.io/apimachinery/pkg/util/runtime"
31+ cqv1alpha1 "k8s.io/autoscaler/cluster-autoscaler/apis/capacityquota/autoscaling.x-k8s.io/v1alpha1"
3032 capacityclient "k8s.io/autoscaler/cluster-autoscaler/capacitybuffer/client"
3133 "k8s.io/autoscaler/cluster-autoscaler/capacitybuffer/common"
3234 "k8s.io/autoscaler/cluster-autoscaler/config/flags"
@@ -41,7 +43,12 @@ import (
4143 "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/store"
4244 "k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
4345 "k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling"
46+ clientgoscheme "k8s.io/client-go/kubernetes/scheme"
4447 "k8s.io/kubernetes/pkg/features"
48+ ctrl "sigs.k8s.io/controller-runtime"
49+ "sigs.k8s.io/controller-runtime/pkg/cache"
50+ "sigs.k8s.io/controller-runtime/pkg/manager"
51+ metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
4552
4653 "k8s.io/apimachinery/pkg/api/meta"
4754 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -85,6 +92,16 @@ import (
8592 "k8s.io/klog/v2"
8693)
8794
95+ var (
96+ scheme = runtime .NewScheme ()
97+ )
98+
99+ func init () {
100+ utilruntime .Must (clientgoscheme .AddToScheme (scheme ))
101+ utilruntime .Must (cqv1alpha1 .AddToScheme (scheme ))
102+ // TODO: add other CRDs
103+ }
104+
88105func registerSignalHandlers (autoscaler core.Autoscaler ) {
89106 sigs := make (chan os.Signal , 1 )
90107 signal .Notify (sigs , os .Interrupt , os .Kill , syscall .SIGTERM , syscall .SIGQUIT )
@@ -100,7 +117,7 @@ func registerSignalHandlers(autoscaler core.Autoscaler) {
100117 }()
101118}
102119
103- func buildAutoscaler (ctx context.Context , debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter ) (core.Autoscaler , * loop.LoopTrigger , error ) {
120+ func buildAutoscaler (ctx context.Context , debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter , mgr manager. Manager ) (core.Autoscaler , * loop.LoopTrigger , error ) {
104121 // Get AutoscalingOptions from flags.
105122 autoscalingOptions := flags .AutoscalingOptions ()
106123
@@ -133,6 +150,8 @@ func buildAutoscaler(ctx context.Context, debuggingSnapshotter debuggingsnapshot
133150 DeleteOptions : deleteOptions ,
134151 DrainabilityRules : drainabilityRules ,
135152 ScaleUpOrchestrator : orchestrator .New (),
153+ KubeClientNew : mgr .GetClient (),
154+ KubeCache : mgr .GetCache (),
136155 }
137156
138157 opts .Processors = ca_processors .DefaultProcessors (autoscalingOptions )
@@ -304,12 +323,29 @@ func run(healthCheck *metrics.HealthCheck, debuggingSnapshotter debuggingsnapsho
304323 ctx , cancel := context .WithCancel (context .Background ())
305324 defer cancel ()
306325
307- autoscaler , trigger , err := buildAutoscaler (ctx , debuggingSnapshotter )
326+ restConfig := kube_util .GetKubeConfig (autoscalingOpts .KubeClientOpts )
327+ mgr , err := ctrl .NewManager (restConfig , ctrl.Options {
328+ Scheme : scheme ,
329+ Cache : cache.Options {
330+ DefaultTransform : cache .TransformStripManagedFields (),
331+ },
332+ // TODO: migrate leader election, metrics, healthcheck, pprof servers to Manager
333+ LeaderElection : false ,
334+ Metrics : metricsserver.Options {BindAddress : "0" },
335+ HealthProbeBindAddress : "0" ,
336+ PprofBindAddress : "0" ,
337+ })
338+ if err != nil {
339+ klog .Fatalf ("Failed to create manager: %v" , err )
340+ }
341+
342+ autoscaler , trigger , err := buildAutoscaler (ctx , debuggingSnapshotter , mgr )
308343 if err != nil {
309344 klog .Fatalf ("Failed to create autoscaler: %v" , err )
310345 }
311346
312347 // Register signal handlers for graceful shutdown.
348+ // TODO: replace with ctrl.SetupSignalHandlers() and handle graceful shutdown with context
313349 registerSignalHandlers (autoscaler )
314350
315351 // Start updating health check endpoint.
@@ -320,22 +356,42 @@ func run(healthCheck *metrics.HealthCheck, debuggingSnapshotter debuggingsnapsho
320356 klog .Fatalf ("Failed to autoscaler background components: %v" , err )
321357 }
322358
323- // Autoscale ad infinitum.
324- if autoscalingOpts .FrequentLoopsEnabled {
325- // We need to have two timestamps because the scaleUp activity alternates between processing ProvisioningRequests,
326- // so we need to pass the older timestamp (previousRun) to trigger.Wait to run immediately if only one of the activities is productive.
327- lastRun := time .Now ()
328- previousRun := time .Now ()
329- for {
330- trigger .Wait (previousRun )
331- previousRun , lastRun = lastRun , time .Now ()
332- loop .RunAutoscalerOnce (autoscaler , healthCheck , lastRun )
333- }
334- } else {
335- for {
336- time .Sleep (autoscalingOpts .ScanInterval )
337- loop .RunAutoscalerOnce (autoscaler , healthCheck , time .Now ())
359+ err = mgr .Add (manager .RunnableFunc (func (ctx context.Context ) error {
360+ // Autoscale ad infinitum.
361+ if autoscalingOpts .FrequentLoopsEnabled {
362+ // We need to have two timestamps because the scaleUp activity alternates between processing ProvisioningRequests,
363+ // so we need to pass the older timestamp (previousRun) to trigger.Wait to run immediately if only one of the activities is productive.
364+ lastRun := time .Now ()
365+ previousRun := time .Now ()
366+ for {
367+ select {
368+ case <- ctx .Done ():
369+ // TODO: handle graceful shutdown with context
370+ return nil
371+ default :
372+ trigger .Wait (previousRun )
373+ previousRun , lastRun = lastRun , time .Now ()
374+ loop .RunAutoscalerOnce (autoscaler , healthCheck , lastRun )
375+ }
376+ }
377+ } else {
378+ for {
379+ select {
380+ case <- ctx .Done ():
381+ // TODO: handle graceful shutdown with context
382+ return nil
383+ case <- time .After (autoscalingOpts .ScanInterval ):
384+ loop .RunAutoscalerOnce (autoscaler , healthCheck , time .Now ())
385+ }
386+ }
338387 }
388+ }))
389+ if err != nil {
390+ klog .Fatalf ("Failed to add runnable to manager: %v" , err )
391+ }
392+
393+ if err := mgr .Start (ctx ); err != nil {
394+ klog .Fatalf ("Manager exited with error: %v" , err )
339395 }
340396}
341397
@@ -373,6 +429,7 @@ func main() {
373429 if err := logsapi .ValidateAndApply (loggingConfig , featureGate ); err != nil {
374430 klog .Fatalf ("Failed to validate and apply logging configuration: %v" , err )
375431 }
432+ ctrl .SetLogger (klog .NewKlogr ())
376433
377434 healthCheck := metrics .NewHealthCheck (autoscalingOpts .MaxInactivityTime , autoscalingOpts .MaxFailingTime )
378435
0 commit comments