Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix calculations of podgroup min resource #3057

Merged
merged 3 commits into from
Jun 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 15 additions & 19 deletions pkg/controllers/job/job_controller_actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package job
import (
"context"
"fmt"
"sort"
"sync"
"sync/atomic"
"time"
Expand All @@ -28,7 +27,6 @@ import (
"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
quotav1 "k8s.io/apiserver/pkg/quota/v1"
"k8s.io/klog/v2"

batch "volcano.sh/apis/pkg/apis/batch/v1alpha1"
Expand All @@ -38,7 +36,6 @@ import (
"volcano.sh/volcano/pkg/controllers/apis"
jobhelpers "volcano.sh/volcano/pkg/controllers/job/helpers"
"volcano.sh/volcano/pkg/controllers/job/state"
"volcano.sh/volcano/pkg/controllers/util"
)

var calMutex sync.Mutex
Expand Down Expand Up @@ -769,6 +766,7 @@ func (cc *jobcontroller) deleteJobPod(jobName string, pod *v1.Pod) error {
func (cc *jobcontroller) calcPGMinResources(job *batch.Job) *v1.ResourceList {
// sort task by priorityClasses
var tasksPriority TasksPriority
totalMinAvailable := int32(0)
for _, task := range job.Spec.Tasks {
tp := TaskPriority{0, task}
pc := task.Template.Spec.PriorityClassName
Expand All @@ -782,26 +780,24 @@ func (cc *jobcontroller) calcPGMinResources(job *batch.Job) *v1.ResourceList {
}
}
tasksPriority = append(tasksPriority, tp)
if task.MinAvailable != nil { // actually, it can not be nil, because nil value will be patched in webhook
totalMinAvailable += *task.MinAvailable
} else {
totalMinAvailable += task.Replicas
}
}

sort.Sort(tasksPriority)

minReq := v1.ResourceList{}
podCnt := int32(0)
for _, task := range tasksPriority {
for i := int32(0); i < task.Replicas; i++ {
if podCnt >= job.Spec.MinAvailable {
break
}

podCnt++
pod := &v1.Pod{
Spec: task.Template.Spec,
}
minReq = quotav1.Add(minReq, *util.GetPodQuotaUsage(pod))
}
// see docs https://github.com/volcano-sh/volcano/pull/2945
// 1. job.MinAvailable < sum(task.MinAvailable), regard podgroup's min resource as sum of the first minAvailable,
// according to https://github.com/volcano-sh/volcano/blob/c91eb07f2c300e4d5c826ff11a63b91781b3ac11/pkg/scheduler/api/job_info.go#L738-L740
if job.Spec.MinAvailable < totalMinAvailable {
minReq := tasksPriority.CalcFirstCountResources(job.Spec.MinAvailable)
return &minReq
}

// 2. job.MinAvailable >= sum(task.MinAvailable)
minReq := tasksPriority.CalcPGMinResources(job.Spec.MinAvailable)

return &minReq
}

Expand Down
82 changes: 82 additions & 0 deletions pkg/controllers/job/job_controller_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@ package job

import (
"fmt"
"sort"
"strconv"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
quotav1 "k8s.io/apiserver/pkg/quota/v1"
"k8s.io/klog/v2"

batch "volcano.sh/apis/pkg/apis/batch/v1alpha1"
Expand All @@ -31,6 +33,7 @@ import (
schedulingv2 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
"volcano.sh/volcano/pkg/controllers/apis"
jobhelpers "volcano.sh/volcano/pkg/controllers/job/helpers"
"volcano.sh/volcano/pkg/controllers/util"
)

// MakePodName append podname,jobname,taskName and index and returns the string.
Expand Down Expand Up @@ -259,3 +262,82 @@ func isControlledBy(obj metav1.Object, gvk schema.GroupVersionKind) bool {
}
return false
}

// CalcFirstCountResources sorts the receiver in place with sort.Sort and
// returns the summed resource requests of the first count pods. Tasks are
// walked in sorted order (by priority, per TasksPriority's ordering) and each
// task contributes at most its Replicas pods.
func (p TasksPriority) CalcFirstCountResources(count int32) v1.ResourceList {
	sort.Sort(p)

	total := v1.ResourceList{}
	remaining := count
	for _, tp := range p {
		pod := &v1.Pod{Spec: tp.Template.Spec}
		if remaining > tp.Replicas {
			// More pods are needed than this task has: take all of its
			// replicas and carry the remainder to the next task.
			total = quotav1.Add(total, calTaskRequests(pod, tp.Replicas))
			remaining -= tp.Replicas
			continue
		}
		// This task alone covers what is still needed, so stop here.
		total = quotav1.Add(total, calTaskRequests(pod, remaining))
		break
	}
	return total
}

// CalcPGMinResources returns the summed resource requests of jobMinAvailable
// pods. The receiver is sorted in place with sort.Sort first. The result is
// built in two passes over the sorted tasks:
//
//  1. every task whose MinAvailable is set contributes MinAvailable pods,
//     capped so that the running total never exceeds jobMinAvailable;
//  2. if the total is still below jobMinAvailable, the remaining pods are
//     taken from each task's spare replicas (Replicas minus MinAvailable, or
//     all Replicas when MinAvailable is unset), in sorted order.
//
// Tasks with no spare replicas are skipped in the second pass. This includes
// tasks whose MinAvailable is greater than Replicas, so a negative spare count
// can never inflate the number of pods still to be filled.
func (p TasksPriority) CalcPGMinResources(jobMinAvailable int32) v1.ResourceList {
	sort.Sort(p)
	minReq := v1.ResourceList{}
	podCnt := int32(0)

	// 1. first sum up those tasks whose MinAvailable is set
	for _, task := range p {
		if task.MinAvailable == nil { // actually, all task's min available is set by webhook
			continue
		}

		// Never count more pods than are still missing to reach jobMinAvailable.
		validReplicas := *task.MinAvailable
		if left := jobMinAvailable - podCnt; left < validReplicas {
			validReplicas = left
		}
		minReq = quotav1.Add(minReq, calTaskRequests(&v1.Pod{Spec: task.Template.Spec}, validReplicas))
		podCnt += validReplicas
		if podCnt >= jobMinAvailable {
			break
		}
	}

	if podCnt >= jobMinAvailable {
		return minReq
	}

	// 2. fill up the count of pod to jobMinAvailable with tasks whose replicas is not used up, higher priority first
	leftCnt := jobMinAvailable - podCnt
	for _, task := range p {
		// spare is the number of replicas not already counted in pass 1.
		spare := task.Replicas
		if task.MinAvailable != nil {
			spare -= *task.MinAvailable
		}
		if spare <= 0 {
			// Nothing left to take from this task. A negative value
			// (MinAvailable > Replicas) must not be subtracted from leftCnt,
			// otherwise leftCnt would grow and later tasks would be over-counted.
			continue
		}

		if spare > leftCnt {
			spare = leftCnt
		}
		minReq = quotav1.Add(minReq, calTaskRequests(&v1.Pod{Spec: task.Template.Spec}, spare))
		leftCnt -= spare
		if leftCnt <= 0 {
			break
		}
	}
	return minReq
}

// calTaskRequests returns the resource requests of validReplica copies of pod,
// i.e. the pod's quota usage added up validReplica times. A zero or negative
// validReplica yields an empty ResourceList.
func calTaskRequests(pod *v1.Pod, validReplica int32) v1.ResourceList {
	minReq := v1.ResourceList{}
	if validReplica <= 0 {
		return minReq
	}

	// pod is not modified inside the loop, so its usage is computed once
	// instead of on every iteration.
	// NOTE(review): assumes util.GetPodQuotaUsage is deterministic for a given pod — confirm.
	podReq := *util.GetPodQuotaUsage(pod)
	for i := int32(0); i < validReplica; i++ {
		minReq = quotav1.Add(minReq, podReq)
	}
	return minReq
}
Loading
Loading