Implementation KEP 257: LeaderExcluded SubGroup #428

Merged · 14 commits · Mar 19, 2025
5 changes: 4 additions & 1 deletion api/config/v1alpha1/zz_generated.deepcopy.go

(Generated file; diff not rendered.)

5 changes: 4 additions & 1 deletion api/config/v1alpha1/zz_generated.defaults.go

(Generated file; diff not rendered.)

32 changes: 32 additions & 0 deletions api/leaderworkerset/v1/leaderworkerset_types.go
@@ -84,6 +84,10 @@ const (
// Pods that are part of the same subgroup will have the same unique hash value.
SubGroupUniqueHashLabelKey string = "leaderworkerset.sigs.k8s.io/subgroup-key"

// SubGroupPolicyType will be added to leader pods as an annotation which
// corresponds to LeaderWorkerSet.Spec.LeaderWorkerTemplate.SubGroupPolicy.Type
SubGroupPolicyTypeAnnotationKey string = "leaderworkerset.sigs.k8s.io/subgroup-policy-type"

// Leader pods will have an annotation that determines what type of domain
// will be injected. Corresponds to LeaderWorkerSet.Spec.NetworkConfig.SubdomainPolicy
SubdomainPolicyAnnotationKey string = "leaderworkerset.sigs.k8s.io/subdomainPolicy"
@@ -183,6 +187,15 @@ type RolloutStrategy struct {

// SubGroupPolicy describes the policy that will be applied when creating subgroups.
type SubGroupPolicy struct {

// Defines what type of Subgroups to create. Defaults to
// LeaderWorker
//
// +kubebuilder:validation:Enum={LeaderWorker,LeaderExcluded}
// +kubebuilder:default=LeaderWorker
// +optional
Type *SubGroupPolicyType `json:"subGroupPolicyType,omitempty"`

// The number of pods per subgroup. This value is immutable,
// and must not be greater than LeaderWorkerSet.Spec.Size.
// Size must be divisible by subGroupSize in which case the
@@ -192,6 +205,25 @@ type SubGroupPolicy struct {
SubGroupSize *int32 `json:"subGroupSize,omitempty"`
}

type SubGroupPolicyType string

const (
// LeaderWorker will include the leader in the first subgroup.
// If (LeaderWorkerSet.Spec.LeaderWorkerTemplate.Size-1) is divisible
// by LeaderWorkerSet.Spec.LeaderWorkerTemplate.SubGroupPolicy.SubGroupSize, the groups will look like:
// (0, 1, ... subGroupSize), (subGroupSize + 1, ... 2 * subGroupSize), ...
// If not divisible, the groups will look like:
// (0, 1, ... subGroupSize-1), (subGroupSize, ... 2*subGroupSize - 1), ...
SubGroupPolicyTypeLeaderWorker SubGroupPolicyType = "LeaderWorker"

// LeaderExcluded excludes the leader from any subgroup.
// Only supported when (LeaderWorkerSet.Spec.LeaderWorkerTemplate.Size-1) is divisible
// by LeaderWorkerSet.Spec.LeaderWorkerTemplate.SubGroupPolicy.SubGroupSize.
// Groups will look like:
// (1, ... subGroupSize), (subGroupSize + 1, ... 2 * subGroupSize), ...
SubGroupPolicyTypeLeaderExcluded SubGroupPolicyType = "LeaderExcluded"
)

type NetworkConfig struct {
// SubdomainPolicy determines the policy that will be used when creating
// the headless service, defaults to shared
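To make the two policies concrete: with Size=5 and SubGroupSize=2, LeaderWorker yields subgroups (0,1,2) and (3,4), while LeaderExcluded leaves the leader (index 0) out and yields (1,2) and (3,4). The sketch below is illustrative only, not code from this PR; the helper name and layout are assumptions, and it simply reproduces the index arithmetic described in the comments above.

```go
package main

import "fmt"

// subGroupIndex mirrors the grouping described in the SubGroupPolicyType
// comments: size is LeaderWorkerTemplate.Size (leader + workers), podIndex 0
// is the leader. Illustrative sketch, not the controller's implementation.
func subGroupIndex(policy string, size, subGroupSize, podIndex int) int {
	if policy == "LeaderExcluded" {
		// Workers 1..size-1 are packed into subgroups of exactly subGroupSize pods.
		return (podIndex - 1) / subGroupSize
	}
	// LeaderWorker (default).
	if (size-1)%subGroupSize == 0 {
		// The leader shares the first subgroup with the first subGroupSize workers.
		if podIndex == 0 {
			return 0
		}
		return (podIndex - 1) / subGroupSize
	}
	// Not divisible: pods are sliced plainly into groups of subGroupSize.
	return podIndex / subGroupSize
}

func main() {
	// size=5, subGroupSize=2: LeaderWorker -> (0,1,2)(3,4); LeaderExcluded -> (1,2)(3,4).
	for i := 0; i < 5; i++ {
		fmt.Printf("pod %d: LeaderWorker=%d\n", i, subGroupIndex("LeaderWorker", 5, 2, i))
	}
	for i := 1; i < 5; i++ { // the leader is in no subgroup under LeaderExcluded
		fmt.Printf("pod %d: LeaderExcluded=%d\n", i, subGroupIndex("LeaderExcluded", 5, 2, i))
	}
}
```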
5 changes: 5 additions & 0 deletions api/leaderworkerset/v1/zz_generated.deepcopy.go

(Generated file; diff not rendered.)


@@ -8201,6 +8201,15 @@ spec:
SubGroupPolicy describes the policy that will be applied when creating subgroups
in each replica.
properties:
subGroupPolicyType:
default: LeaderWorker
description: |-
Defines what type of Subgroups to create. Defaults to
LeaderWorker
enum:
- LeaderWorker
- LeaderExcluded
type: string
subGroupSize:
description: |-
The number of pods per subgroup. This value is immutable,
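For reference, this is how the new field is populated on the Go API type added above. The struct literal is a hypothetical usage sketch, not code from this PR; the import path is an assumption based on the repo layout.

```go
package example

import (
	leaderworkerset "sigs.k8s.io/lws/api/leaderworkerset/v1" // assumed import path
)

// leaderExcludedPolicy builds a SubGroupPolicy that uses the new Type field.
// The owning LeaderWorkerSet's (size-1) must be divisible by SubGroupSize for
// LeaderExcluded to pass validation (see the webhook change below).
func leaderExcludedPolicy() *leaderworkerset.SubGroupPolicy {
	policyType := leaderworkerset.SubGroupPolicyTypeLeaderExcluded
	subGroupSize := int32(2)
	return &leaderworkerset.SubGroupPolicy{
		Type:         &policyType,
		SubGroupSize: &subGroupSize,
	}
}
```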
2 changes: 2 additions & 0 deletions pkg/controllers/leaderworkerset_controller.go
@@ -668,6 +668,7 @@ func constructLeaderStatefulSetApplyConfiguration(lws *leaderworkerset.LeaderWor
podAnnotations[leaderworkerset.ExclusiveKeyAnnotationKey] = lws.Annotations[leaderworkerset.ExclusiveKeyAnnotationKey]
}
if lws.Spec.LeaderWorkerTemplate.SubGroupPolicy != nil {
podAnnotations[leaderworkerset.SubGroupPolicyTypeAnnotationKey] = string(*lws.Spec.LeaderWorkerTemplate.SubGroupPolicy.Type)
podAnnotations[leaderworkerset.SubGroupSizeAnnotationKey] = strconv.Itoa(int(*lws.Spec.LeaderWorkerTemplate.SubGroupPolicy.SubGroupSize))
if lws.Annotations[leaderworkerset.SubGroupExclusiveKeyAnnotationKey] != "" {
podAnnotations[leaderworkerset.SubGroupExclusiveKeyAnnotationKey] = lws.Annotations[leaderworkerset.SubGroupExclusiveKeyAnnotationKey]
@@ -677,6 +678,7 @@ func constructLeaderStatefulSetApplyConfiguration(lws *leaderworkerset.LeaderWor
if lws.Spec.NetworkConfig != nil && *lws.Spec.NetworkConfig.SubdomainPolicy == leaderworkerset.SubdomainUniquePerReplica {
podAnnotations[leaderworkerset.SubdomainPolicyAnnotationKey] = string(leaderworkerset.SubdomainUniquePerReplica)
}

podTemplateApplyConfiguration.WithAnnotations(podAnnotations)

// construct statefulset apply configuration
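The net effect of the controller change is one extra annotation on the leader pod template whenever a SubGroupPolicy is set. A hedged sketch of the result follows; the value is chosen for illustration, and the key is the constant added in the API diff above (the existing subgroup-size annotation, referenced symbolically in the diff, is set alongside it as before).

```go
package main

import "fmt"

func main() {
	// Illustrative leader pod-template annotation after this change, for a
	// LeaderWorkerSet using the LeaderExcluded policy. The pod webhook reads
	// this annotation to decide whether the leader joins subgroup 0.
	podAnnotations := map[string]string{
		"leaderworkerset.sigs.k8s.io/subgroup-policy-type": "LeaderExcluded",
	}
	fmt.Println(podAnnotations)
}
```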
3 changes: 2 additions & 1 deletion pkg/controllers/leaderworkerset_controller_test.go
@@ -337,7 +337,7 @@ func TestLeaderStatefulSetApplyConfig(t *testing.T) {
revisionKey: revisionKey1,
lws: wrappers.BuildBasicLeaderWorkerSet("test-sample", "default").Annotation(map[string]string{
leaderworkerset.SubGroupExclusiveKeyAnnotationKey: "topologyKey",
}).SubGroupSize(2).Replica(1).
}).SubGroupSize(2).SubGroupType(leaderworkerset.SubGroupPolicyTypeLeaderWorker).Replica(1).
RolloutStrategy(leaderworkerset.RolloutStrategy{
Type: leaderworkerset.RollingUpdateStrategyType,
RollingUpdateConfiguration: &leaderworkerset.RollingUpdateConfiguration{
@@ -381,6 +381,7 @@ func TestLeaderStatefulSetApplyConfig(t *testing.T) {
"leaderworkerset.sigs.k8s.io/size": "2",
leaderworkerset.SubGroupSizeAnnotationKey: "2",
leaderworkerset.SubGroupExclusiveKeyAnnotationKey: "topologyKey",
leaderworkerset.SubGroupPolicyTypeAnnotationKey: string(leaderworkerset.SubGroupPolicyTypeLeaderWorker),
},
},
Spec: &coreapplyv1.PodSpecApplyConfiguration{
1 change: 0 additions & 1 deletion pkg/utils/accelerators/tpu.go
@@ -105,7 +105,6 @@ func addTPUVariablesSubGroup(pod *corev1.Pod) error {
if err != nil {
return err
}

workerIndex, err := strconv.Atoi(pod.Labels[leaderworkerset.WorkerIndexLabelKey])
if err != nil {
return err
21 changes: 21 additions & 0 deletions pkg/utils/accelerators/tpu_test.go
@@ -177,6 +177,27 @@ func TestAddTPUVariablesSubGroup(t *testing.T) {
expectedTpuWorkerHostNames: "test-sample-1-5.default,test-sample-1-6.default,test-sample-1-7.default,test-sample-1-8.default",
expectedTpuName: "test-sample-1",
},
{
name: "Leader does not request TPU resources, worker with subgroup index 0, LeaderExcluded subgroup",
pod: &corev1.Pod{
Spec: wrappers.MakeLeaderPodSpecWithTPUResource(),
ObjectMeta: v1.ObjectMeta{
Name: "test-sample-0-2",
Namespace: "default",
Labels: map[string]string{
leaderworkerset.WorkerIndexLabelKey: "2",
leaderworkerset.SubGroupIndexLabelKey: "0",
},
Annotations: map[string]string{
leaderworkerset.SubGroupSizeAnnotationKey: "2",
leaderworkerset.SubGroupPolicyTypeAnnotationKey: string(leaderworkerset.SubGroupPolicyTypeLeaderExcluded),
},
},
},
expectedTpuWorkerId: "1",
expectedTpuWorkerHostNames: "test-sample-0-1.default,test-sample-0-2.default",
expectedTpuName: "test-sample-0",
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
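The expected values in the new test case follow directly from the LeaderExcluded layout: subgroup 0 spans workers 1..subGroupSize, so worker 2 is the second member (TPU worker id 1) and the hostnames enumerate workers 1 and 2 of group test-sample-0. The following is a standalone sketch of that arithmetic, not the code in pkg/utils/accelerators/tpu.go; the function name and signature are assumptions.

```go
package main

import (
	"fmt"
	"strings"
)

// tpuVarsLeaderExcluded reproduces the arithmetic behind the test's expected
// values for a LeaderExcluded subgroup: subgroup i covers worker indices
// i*subGroupSize+1 .. (i+1)*subGroupSize.
func tpuVarsLeaderExcluded(groupName, ns string, subGroupIndex, subGroupSize, workerIndex int) (workerID, hostNames string) {
	start := subGroupIndex*subGroupSize + 1
	hosts := make([]string, 0, subGroupSize)
	for i := 0; i < subGroupSize; i++ {
		hosts = append(hosts, fmt.Sprintf("%s-%d.%s", groupName, start+i, ns))
	}
	return fmt.Sprint(workerIndex - start), strings.Join(hosts, ",")
}

func main() {
	id, hosts := tpuVarsLeaderExcluded("test-sample-0", "default", 0, 2, 2)
	fmt.Println(id)    // 1
	fmt.Println(hosts) // test-sample-0-1.default,test-sample-0-2.default
}
```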
4 changes: 4 additions & 0 deletions pkg/webhooks/leaderworkerset_webhook.go
@@ -82,6 +82,7 @@ func (r *LeaderWorkerSetWebhook) Default(ctx context.Context, obj runtime.Object
subdomainPolicy := v1.SubdomainShared
lws.Spec.NetworkConfig.SubdomainPolicy = &subdomainPolicy
}

return nil
}

@@ -244,5 +245,8 @@ func validateUpdateSubGroupPolicy(specPath *field.Path, lws *v1.LeaderWorkerSet)
if size < subGroupSize {
allErrs = append(allErrs, field.Invalid(specPath.Child("leaderWorkerTemplate", "SubGroupPolicy", "subGroupSize"), lws.Spec.LeaderWorkerTemplate.SubGroupPolicy.SubGroupSize, "subGroupSize cannot be larger than size"))
}
if (*lws.Spec.LeaderWorkerTemplate.SubGroupPolicy.Type == v1.SubGroupPolicyTypeLeaderExcluded) && ((size-1)%subGroupSize != 0) {
allErrs = append(allErrs, field.Invalid(specPath.Child("leaderWorkerTemplate", "SubGroupPolicy", "subGroupSize"), lws.Spec.LeaderWorkerTemplate.SubGroupPolicy.SubGroupSize, "size-1 must be divisible by subGroupSize when using LeaderExcluded"))
}
return allErrs
}
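The new validation rule is easy to check by hand: size=4 with subGroupSize=2 gives (4-1)%2=1, so a LeaderExcluded spec is rejected (exactly the failing integration case added below), while size=5 with subGroupSize=2 gives (5-1)%2=0 and is accepted. A minimal standalone sketch of the rule, not the webhook code itself:

```go
package main

import "fmt"

// leaderExcludedAllowed mirrors the divisibility rule enforced by
// validateUpdateSubGroupPolicy for the LeaderExcluded policy type.
func leaderExcludedAllowed(size, subGroupSize int32) bool {
	return (size-1)%subGroupSize == 0
}

func main() {
	fmt.Println(leaderExcludedAllowed(4, 2)) // false: rejected, 3 is not divisible by 2
	fmt.Println(leaderExcludedAllowed(5, 2)) // true:  accepted, 4 is divisible by 2
}
```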
4 changes: 2 additions & 2 deletions pkg/webhooks/pod_webhook.go
@@ -126,12 +126,12 @@ func (p *PodWebhook) Default(ctx context.Context, obj runtime.Object) error {
SetExclusiveAffinities(pod, groupUniqueKey, epKey, leaderworkerset.GroupUniqueHashLabelKey)
}
_, foundSubGroupSize := pod.Annotations[leaderworkerset.SubGroupSizeAnnotationKey]
if foundSubGroupSize && pod.Labels[leaderworkerset.SubGroupIndexLabelKey] == "" {
subGroupPolicyType := pod.Annotations[leaderworkerset.SubGroupPolicyTypeAnnotationKey]
if foundSubGroupSize && pod.Labels[leaderworkerset.SubGroupIndexLabelKey] == "" && (subGroupPolicyType != string(leaderworkerset.SubGroupPolicyTypeLeaderExcluded)) {
// The leader pod always lands on SubGroup 0.
pod.Labels[leaderworkerset.SubGroupIndexLabelKey] = "0"
subGroupUniqueKey := genGroupUniqueKey(pod.Name, "0")
pod.Labels[leaderworkerset.SubGroupUniqueHashLabelKey] = subGroupUniqueKey

if subEpKey, foundSubEpKey := pod.Annotations[leaderworkerset.SubGroupExclusiveKeyAnnotationKey]; foundSubEpKey {
SetExclusiveAffinities(pod, subGroupUniqueKey, subEpKey, leaderworkerset.SubGroupUniqueHashLabelKey)
}
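In short, the webhook now gives the leader pod a subgroup index of 0 only when subgroups actually contain the leader; under LeaderExcluded the leader gets no subgroup labels and no subgroup exclusive affinity. A compressed sketch of that decision, with illustrative names rather than the webhook's own:

```go
package main

import "fmt"

// leaderJoinsSubGroupZero condenses the condition added to the pod webhook:
// label the leader with subgroup index 0 unless the policy excludes it.
func leaderJoinsSubGroupZero(subGroupSizeAnnotationSet bool, policyType string) bool {
	return subGroupSizeAnnotationSet && policyType != "LeaderExcluded"
}

func main() {
	fmt.Println(leaderJoinsSubGroupZero(true, "LeaderWorker"))   // true
	fmt.Println(leaderJoinsSubGroupZero(true, "LeaderExcluded")) // false: leader stays outside all subgroups
}
```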
35 changes: 35 additions & 0 deletions pkg/webhooks/pod_webhook_test.go
@@ -268,3 +268,38 @@ func TestExclusiveAffinityApplied(t *testing.T) {
})
}
}

func TestGetSubGroupIndex(t *testing.T) {
tests := []struct {
name string
podCount int
subGroupSize int
workerIndex int
leaderOnly bool
expectedSubGroupIndex string
}{
{
name: "Even number of pods",
podCount: 4,
subGroupSize: 2,
workerIndex: 2,
expectedSubGroupIndex: "1",
},
{
name: "Odd number of pods, first subgroup has an extra pod",
podCount: 5,
subGroupSize: 2,
workerIndex: 2,
expectedSubGroupIndex: "0",
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
subGroupIndex := getSubGroupIndex(tc.podCount, tc.subGroupSize, tc.workerIndex)
if tc.expectedSubGroupIndex != subGroupIndex {
t.Errorf("Expected subGroupIndex to be %s, got %s", tc.expectedSubGroupIndex, subGroupIndex)
}
})
}
}
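For context on the two expectations above: with 4 pods and subGroupSize 2, (4-1) is not divisible by 2, so pods are sliced plainly and worker 2 lands in subgroup 1; with 5 pods, (5-1) is divisible by 2, so the leader rides along in the first subgroup and worker 2 still maps to subgroup 0. The sketch below is merely consistent with these cases; the real getSubGroupIndex lives in pkg/webhooks/pod_webhook.go and may differ in detail.

```go
package main

import "fmt"

// getSubGroupIndexSketch reproduces the indexing the test cases expect:
// when (podCount-1) is divisible by subGroupSize the leader shares the first
// subgroup, shifting workers down by one; otherwise pods are grouped plainly.
func getSubGroupIndexSketch(podCount, subGroupSize, workerIndex int) string {
	if (podCount-1)%subGroupSize == 0 {
		return fmt.Sprint((workerIndex - 1) / subGroupSize)
	}
	return fmt.Sprint(workerIndex / subGroupSize)
}

func main() {
	fmt.Println(getSubGroupIndexSketch(4, 2, 2)) // "1"
	fmt.Println(getSubGroupIndexSketch(5, 2, 2)) // "0"
}
```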
6 changes: 6 additions & 0 deletions test/integration/webhooks/leaderworkerset_test.go
@@ -255,6 +255,12 @@ var _ = ginkgo.Describe("leaderworkerset defaulting, creation and update", func(
},
lwsCreationShouldFail: true,
}),
ginkgo.Entry("creation where (subGroupSize-1) is not divisible by 1 and SubGroupPolicyTypeLeaderExcluded should fail", &testValidationCase{
makeLeaderWorkerSet: func(ns *corev1.Namespace) *wrappers.LeaderWorkerSetWrapper {
return wrappers.BuildLeaderWorkerSet(ns.Name).Size(4).SubGroupSize(2).SubGroupType(leaderworkerset.SubGroupPolicyTypeLeaderExcluded)
},
lwsCreationShouldFail: true,
}),
ginkgo.Entry("update with invalid replicas should fail (larger than maxInt32)", &testValidationCase{
makeLeaderWorkerSet: func(ns *corev1.Namespace) *wrappers.LeaderWorkerSetWrapper {
return wrappers.BuildLeaderWorkerSet(ns.Name).Replica(100000).Size(1)
12 changes: 11 additions & 1 deletion test/wrappers/wrappers.go
@@ -104,11 +104,21 @@ func (lwsWrapper *LeaderWorkerSetWrapper) Conditions(conditions []metav1.Conditi
}

func (lwsWrapper *LeaderWorkerSetWrapper) SubGroupSize(subGroupSize int32) *LeaderWorkerSetWrapper {
lwsWrapper.Spec.LeaderWorkerTemplate.SubGroupPolicy = &leaderworkerset.SubGroupPolicy{}
if lwsWrapper.Spec.LeaderWorkerTemplate.SubGroupPolicy == nil {
lwsWrapper.Spec.LeaderWorkerTemplate.SubGroupPolicy = &leaderworkerset.SubGroupPolicy{}
}
lwsWrapper.Spec.LeaderWorkerTemplate.SubGroupPolicy.SubGroupSize = &subGroupSize
return lwsWrapper
}

func (lwsWrapper *LeaderWorkerSetWrapper) SubGroupType(subGroupType leaderworkerset.SubGroupPolicyType) *LeaderWorkerSetWrapper {
if lwsWrapper.Spec.LeaderWorkerTemplate.SubGroupPolicy == nil {
lwsWrapper.Spec.LeaderWorkerTemplate.SubGroupPolicy = &leaderworkerset.SubGroupPolicy{}
}
lwsWrapper.Spec.LeaderWorkerTemplate.SubGroupPolicy.Type = &subGroupType
return lwsWrapper
}

func (lwsWrapper *LeaderWorkerSetWrapper) SubdomainPolicy(subdomainPolicy leaderworkerset.SubdomainPolicy) *LeaderWorkerSetWrapper {
lwsWrapper.Spec.NetworkConfig = &leaderworkerset.NetworkConfig{
SubdomainPolicy: &subdomainPolicy,
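With the wrapper change, SubGroupSize and SubGroupType can be chained in either order, since both initialize SubGroupPolicy only when it is still nil. A hypothetical usage along the lines of the integration test above; the import paths are assumptions based on the repo layout.

```go
package example

import (
	leaderworkerset "sigs.k8s.io/lws/api/leaderworkerset/v1" // assumed import path
	"sigs.k8s.io/lws/test/wrappers"                          // assumed import path
)

// buildLeaderExcludedLWS builds a wrapper whose size satisfies the
// LeaderExcluded rule: (5-1) is divisible by the subgroup size 2.
func buildLeaderExcludedLWS(ns string) *wrappers.LeaderWorkerSetWrapper {
	return wrappers.BuildLeaderWorkerSet(ns).
		Size(5).
		SubGroupSize(2).
		SubGroupType(leaderworkerset.SubGroupPolicyTypeLeaderExcluded)
}
```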