Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions api/workloads/v1alpha1/rolebasedgroup_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ type RoleBasedGroupSpec struct {

// Configuration for the PodGroup to enable gang-scheduling via supported plugins.
PodGroupPolicy *PodGroupPolicy `json:"podGroupPolicy,omitempty"`

// Coordination defines how roles should be coordinated
// +optional
Coordination []Coordination `json:"coordination,omitempty"`
}

// PodGroupPolicy represents a PodGroup configuration for gang-scheduling.
Expand Down Expand Up @@ -82,6 +86,33 @@ type VolcanoSchedulingPodGroupPolicySource struct {
Queue string `json:"queue,omitempty"`
}

type Coordination struct {
// Strategy defines the strategy for how roles should be coordinated.
// +optional
Strategy []RoleStrategy `json:"strategy,omitempty"`
}

type RoleStrategy struct {
// Role is the name of the role (e.g. "prefill", "decode", "router").
Role string `json:"role"`

// UpdateStrategy describes how this role should be updated.
UpdateStrategy *RoleUpdateStrategy `json:"updateStrategy,omitempty"`

// TODO: add more strategy here as needed. e.g. ScalingStrategy
}

// RoleUpdateStrategy describes how to update a role's workload.
type RoleUpdateStrategy struct {
// Type is the update strategy type (e.g. "Recreate", "InplaceIfPossible").
// +kubebuilder:validation:Enum={Recreate, InplaceIfPossible}
Type string `json:"type,omitempty"`
// Partition indicates the ordinal at which the role should be partitioned for updates.
Partition *int32 `json:"partition,omitempty"`
// BatchSize defines the maximum number of replicas that can be updated at once.
BatchSize intstr.IntOrString `json:"batchSize,omitempty"`
}

// RolloutStrategy defines the strategy that the rbg controller
// will use to perform replica updates of role.
type RolloutStrategy struct {
Expand Down Expand Up @@ -262,6 +293,19 @@ type RoleBasedGroupStatus struct {

// Status of individual roles
RoleStatuses []RoleStatus `json:"roleStatuses"`

// CoordinationState Status of coordination
CoordinationState []CoordinationState `json:"coordinationState,omitempty"`
}

type CoordinationState struct {
RoleState map[string]RoleCoordinationState `json:"progress,omitempty"`
LastUpdateTime metav1.Time `json:"lastUpdateTime,omitempty"`
}

type RoleCoordinationState struct {
Strategy string `json:"strategy"`
State string `json:"state"`
}

// RoleStatus shows the current state of a specific role
Expand Down
172 changes: 131 additions & 41 deletions internal/controller/workloads/rolebasedgroup_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
Expand Down Expand Up @@ -120,11 +121,15 @@
logger.Info("Current revision need to be updated")
if err := r.client.Create(ctx, expectedRevision); err != nil {
logger.Error(err, fmt.Sprintf("Failed to create revision %v", expectedRevision))
r.recorder.Event(rbg, corev1.EventTypeWarning, FailedCreateRevision, "Failed create revision for RoleBasedGroup")
r.recorder.Event(
rbg, corev1.EventTypeWarning, FailedCreateRevision, "Failed create revision for RoleBasedGroup",
)
return ctrl.Result{}, err
} else {
logger.Info(fmt.Sprintf("Create revision [%s] successfully", expectedRevision.Name))
r.recorder.Event(rbg, corev1.EventTypeNormal, SucceedCreateRevision, "Successful create revision for RoleBasedGroup")
r.recorder.Event(
rbg, corev1.EventTypeNormal, SucceedCreateRevision, "Successful create revision for RoleBasedGroup",
)
}
}
expectedRolesRevisionHash, err := utils.GetRolesRevisionHash(expectedRevision)
Expand All @@ -133,23 +138,64 @@
return ctrl.Result{}, err
}

// Process roles in dependency order
dependencyManager := dependency.NewDefaultDependencyManager(r.scheme, r.client)
sortedRoles, err := dependencyManager.SortRoles(ctx, rbg)
if err != nil {
r.recorder.Event(rbg, corev1.EventTypeWarning, InvalidRoleDependency, err.Error())
return ctrl.Result{}, err
}

// Process PodGroup
podGroupManager := scheduler.NewPodGroupScheduler(r.client)
if err := podGroupManager.Reconcile(ctx, rbg, runtimeController, &watchedWorkload, r.apiReader); err != nil {
r.recorder.Event(rbg, corev1.EventTypeWarning, FailedCreatePodGroup, err.Error())
return ctrl.Result{}, err
}

if rbg.Spec.Coordination != nil {
r.reconcileCoordination(rbg, expectedRolesRevisionHash)
}

// Reconcile role, add & update
roleStatuses := []workloadsv1alpha1.RoleStatus{}
roleStatuses, updateStatus, err := r.reconcileRoles(ctx, rbg, expectedRolesRevisionHash)

Check failure on line 153 in internal/controller/workloads/rolebasedgroup_controller.go

View workflow job for this annotation

GitHub Actions / staticcheck

this value of err is never used (SA4006)

if updateStatus {
if err := r.updateRBGStatus(ctx, rbg, roleStatuses); err != nil {
r.recorder.Eventf(
rbg, corev1.EventTypeWarning, FailedUpdateStatus,
"Failed to update status for %s: %v", rbg.Name, err,
)
return ctrl.Result{}, err
}
}

// delete role
if err := r.deleteRoles(ctx, rbg); err != nil {
r.recorder.Eventf(
rbg, corev1.EventTypeWarning, "delete role error",
"Failed to delete roles for %s: %v", rbg.Name, err,
)
return ctrl.Result{}, err
}

// delete expired controllerRevision
if _, err := utils.CleanExpiredRevision(ctx, r.client, rbg); err != nil {
r.recorder.Eventf(
rbg, corev1.EventTypeWarning, "delete expired revision error",
"Failed to delete expired revision for %s: %v", rbg.Name, err,
)
return ctrl.Result{}, err
}

r.recorder.Event(rbg, corev1.EventTypeNormal, Succeed, "ReconcileSucceed")
return ctrl.Result{}, nil
}

func (r *RoleBasedGroupReconciler) reconcileRoles(
ctx context.Context, rbg *workloadsv1alpha1.RoleBasedGroup, expectedRolesRevisionHash map[string]string,
) ([]workloadsv1alpha1.RoleStatus, bool, error) {

// Process roles in dependency order
dependencyManager := dependency.NewDefaultDependencyManager(r.scheme, r.client)
sortedRoles, err := dependencyManager.SortRoles(ctx, rbg)
if err != nil {
return nil, false, err
}

var roleStatuses []workloadsv1alpha1.RoleStatus
var updateStatus bool
for _, roleList := range sortedRoles {
var errs error
Expand Down Expand Up @@ -221,40 +267,80 @@
}

if errs != nil {
return ctrl.Result{}, errs
return nil, false, errs
}
}

if updateStatus {
if err := r.updateRBGStatus(ctx, rbg, roleStatuses); err != nil {
r.recorder.Eventf(
rbg, corev1.EventTypeWarning, FailedUpdateStatus,
"Failed to update status for %s: %v", rbg.Name, err,
)
return ctrl.Result{}, err
return roleStatuses, updateStatus, nil
}

func (r *RoleBasedGroupReconciler) reconcileCoordination(
rbg *workloadsv1alpha1.RoleBasedGroup, expectedRolesRevisionHash map[string]string,
) error {

roleCoordinationState := workloadsv1alpha1.RoleCoordinationState{}
// if coordination is enabled, process coordination
for _, coordination := range rbg.Spec.Coordination {

isCompleted, err := r.isCoordinationRoleCompleted(rbg, coordination, expectedRolesRevisionHash)
if err != nil {
return err
}

for _, strategy := range coordination.Strategy {
if strategy.UpdateStrategy != nil {

role, err := rbg.GetRole(strategy.Role)
if err != nil {
return err
}

role.RolloutStrategy = &workloadsv1alpha1.RolloutStrategy{
Type: workloadsv1alpha1.RollingUpdateStrategyType,
RollingUpdate: &workloadsv1alpha1.RollingUpdate{
MaxUnavailable: strategy.UpdateStrategy.BatchSize,
},
}

if strategy.UpdateStrategy.Partition != nil {
role.RolloutStrategy.RollingUpdate.Partition = strategy.UpdateStrategy.Partition
} else {
rollingStep, err := intstr.GetScaledValueFromIntOrPercent(
&role.RolloutStrategy.RollingUpdate.MaxUnavailable, int(*role.Replicas), false,
)
if err != nil {
return err
}
if isCompleted {
// calculate partition
role.RolloutStrategy.RollingUpdate.Partition = ptr.To(
int32(max(int(*role.RolloutStrategy.RollingUpdate.Partition)-rollingStep, 0)),
)
}

}

roleCoordinationState.Strategy = "UpdateStrategy"
roleCoordinationState.State = fmt.Sprintf("Partition:%d", *role.RolloutStrategy.RollingUpdate.Partition)
}
}
}

// delete role
if err := r.deleteRoles(ctx, rbg); err != nil {
r.recorder.Eventf(
rbg, corev1.EventTypeWarning, "delete role error",
"Failed to delete roles for %s: %v", rbg.Name, err,
)
return ctrl.Result{}, err
}

// delete expired controllerRevision
if _, err := utils.CleanExpiredRevision(ctx, r.client, rbg); err != nil {
r.recorder.Eventf(
rbg, corev1.EventTypeWarning, "delete expired revision error",
"Failed to delete expired revision for %s: %v", rbg.Name, err,
)
return ctrl.Result{}, err
return nil
}

func (r *RoleBasedGroupReconciler) isCoordinationRoleCompleted(
rbg *workloadsv1alpha1.RoleBasedGroup, coordination workloadsv1alpha1.Coordination,
expectedRolesRevisionHash map[string]string,
) (bool, error) {
if len(rbg.Status.CoordinationState) == 0 {
return false, nil
}
// check coordination state
panic("implement me")

r.recorder.Event(rbg, corev1.EventTypeNormal, Succeed, "ReconcileSucceed")
return ctrl.Result{}, nil
return false, nil
}

func (r *RoleBasedGroupReconciler) deleteRoles(ctx context.Context, rbg *workloadsv1alpha1.RoleBasedGroup) error {
Expand Down Expand Up @@ -394,12 +480,16 @@

return r.client.Create(ctx, rbgScalingAdapter)
}
func (r *RoleBasedGroupReconciler) getCurrentRevision(ctx context.Context, rbg *workloadsv1alpha1.RoleBasedGroup) (*appsv1.ControllerRevision, error) {
selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{
MatchLabels: map[string]string{
workloadsv1alpha1.SetNameLabelKey: rbg.Name,
func (r *RoleBasedGroupReconciler) getCurrentRevision(
ctx context.Context, rbg *workloadsv1alpha1.RoleBasedGroup,
) (*appsv1.ControllerRevision, error) {
selector, err := metav1.LabelSelectorAsSelector(
&metav1.LabelSelector{
MatchLabels: map[string]string{
workloadsv1alpha1.SetNameLabelKey: rbg.Name,
},
},
})
)
if err != nil {
return nil, err
}
Expand Down
Loading