Skip to content

Commit 8b59cf5

Browse files
author
sriharsha
committed
Add parent queue limit and quota checking with elastic job support
1 parent b029941 commit 8b59cf5

File tree

2 files changed

+290
-115
lines changed

2 files changed

+290
-115
lines changed

pkg/scheduler/plugins/proportion/capacity_policy/capacity_policy.go

Lines changed: 67 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ import (
1616
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/pod_status"
1717
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/podgroup_info"
1818
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/resource_info"
19-
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/constants"
2019
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/framework"
2120
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/log"
2221
rs "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/plugins/proportion/resource_share"
@@ -125,119 +124,96 @@ func getFirstPendingPod(job *podgroup_info.PodGroupInfo) *pod_info.PodInfo {
125124
return nil
126125
}
127126

128-
// OnSessionOpen is called when a new scheduling session begins. It registers
129-
// the early quota checking function that prevents jobs from being considered
130-
// for scheduling if they would exceed their parent queues' quotas.
131-
func (cp *CapacityPolicy) OnSessionOpen(ssn *framework.Session) {
132-
// Register early quota checks
133-
ssn.AddPrePredicateFn(func(task *pod_info.PodInfo, job *podgroup_info.PodGroupInfo) error {
134-
// Only check for the first pending pod to avoid duplicate checks
135-
firstPending := getFirstPendingPod(job)
136-
if firstPending == nil || task != firstPending {
137-
return nil
138-
}
139-
140-
// Check parent queue quotas
141-
return cp.checkParentQueueQuotas(job, ssn)
142-
})
143-
}
144-
145-
// checkParentQueueQuotas verifies that a job's resource requirements don't
146-
// exceed quotas at any level in its queue hierarchy. This includes:
147-
// - GPU quota checks
148-
// - CPU quota checks
149-
// - Memory quota checks
127+
// checkParentQueueLimits verifies that a job's resource requirements don't
128+
// exceed limits or quotas at any level in its queue hierarchy. This includes:
129+
// - GPU limit/quota checks
130+
// - CPU limit/quota checks
131+
// - Memory limit/quota checks
150132
//
151133
// The function traverses up the queue hierarchy starting from the job's
152-
// immediate parent queue. If any quota would be exceeded, it returns an
134+
// immediate parent queue. If any limit or quota would be exceeded, it returns an
153135
// error with a detailed message.
154136
//
155-
// Note: Preemptible jobs (PriorityTrainNumber) are allowed to exceed parent
156-
// queue quotas, while non-preemptible jobs must strictly adhere to quotas.
157-
func (cp *CapacityPolicy) checkParentQueueQuotas(job *podgroup_info.PodGroupInfo, ssn *framework.Session) error {
158-
// Skip quota checks for preemptible jobs
159-
if job.Priority == constants.PriorityTrainNumber {
160-
log.InfraLogger.V(5).Infof("Job: <%v/%v> is preemptible, skipping parent queue quota checks", job.Namespace, job.Name)
137+
// For preemptible jobs (jobs with priority PriorityInferenceNumber), the
138+
// limit/quota checks are skipped as they are allowed to exceed queue limits/quotas.
139+
func (cp *CapacityPolicy) checkParentQueueLimits(job *podgroup_info.PodGroupInfo, ssn *framework.Session) error {
140+
// Skip limit/quota checks for preemptible jobs
141+
if job.IsPreemptibleJob(cp.isInferencePreemptible) {
161142
return nil
162143
}
163144

164145
// Get queue info for this job
165146
queue, found := ssn.Queues[job.Queue]
166147
if !found {
167-
return nil // Can't check quota without queue info
148+
return nil // Can't check limits/quotas without queue info
168149
}
169150

170-
// Only check parent queues, not the job's direct queue
171-
currentQueueID := queue.ParentQueue
151+
// Calculate job's minimum required resources
152+
jobResources := resource_info.EmptyResource()
153+
for _, pod := range job.PodInfos {
154+
if pod.Status == pod_status.Pending {
155+
jobResources.AddResourceRequirements(pod.ResReq)
156+
}
157+
}
172158

173-
for currentQueueID != "" {
159+
// Traverse up the queue hierarchy
160+
for currentQueueID := queue.ParentQueue; currentQueueID != ""; currentQueueID = ssn.Queues[currentQueueID].ParentQueue {
174161
parentQueue, found := ssn.Queues[currentQueueID]
175162
if !found {
176163
break
177164
}
178165

179-
// Calculate job's total resource requirements
180-
jobResources := resource_info.EmptyResource()
181-
for _, pod := range job.PodInfos {
182-
if pod.Status == pod_status.Pending {
183-
jobResources.AddResourceRequirements(pod.ResReq)
184-
}
166+
// Check resource limits and quotas
167+
resourceChecks := []struct {
168+
resourceType string
169+
limit float64
170+
quota float64
171+
used float64
172+
}{
173+
{"GPU", float64(parentQueue.Resources.GPU.Limit), float64(parentQueue.Resources.GPU.Quota), jobResources.GPUs()},
174+
{"CPU", float64(parentQueue.Resources.CPU.Limit), float64(parentQueue.Resources.CPU.Quota), jobResources.Cpu()},
175+
{"Memory", float64(parentQueue.Resources.Memory.Limit), float64(parentQueue.Resources.Memory.Quota), jobResources.Memory()},
185176
}
186177

187-
// Check GPU quota
188-
if parentQueue.Resources.GPU.Quota > 0 && jobResources.GPUs() > float64(parentQueue.Resources.GPU.Quota) {
189-
errorMsg := fmt.Sprintf(
190-
"parent queue '%s' quota has reached the allowable limit of GPUs. "+
191-
"Limit is %.0f GPUs, workload requested %.0f GPUs",
192-
parentQueue.Name,
193-
parentQueue.Resources.GPU.Quota,
194-
jobResources.GPUs())
195-
196-
// Record event
197-
if firstPod := getFirstPendingPod(job); firstPod != nil {
198-
log.InfraLogger.Warningf("Queue quota exceeded: %s", errorMsg)
178+
for _, check := range resourceChecks {
179+
// Check if either limit or quota is exceeded
180+
if (check.limit > 0 && check.used > check.limit) || (check.quota > 0 && check.used > check.quota) {
181+
// Use the more restrictive value for the error message
182+
restrictiveValue := check.limit
183+
if check.quota > 0 && (check.limit == 0 || check.quota < check.limit) {
184+
restrictiveValue = check.quota
185+
}
186+
187+
errorMsg := fmt.Sprintf(
188+
"parent queue '%s' has reached the %s of %s. "+
189+
"Value is %.0f, workload requested %.0f",
190+
parentQueue.Name,
191+
func() string {
192+
if check.limit > 0 && check.quota > 0 {
193+
return "limit/quota"
194+
} else if check.limit > 0 {
195+
return "limit"
196+
} else {
197+
return "quota"
198+
}
199+
}(),
200+
check.resourceType,
201+
restrictiveValue,
202+
check.used)
203+
204+
log.InfraLogger.Warningf("Queue limit/quota exceeded: %s", errorMsg)
205+
return fmt.Errorf(errorMsg)
199206
}
200-
201-
return fmt.Errorf(errorMsg)
202207
}
203-
204-
// Check CPU quota
205-
if parentQueue.Resources.CPU.Quota > 0 && jobResources.Cpu() > float64(parentQueue.Resources.CPU.Quota) {
206-
errorMsg := fmt.Sprintf(
207-
"parent queue '%s' quota has reached the allowable limit of CPU. "+
208-
"Limit is %.0f CPU, workload requested %.0f CPU",
209-
parentQueue.Name,
210-
parentQueue.Resources.CPU.Quota,
211-
jobResources.Cpu())
212-
213-
// Record event
214-
if firstPod := getFirstPendingPod(job); firstPod != nil {
215-
log.InfraLogger.Warningf("Queue quota exceeded: %s", errorMsg)
216-
}
217-
218-
return fmt.Errorf(errorMsg)
219-
}
220-
221-
// Check Memory quota
222-
if parentQueue.Resources.Memory.Quota > 0 && jobResources.Memory() > float64(parentQueue.Resources.Memory.Quota) {
223-
errorMsg := fmt.Sprintf(
224-
"parent queue '%s' quota has reached the allowable limit of Memory. "+
225-
"Limit is %.0f Memory, workload requested %.0f Memory",
226-
parentQueue.Name,
227-
parentQueue.Resources.Memory.Quota,
228-
jobResources.Memory())
229-
230-
// Record event
231-
if firstPod := getFirstPendingPod(job); firstPod != nil {
232-
log.InfraLogger.Warningf("Queue quota exceeded: %s", errorMsg)
233-
}
234-
235-
return fmt.Errorf(errorMsg)
236-
}
237-
238-
// Move up the hierarchy
239-
currentQueueID = parentQueue.ParentQueue
240208
}
241209

242210
return nil
243211
}
212+
213+
// OnSessionOpen registers the early limit checking function that prevents jobs
214+
// from being considered for scheduling if they would exceed their parent queues' limits.
215+
func (cp *CapacityPolicy) OnSessionOpen(ssn *framework.Session) {
216+
ssn.AddPrePredicateFn(func(task *pod_info.PodInfo, job *podgroup_info.PodGroupInfo) error {
217+
return cp.checkParentQueueLimits(job, ssn)
218+
})
219+
}

0 commit comments

Comments
 (0)