Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
185 changes: 184 additions & 1 deletion internal/controller/cpuscalingprofile_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ package controller
import (
"context"
"fmt"
"reflect"
"slices"
"strings"

powerv1 "github.com/intel/kubernetes-power-manager/api/v1"

Expand All @@ -27,6 +30,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)
Expand Down Expand Up @@ -68,6 +72,34 @@ func (r *CPUScalingProfileReconciler) Reconcile(ctx context.Context, req ctrl.Re
}

logger.V(5).Info("cpuscalingprofile profile not found")

// Owned PowerProfile will be automatically deleted by K8s GC

// Search for CPUScalingProfile in CPUScalingConfigurations to delete associated entries
cpuScalingConfList := &powerv1.CPUScalingConfigurationList{}
err = r.Client.List(context.TODO(), cpuScalingConfList)
logger.V(5).Info("retrieving the cpuscalingconfiguration list")
if err != nil {
logger.Error(err, "error retrieving cpuscalingconfiguration list")
return ctrl.Result{}, err
}

// Set Name and Namespace so helper func controllerutil.RemoveOwnerReference can be used
scalingProfile = &powerv1.CPUScalingProfile{
ObjectMeta: metav1.ObjectMeta{
Name: req.Name,
Namespace: req.Namespace,
},
}
for _, cpuScalingConf := range cpuScalingConfList.Items {
if err = r.updateOrDeleteCPUScalingConfiguration(
&cpuScalingConf, []powerv1.ConfigItem{}, scalingProfile, logger); err != nil {
logger.Error(err, "error while cleaning after deleted cpuscalingprofile")
return ctrl.Result{}, err
}
}
logger.V(5).Info("succesfully cleaned after deleted cpuscalingprofile")

return ctrl.Result{}, nil
}

Expand All @@ -84,7 +116,58 @@ func (r *CPUScalingProfileReconciler) Reconcile(ctx context.Context, req ctrl.Re
return ctrl.Result{}, err
}

// TODO: CPUScalingConfiguration related code
// Search for CPUs associated with CPUScalingProfile in PowerWorkloads
powerWorkloadList := &powerv1.PowerWorkloadList{}
err = r.Client.List(context.TODO(), powerWorkloadList)
logger.V(5).Info("retrieving the powerworkload list")
if err != nil {
logger.Error(err, "error retrieving powerworkload list")
return ctrl.Result{}, err
}
nodesItems := r.createConfigItems(powerWorkloadList, scalingProfile)

// Create, delete or update CPUScalingConfiguration on nodes on which CPUScalingProfile is requested
for nodeName, items := range nodesItems {
confKey := client.ObjectKey{Name: nodeName, Namespace: IntelPowerNamespace}
cpuScalingConf := &powerv1.CPUScalingConfiguration{}
err := r.Client.Get(context.TODO(), confKey, cpuScalingConf)
// CPUScalingConfiguration could not be retrieved
if err != nil {
if !errors.IsNotFound(err) {
logger.Error(err, "error retrieving cpuscalingconfiguration")
return ctrl.Result{}, err
}

cpuScalingConf = &powerv1.CPUScalingConfiguration{
ObjectMeta: metav1.ObjectMeta{
Name: nodeName,
Namespace: IntelPowerNamespace,
},
}
if err = r.setCPUScalingConfiguration(cpuScalingConf, items, scalingProfile); err != nil {
return ctrl.Result{}, err
}

if len(cpuScalingConf.Spec.Items) == 0 {
logger.V(5).Info("not creating cpuscalingconfiguration, spec would be empty")
continue
}

if err := r.Client.Create(context.TODO(), cpuScalingConf); err != nil {
err = fmt.Errorf("error creating cpuscalingconfiguration: %w", err)
logger.Error(err, "")
return ctrl.Result{}, err
}

logger.V(5).Info("cpuscalingconfiguration successfully created")
continue
}

if err = r.updateOrDeleteCPUScalingConfiguration(cpuScalingConf, items, scalingProfile, logger); err != nil {
logger.Error(err, "error while reconciling after cluster state change")
return ctrl.Result{}, err
}
}

return ctrl.Result{}, nil
}
Expand Down Expand Up @@ -159,10 +242,110 @@ func (r *CPUScalingProfileReconciler) verifyCPUScalingProfileParams(scalingSpec
return nil
}

// updateOrDeleteCPUScalingConfiguration applies newItems to an existing CPUScalingConfiguration and then decides
// how to reconcile the result with the apiserver:
//   - when the resulting .Spec.Items is empty, the object is deleted;
//   - when the object did not change, no request is sent;
//   - otherwise the object is updated.
//
// scalingConfig is mutated in place. scalingProfile is the CPUScalingProfile on whose behalf the entries are
// replaced. The returned error wraps the failure reported by the modification, Delete or Update step.
func (r *CPUScalingProfileReconciler) updateOrDeleteCPUScalingConfiguration(
	scalingConfig *powerv1.CPUScalingConfiguration, newItems []powerv1.ConfigItem, scalingProfile metav1.Object,
	logger logr.Logger) error {
	logger = logger.WithValues("cpuscalingconfiguration", scalingConfig.Name)

	// Snapshot taken before the mutation so the Update guard below can detect a no-op.
	origScalingConfig := scalingConfig.DeepCopy()
	if err := r.setCPUScalingConfiguration(scalingConfig, newItems, scalingProfile); err != nil {
		err = fmt.Errorf("error modifying cpuscalingconfiguration object: %w", err)
		logger.Error(err, "")
		return err
	}

	if len(scalingConfig.Spec.Items) == 0 {
		// A NotFound answer means the object is already gone, which is exactly the state we want, so it must
		// not fail the reconcile (this happens e.g. when the read that produced scalingConfig was stale).
		if err := client.IgnoreNotFound(r.Client.Delete(context.TODO(), scalingConfig)); err != nil {
			err = fmt.Errorf("error deleting cpuscalingconfiguration from the cluster: %w", err)
			logger.Error(err, "")
			return err
		}
		logger.V(5).Info(
			"succesfully deleted empty cpuscalingconfiguration",
		)
		return nil
	}

	// Guard to not call unnecessary Update
	if reflect.DeepEqual(origScalingConfig, scalingConfig) {
		logger.V(5).Info("cpuscalingconfiguration is already in desired state")
		return nil
	}

	if err := r.Client.Update(context.TODO(), scalingConfig); err != nil {
		err = fmt.Errorf("error updating cpuscalingconfiguration: %w", err)
		logger.Error(err, "")
		return err
	}
	logger.V(5).Info("cpuscalingconfiguration successfully updated")

	return nil
}

// createConfigItems groups powerv1.ConfigItem entries by node name. Only PowerWorkloads whose
// .Spec.PowerProfile equals the name of scalingProfile and which do not have .Spec.AllCores set are considered;
// each container listed in such a workload yields one ConfigItem. A node whose matching workloads list no
// containers is still present in the result, mapped to an empty (non-nil) slice.
func (r *CPUScalingProfileReconciler) createConfigItems(powerWorkloadList *powerv1.PowerWorkloadList,
	scalingProfile *powerv1.CPUScalingProfile) map[string][]powerv1.ConfigItem {
	itemsByNode := map[string][]powerv1.ConfigItem{}

	for i := range powerWorkloadList.Items {
		workload := &powerWorkloadList.Items[i]
		if workload.Spec.AllCores || workload.Spec.PowerProfile != scalingProfile.Name {
			continue
		}

		node := workload.Spec.Node.Name
		// Register the node up front so it appears in the result even when no container uses the profile.
		if _, seen := itemsByNode[node]; !seen {
			itemsByNode[node] = []powerv1.ConfigItem{}
		}

		// One ConfigItem per container.
		for _, c := range workload.Spec.Node.Containers {
			item := powerv1.ConfigItem{
				PowerProfile: scalingProfile.Name,
				CpuIDs:       c.ExclusiveCPUs,
				SamplePeriod: scalingProfile.Spec.SamplePeriod,
				PodUID:       c.PodUID,
			}
			itemsByNode[node] = append(itemsByNode[node], item)
		}
	}

	return itemsByNode
}

// setCPUScalingConfiguration rewrites the part of scalingConfig that belongs to scalingProfile: every entry of
// .Spec.Items whose PowerProfile equals the profile name is dropped, newItems are appended, and the result is
// put into a deterministic order. Ownership is refreshed to match: the owner reference to scalingProfile is
// removed when newItems is empty and set otherwise.
//
// scalingConfig is mutated in place; nothing is sent to the apiserver. An error is returned only when adjusting
// the owner reference fails.
func (r *CPUScalingProfileReconciler) setCPUScalingConfiguration(scalingConfig *powerv1.CPUScalingConfiguration,
	newItems []powerv1.ConfigItem, scalingProfile metav1.Object) error {
	profileName := scalingProfile.GetName()

	// Handle CPUScalingConfiguration.Spec.Items
	scalingConfig.Spec.Items = slices.DeleteFunc(scalingConfig.Spec.Items,
		func(item powerv1.ConfigItem) bool {
			return item.PowerProfile == profileName
		},
	)
	scalingConfig.Spec.Items = append(scalingConfig.Spec.Items, newItems...)
	// Sort is crucial to update decision, we don't want to update resource in apiserver just because order has
	// changed. PodUID alone is not a total order (several items may share a pod), and the delete-then-append
	// above moves this profile's items behind the others, so ties are broken by PowerProfile and the sort is
	// stable: the outcome no longer depends on which profile was reconciled last.
	slices.SortStableFunc(scalingConfig.Spec.Items, func(a, b powerv1.ConfigItem) int {
		if byPod := strings.Compare(string(a.PodUID), string(b.PodUID)); byPod != 0 {
			return byPod
		}
		return strings.Compare(a.PowerProfile, b.PowerProfile)
	})

	// Handle Ownership
	if len(newItems) == 0 {
		// NOTE(review): depending on the controller-runtime version, RemoveOwnerReference may return an error
		// when scalingConfig holds no owner reference to scalingProfile — confirm this is acceptable for
		// callers that pass a configuration the profile never owned.
		if err := controllerutil.RemoveOwnerReference(scalingProfile, scalingConfig, r.Scheme); err != nil {
			return fmt.Errorf("error deleting ownership of updated cpuscalingconfiguration: %w", err)
		}
		return nil
	}

	if err := controllerutil.SetOwnerReference(scalingProfile, scalingConfig, r.Scheme); err != nil {
		return fmt.Errorf("error setting up ownership of updated cpuscalingconfiguration: %w", err)
	}

	return nil
}

// SetupWithManager registers the reconciler with mgr. The controller is built for CPUScalingProfile and also
// declares PowerProfile and CPUScalingConfiguration as owned types; the CPUScalingConfiguration registration is
// made with the builder.MatchEveryOwner option. The error from completing the builder is returned unchanged.
func (r *CPUScalingProfileReconciler) SetupWithManager(mgr ctrl.Manager) error {
	ctrlBuilder := ctrl.NewControllerManagedBy(mgr)

	// Primary resource.
	ctrlBuilder = ctrlBuilder.For(&powerv1.CPUScalingProfile{})

	// Owned resources.
	ctrlBuilder = ctrlBuilder.Owns(&powerv1.PowerProfile{})
	ctrlBuilder = ctrlBuilder.Owns(&powerv1.CPUScalingConfiguration{}, builder.MatchEveryOwner)

	return ctrlBuilder.Complete(r)
}
Loading
Loading