From eda97ac164bfc4933d10a4070d7d23274ef9da7f Mon Sep 17 00:00:00 2001 From: Abhishek Malvankar Date: Wed, 6 Sep 2023 22:24:38 -0400 Subject: [PATCH 01/11] remove preempt queue job thread --- .../queuejob/queuejob_controller_ex.go | 179 ++++++++++-------- 1 file changed, 101 insertions(+), 78 deletions(-) diff --git a/pkg/controller/queuejob/queuejob_controller_ex.go b/pkg/controller/queuejob/queuejob_controller_ex.go index 3f776778..13272fd2 100644 --- a/pkg/controller/queuejob/queuejob_controller_ex.go +++ b/pkg/controller/queuejob/queuejob_controller_ex.go @@ -1512,7 +1512,7 @@ func (cc *XController) Run(stopCh <-chan struct{}) { go wait.Until(cc.PreemptQueueJobs, 60*time.Second, stopCh) // This thread is used to update AW that has completionstatus set to Complete or RunningHoldCompletion - go wait.Until(cc.UpdateQueueJobs, 5*time.Second, stopCh) + //go wait.Until(cc.UpdateQueueJobs, 5*time.Second, stopCh) if cc.isDispatcher { go wait.Until(cc.UpdateAgent, 2*time.Second, stopCh) // In the Agent? @@ -1536,90 +1536,79 @@ func (qjm *XController) UpdateAgent() { // Move AW from Running to Completed or RunningHoldCompletion // Do not use event queues! Running AWs move to Completed, from which it will never transition to any other state. // State transition: Running->RunningHoldCompletion->Completed -func (qjm *XController) UpdateQueueJobs() { - queueJobs, err := qjm.appWrapperLister.AppWrappers("").List(labels.Everything()) - if err != nil { - klog.Errorf("[UpdateQueueJobs] Failed to get a list of active appwrappers, err=%+v", err) - return - } - containsCompletionStatus := false - for _, newjob := range queueJobs { - for _, item := range newjob.Spec.AggrResources.GenericItems { - if len(item.CompletionStatus) > 0 { - containsCompletionStatus = true - } +func (qjm *XController) UpdateQueueJobs(newjob *arbv1.AppWrapper) { + + if newjob.Status.State == arbv1.AppWrapperStateActive || newjob.Status.State == arbv1.AppWrapperStateRunningHoldCompletion { + err := qjm.UpdateQueueJobStatus(newjob) + if err != nil { + klog.Errorf("[UpdateQueueJobs] Error updating pod status counts for AppWrapper job: %s, err=%+v", newjob.Name, err) + //TODO: should we really return? + return } - if (newjob.Status.State == arbv1.AppWrapperStateActive || newjob.Status.State == arbv1.AppWrapperStateRunningHoldCompletion) && containsCompletionStatus { - err := qjm.UpdateQueueJobStatus(newjob) - if err != nil { - klog.Errorf("[UpdateQueueJobs] Error updating pod status counts for AppWrapper job: %s, err=%+v", newjob.Name, err) - continue - } - klog.V(6).Infof("[UpdateQueueJobs] %s: qjqueue=%t &qj=%p Version=%s Status=%+v", newjob.Name, qjm.qjqueue.IfExist(newjob), newjob, newjob.ResourceVersion, newjob.Status) - // set appwrapper status to Complete or RunningHoldCompletion - derivedAwStatus := qjm.getAppWrapperCompletionStatus(newjob) + klog.V(6).Infof("[UpdateQueueJobs] %s: qjqueue=%t &qj=%p Version=%s Status=%+v", newjob.Name, qjm.qjqueue.IfExist(newjob), newjob, newjob.ResourceVersion, newjob.Status) + // set appwrapper status to Complete or RunningHoldCompletion + derivedAwStatus := qjm.getAppWrapperCompletionStatus(newjob) - klog.Infof("[UpdateQueueJobs] Got completion status '%s' for app wrapper '%s/%s' Version=%s Status.CanRun=%t Status.State=%s, pod counts [Pending: %d, Running: %d, Succeded: %d, Failed %d]", derivedAwStatus, newjob.Namespace, newjob.Name, newjob.ResourceVersion, - newjob.Status.CanRun, newjob.Status.State, newjob.Status.Pending, newjob.Status.Running, newjob.Status.Succeeded, newjob.Status.Failed) + klog.Infof("[UpdateQueueJobs] Got completion status '%s' for app wrapper '%s/%s' Version=%s Status.CanRun=%t Status.State=%s, pod counts [Pending: %d, Running: %d, Succeded: %d, Failed %d]", derivedAwStatus, newjob.Namespace, newjob.Name, newjob.ResourceVersion, + newjob.Status.CanRun, newjob.Status.State, newjob.Status.Pending, newjob.Status.Running, newjob.Status.Succeeded, newjob.Status.Failed) - // Set Appwrapper state to complete if all items in Appwrapper - // are completed - if derivedAwStatus == arbv1.AppWrapperStateRunningHoldCompletion { - newjob.Status.State = derivedAwStatus - var updateQj *arbv1.AppWrapper - index := getIndexOfMatchedCondition(newjob, arbv1.AppWrapperCondRunningHoldCompletion, "SomeItemsCompleted") - if index < 0 { - newjob.Status.QueueJobState = arbv1.AppWrapperCondRunningHoldCompletion - cond := GenerateAppWrapperCondition(arbv1.AppWrapperCondRunningHoldCompletion, v1.ConditionTrue, "SomeItemsCompleted", "") - newjob.Status.Conditions = append(newjob.Status.Conditions, cond) - newjob.Status.FilterIgnore = true // Update AppWrapperCondRunningHoldCompletion - updateQj = newjob.DeepCopy() - } else { - cond := GenerateAppWrapperCondition(arbv1.AppWrapperCondRunningHoldCompletion, v1.ConditionTrue, "SomeItemsCompleted", "") - newjob.Status.Conditions[index] = *cond.DeepCopy() - updateQj = newjob.DeepCopy() - } - err := qjm.updateStatusInEtcdWithRetry(context.Background(), updateQj, "[UpdateQueueJobs] setRunningHoldCompletion") - if err != nil { - // TODO: implement retry - klog.Errorf("[UpdateQueueJobs] Error updating status 'setRunningHoldCompletion' for AppWrapper: '%s/%s',Status=%+v, err=%+v.", newjob.Namespace, newjob.Name, newjob.Status, err) - } + // Set Appwrapper state to complete if all items in Appwrapper + // are completed + if derivedAwStatus == arbv1.AppWrapperStateRunningHoldCompletion { + newjob.Status.State = derivedAwStatus + var updateQj *arbv1.AppWrapper + index := getIndexOfMatchedCondition(newjob, arbv1.AppWrapperCondRunningHoldCompletion, "SomeItemsCompleted") + if index < 0 { + newjob.Status.QueueJobState = arbv1.AppWrapperCondRunningHoldCompletion + cond := GenerateAppWrapperCondition(arbv1.AppWrapperCondRunningHoldCompletion, v1.ConditionTrue, "SomeItemsCompleted", "") + newjob.Status.Conditions = append(newjob.Status.Conditions, cond) + newjob.Status.FilterIgnore = true // Update AppWrapperCondRunningHoldCompletion + updateQj = newjob.DeepCopy() + } else { + cond := GenerateAppWrapperCondition(arbv1.AppWrapperCondRunningHoldCompletion, v1.ConditionTrue, "SomeItemsCompleted", "") + newjob.Status.Conditions[index] = *cond.DeepCopy() + updateQj = newjob.DeepCopy() } - // Set appwrapper status to complete - if derivedAwStatus == arbv1.AppWrapperStateCompleted { - newjob.Status.State = derivedAwStatus - newjob.Status.CanRun = false - var updateQj *arbv1.AppWrapper - index := getIndexOfMatchedCondition(newjob, arbv1.AppWrapperCondCompleted, "PodsCompleted") - if index < 0 { - newjob.Status.QueueJobState = arbv1.AppWrapperCondCompleted - cond := GenerateAppWrapperCondition(arbv1.AppWrapperCondCompleted, v1.ConditionTrue, "PodsCompleted", "") - newjob.Status.Conditions = append(newjob.Status.Conditions, cond) - newjob.Status.FilterIgnore = true // Update AppWrapperCondCompleted - updateQj = newjob.DeepCopy() - } else { - cond := GenerateAppWrapperCondition(arbv1.AppWrapperCondCompleted, v1.ConditionTrue, "PodsCompleted", "") - newjob.Status.Conditions[index] = *cond.DeepCopy() - updateQj = newjob.DeepCopy() - } - err := qjm.updateStatusInEtcdWithRetry(context.Background(), updateQj, "[UpdateQueueJobs] setCompleted") - if err != nil { - if qjm.quotaManager != nil { - qjm.quotaManager.Release(updateQj) - } - // TODO: Implement retry - klog.Errorf("[UpdateQueueJobs] Error updating status 'setCompleted' AppWrapper: '%s/%s',Status=%+v, err=%+v.", newjob.Namespace, newjob.Name, newjob.Status, err) - } + err := qjm.updateStatusInEtcdWithRetry(context.Background(), updateQj, "[UpdateQueueJobs] setRunningHoldCompletion") + if err != nil { + // TODO: implement retry + klog.Errorf("[UpdateQueueJobs] Error updating status 'setRunningHoldCompletion' for AppWrapper: '%s/%s',Status=%+v, err=%+v.", newjob.Namespace, newjob.Name, newjob.Status, err) + } + } + // Set appwrapper status to complete + if derivedAwStatus == arbv1.AppWrapperStateCompleted { + newjob.Status.State = derivedAwStatus + newjob.Status.CanRun = false + var updateQj *arbv1.AppWrapper + index := getIndexOfMatchedCondition(newjob, arbv1.AppWrapperCondCompleted, "PodsCompleted") + if index < 0 { + newjob.Status.QueueJobState = arbv1.AppWrapperCondCompleted + cond := GenerateAppWrapperCondition(arbv1.AppWrapperCondCompleted, v1.ConditionTrue, "PodsCompleted", "") + newjob.Status.Conditions = append(newjob.Status.Conditions, cond) + newjob.Status.FilterIgnore = true // Update AppWrapperCondCompleted + updateQj = newjob.DeepCopy() + } else { + cond := GenerateAppWrapperCondition(arbv1.AppWrapperCondCompleted, v1.ConditionTrue, "PodsCompleted", "") + newjob.Status.Conditions[index] = *cond.DeepCopy() + updateQj = newjob.DeepCopy() + } + err := qjm.updateStatusInEtcdWithRetry(context.Background(), updateQj, "[UpdateQueueJobs] setCompleted") + if err != nil { if qjm.quotaManager != nil { qjm.quotaManager.Release(updateQj) } - // Delete AW from both queue's - qjm.eventQueue.Delete(updateQj) - qjm.qjqueue.Delete(updateQj) + // TODO: Implement retry + klog.Errorf("[UpdateQueueJobs] Error updating status 'setCompleted' AppWrapper: '%s/%s',Status=%+v, err=%+v.", newjob.Namespace, newjob.Name, newjob.Status, err) + } + if qjm.quotaManager != nil { + qjm.quotaManager.Release(updateQj) } - klog.Infof("[UpdateQueueJobs] Done getting completion status for app wrapper '%s/%s' Version=%s Status.CanRun=%t Status.State=%s, pod counts [Pending: %d, Running: %d, Succeded: %d, Failed %d]", newjob.Namespace, newjob.Name, newjob.ResourceVersion, - newjob.Status.CanRun, newjob.Status.State, newjob.Status.Pending, newjob.Status.Running, newjob.Status.Succeeded, newjob.Status.Failed) + // Delete AW from both queue's + qjm.eventQueue.Delete(updateQj) + qjm.qjqueue.Delete(updateQj) } + klog.Infof("[UpdateQueueJobs] Done getting completion status for app wrapper '%s/%s' Version=%s Status.CanRun=%t Status.State=%s, pod counts [Pending: %d, Running: %d, Succeded: %d, Failed %d]", newjob.Namespace, newjob.Name, newjob.ResourceVersion, + newjob.Status.CanRun, newjob.Status.State, newjob.Status.Pending, newjob.Status.Running, newjob.Status.Succeeded, newjob.Status.Failed) } } @@ -1678,6 +1667,7 @@ func (cc *XController) updateQueueJob(oldObj, newObj interface{}) { } klog.V(6).Infof("[Informer-updateQJ] '%s/%s' *Delay=%.6f seconds normal enqueue Version=%s Status=%v", newQJ.Namespace, newQJ.Name, time.Now().Sub(newQJ.Status.ControllerFirstTimestamp.Time).Seconds(), newQJ.ResourceVersion, newQJ.Status) + notBackedoff := true for _, cond := range newQJ.Status.Conditions { if cond.Type == arbv1.AppWrapperCondBackoff { //AWs that have backoff conditions have a delay of 10 seconds before getting added to enqueue. @@ -1688,12 +1678,45 @@ func (cc *XController) updateQueueJob(oldObj, newObj interface{}) { } cc.enqueue(newQJ) }) - return + notBackedoff = false } } // cc.eventQueue.Delete(oldObj) - cc.enqueue(newQJ) + if notBackedoff { + cc.enqueue(newQJ) + + // Requeue the item to be processed again in 30 seconds. + //TODO: tune the frequency of reprocessing an AW + hasCompletionStatus := false + for _, genericItem := range newQJ.Spec.AggrResources.GenericItems { + if len(genericItem.CompletionStatus) > 0 { + hasCompletionStatus = true + } + } + //updatequeuejobs now runs as a part of informer machinery. optimization here is to not use etcd to pullout submitted AWs and operate + //on slate Aws + if newQJ.Status.State != arbv1.AppWrapperStateCompleted && newQJ.Status.State != arbv1.AppWrapperStateFailed && newQJ.Status.State != "" { + requeueInterval := 30 * time.Second + key, err := cache.MetaNamespaceKeyFunc(newQJ) + if err == nil { + go func() { + for { + time.Sleep(requeueInterval) + latestAw, exists, err := cc.appwrapperInformer.Informer().GetStore().GetByKey(key) + if err == nil && exists { + // Enqueue the latest copy of the AW. + if (newQJ.Status.State != arbv1.AppWrapperStateCompleted || newQJ.Status.State != arbv1.AppWrapperStateFailed) && hasCompletionStatus { + cc.UpdateQueueJobs(latestAw.(*arbv1.AppWrapper)) + klog.V(2).Infof("[Informer-updateQJ] Finished requeing AW to determine completion status") + } + } + } + }() + } + } + } + } // a, b arbitrary length numerical string. returns true if a larger than b From 213e6bd63ca30dc6d1205b49f1caf985e721da3f Mon Sep 17 00:00:00 2001 From: Abhishek Malvankar Date: Wed, 6 Sep 2023 22:34:00 -0400 Subject: [PATCH 02/11] fix typo in comment --- pkg/controller/queuejob/queuejob_controller_ex.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/controller/queuejob/queuejob_controller_ex.go b/pkg/controller/queuejob/queuejob_controller_ex.go index 13272fd2..e6b70263 100644 --- a/pkg/controller/queuejob/queuejob_controller_ex.go +++ b/pkg/controller/queuejob/queuejob_controller_ex.go @@ -1695,7 +1695,7 @@ func (cc *XController) updateQueueJob(oldObj, newObj interface{}) { } } //updatequeuejobs now runs as a part of informer machinery. optimization here is to not use etcd to pullout submitted AWs and operate - //on slate Aws + //on stale AWs. This has potential to improve performance at scale. if newQJ.Status.State != arbv1.AppWrapperStateCompleted && newQJ.Status.State != arbv1.AppWrapperStateFailed && newQJ.Status.State != "" { requeueInterval := 30 * time.Second key, err := cache.MetaNamespaceKeyFunc(newQJ) From 5947878941fa513b72c7836403f50247f13bb7c4 Mon Sep 17 00:00:00 2001 From: Abhishek Malvankar Date: Wed, 6 Sep 2023 22:48:18 -0400 Subject: [PATCH 03/11] fix unit tests --- pkg/controller/queuejob/queuejob_controller_ex.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/controller/queuejob/queuejob_controller_ex.go b/pkg/controller/queuejob/queuejob_controller_ex.go index e6b70263..f0c1f277 100644 --- a/pkg/controller/queuejob/queuejob_controller_ex.go +++ b/pkg/controller/queuejob/queuejob_controller_ex.go @@ -1706,7 +1706,7 @@ func (cc *XController) updateQueueJob(oldObj, newObj interface{}) { latestAw, exists, err := cc.appwrapperInformer.Informer().GetStore().GetByKey(key) if err == nil && exists { // Enqueue the latest copy of the AW. - if (newQJ.Status.State != arbv1.AppWrapperStateCompleted || newQJ.Status.State != arbv1.AppWrapperStateFailed) && hasCompletionStatus { + if (newQJ.Status.State != arbv1.AppWrapperStateCompleted && newQJ.Status.State != arbv1.AppWrapperStateFailed) && hasCompletionStatus { cc.UpdateQueueJobs(latestAw.(*arbv1.AppWrapper)) klog.V(2).Infof("[Informer-updateQJ] Finished requeing AW to determine completion status") } From 1b2604d480a8ab939eee97f64da222381923870b Mon Sep 17 00:00:00 2001 From: Abhishek Malvankar Date: Thu, 7 Sep 2023 13:45:39 -0400 Subject: [PATCH 04/11] add break condition --- pkg/controller/queuejob/queuejob_controller_ex.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/controller/queuejob/queuejob_controller_ex.go b/pkg/controller/queuejob/queuejob_controller_ex.go index f0c1f277..3a43cb7d 100644 --- a/pkg/controller/queuejob/queuejob_controller_ex.go +++ b/pkg/controller/queuejob/queuejob_controller_ex.go @@ -1704,6 +1704,10 @@ func (cc *XController) updateQueueJob(oldObj, newObj interface{}) { for { time.Sleep(requeueInterval) latestAw, exists, err := cc.appwrapperInformer.Informer().GetStore().GetByKey(key) + if latestAw.(*arbv1.AppWrapper).Status.State == arbv1.AppWrapperStateCompleted || latestAw.(arbv1.AppWrapper).Status.State == arbv1.AppWrapperStateFailed || latestAw.(arbv1.AppWrapper).Status.State == arbv1.AppWrapperStateDeleted { + klog.V(2).Infof("[Informer-updateQJ] Stopping requeue for AW %s with status %s", latestAw.(*arbv1.AppWrapper).Name, latestAw.(*arbv1.AppWrapper).Status.State) + break + } if err == nil && exists { // Enqueue the latest copy of the AW. if (newQJ.Status.State != arbv1.AppWrapperStateCompleted && newQJ.Status.State != arbv1.AppWrapperStateFailed) && hasCompletionStatus { From 5e2dd289027e68c202f49519245971e7d26b11eb Mon Sep 17 00:00:00 2001 From: Abhishek Malvankar Date: Thu, 7 Sep 2023 13:52:01 -0400 Subject: [PATCH 05/11] do not requeue if AW does not exists --- pkg/controller/queuejob/queuejob_controller_ex.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/controller/queuejob/queuejob_controller_ex.go b/pkg/controller/queuejob/queuejob_controller_ex.go index 3a43cb7d..7d143d0a 100644 --- a/pkg/controller/queuejob/queuejob_controller_ex.go +++ b/pkg/controller/queuejob/queuejob_controller_ex.go @@ -1704,7 +1704,7 @@ func (cc *XController) updateQueueJob(oldObj, newObj interface{}) { for { time.Sleep(requeueInterval) latestAw, exists, err := cc.appwrapperInformer.Informer().GetStore().GetByKey(key) - if latestAw.(*arbv1.AppWrapper).Status.State == arbv1.AppWrapperStateCompleted || latestAw.(arbv1.AppWrapper).Status.State == arbv1.AppWrapperStateFailed || latestAw.(arbv1.AppWrapper).Status.State == arbv1.AppWrapperStateDeleted { + if latestAw.(*arbv1.AppWrapper).Status.State == arbv1.AppWrapperStateCompleted || latestAw.(arbv1.AppWrapper).Status.State == arbv1.AppWrapperStateFailed || latestAw.(arbv1.AppWrapper).Status.State == arbv1.AppWrapperStateDeleted || !exists { klog.V(2).Infof("[Informer-updateQJ] Stopping requeue for AW %s with status %s", latestAw.(*arbv1.AppWrapper).Name, latestAw.(*arbv1.AppWrapper).Status.State) break } From 12109c040bc79900db6ee1bfc33e34eeacd9f13f Mon Sep 17 00:00:00 2001 From: Abhishek Malvankar Date: Thu, 7 Sep 2023 15:35:29 -0400 Subject: [PATCH 06/11] merge --- pkg/controller/queuejob/queuejob_controller_ex.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/controller/queuejob/queuejob_controller_ex.go b/pkg/controller/queuejob/queuejob_controller_ex.go index 7d143d0a..24d0ac41 100644 --- a/pkg/controller/queuejob/queuejob_controller_ex.go +++ b/pkg/controller/queuejob/queuejob_controller_ex.go @@ -1706,7 +1706,7 @@ func (cc *XController) updateQueueJob(oldObj, newObj interface{}) { latestAw, exists, err := cc.appwrapperInformer.Informer().GetStore().GetByKey(key) if latestAw.(*arbv1.AppWrapper).Status.State == arbv1.AppWrapperStateCompleted || latestAw.(arbv1.AppWrapper).Status.State == arbv1.AppWrapperStateFailed || latestAw.(arbv1.AppWrapper).Status.State == arbv1.AppWrapperStateDeleted || !exists { klog.V(2).Infof("[Informer-updateQJ] Stopping requeue for AW %s with status %s", latestAw.(*arbv1.AppWrapper).Name, latestAw.(*arbv1.AppWrapper).Status.State) - break + break //Exit the loop } if err == nil && exists { // Enqueue the latest copy of the AW. From 913538345b65d3b674b1c7027deaf244bb60d76e Mon Sep 17 00:00:00 2001 From: Abhishek Malvankar Date: Thu, 7 Sep 2023 16:02:00 -0400 Subject: [PATCH 07/11] fix break condition --- pkg/controller/queuejob/queuejob_controller_ex.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/controller/queuejob/queuejob_controller_ex.go b/pkg/controller/queuejob/queuejob_controller_ex.go index 24d0ac41..ae74370c 100644 --- a/pkg/controller/queuejob/queuejob_controller_ex.go +++ b/pkg/controller/queuejob/queuejob_controller_ex.go @@ -1704,7 +1704,7 @@ func (cc *XController) updateQueueJob(oldObj, newObj interface{}) { for { time.Sleep(requeueInterval) latestAw, exists, err := cc.appwrapperInformer.Informer().GetStore().GetByKey(key) - if latestAw.(*arbv1.AppWrapper).Status.State == arbv1.AppWrapperStateCompleted || latestAw.(arbv1.AppWrapper).Status.State == arbv1.AppWrapperStateFailed || latestAw.(arbv1.AppWrapper).Status.State == arbv1.AppWrapperStateDeleted || !exists { + if latestAw.(*arbv1.AppWrapper).Status.State != arbv1.AppWrapperStateActive && latestAw.(*arbv1.AppWrapper).Status.State != arbv1.AppWrapperStateEnqueued && latestAw.(*arbv1.AppWrapper).Status.State != arbv1.AppWrapperStateRunningHoldCompletion || !exists { klog.V(2).Infof("[Informer-updateQJ] Stopping requeue for AW %s with status %s", latestAw.(*arbv1.AppWrapper).Name, latestAw.(*arbv1.AppWrapper).Status.State) break //Exit the loop } From f20eda15ed808effac475a2495e44acfce4188b9 Mon Sep 17 00:00:00 2001 From: Abhishek Malvankar Date: Thu, 7 Sep 2023 21:32:50 -0400 Subject: [PATCH 08/11] fix duplicate requeues --- .../queuejob/queuejob_controller_ex.go | 68 +++++++++---------- 1 file changed, 34 insertions(+), 34 deletions(-) diff --git a/pkg/controller/queuejob/queuejob_controller_ex.go b/pkg/controller/queuejob/queuejob_controller_ex.go index ae74370c..b74b0d83 100644 --- a/pkg/controller/queuejob/queuejob_controller_ex.go +++ b/pkg/controller/queuejob/queuejob_controller_ex.go @@ -1642,6 +1642,40 @@ func (cc *XController) addQueueJob(obj interface{}) { klog.V(6).Infof("[Informer-addQJ] enqueue %s &qj=%p Version=%s Status=%+v", qj.Name, qj, qj.ResourceVersion, qj.Status) cc.enqueue(qj) + // Requeue the item to be processed again in 30 seconds. + //TODO: tune the frequency of reprocessing an AW + hasCompletionStatus := false + for _, genericItem := range qj.Spec.AggrResources.GenericItems { + if len(genericItem.CompletionStatus) > 0 { + hasCompletionStatus = true + } + } + //When an AW entrs a system with completionstatus keep checking the AW until completed + //updatequeuejobs now runs as a part of informer machinery. optimization here is to not use etcd to pullout submitted AWs and operate + //on stale AWs. This has potential to improve performance at scale. + //if qj.Status.State != arbv1.AppWrapperStateCompleted && qj.Status.State != arbv1.AppWrapperStateFailed && qj.Status.State != "" { + requeueInterval := 30 * time.Second + key, err := cache.MetaNamespaceKeyFunc(qj) + if err == nil { + go func() { + for { + time.Sleep(requeueInterval) + latestAw, exists, err := cc.appwrapperInformer.Informer().GetStore().GetByKey(key) + if latestAw.(*arbv1.AppWrapper).Status.State != arbv1.AppWrapperStateActive && latestAw.(*arbv1.AppWrapper).Status.State != arbv1.AppWrapperStateEnqueued && latestAw.(*arbv1.AppWrapper).Status.State != arbv1.AppWrapperStateRunningHoldCompletion { + klog.V(2).Infof("[Informer-addQJ] Stopping requeue for AW %s with status %s", latestAw.(*arbv1.AppWrapper).Name, latestAw.(*arbv1.AppWrapper).Status.State) + break //Exit the loop + } + if err == nil && exists { + // Enqueue the latest copy of the AW. + if (qj.Status.State != arbv1.AppWrapperStateCompleted && qj.Status.State != arbv1.AppWrapperStateFailed) && hasCompletionStatus { + cc.UpdateQueueJobs(latestAw.(*arbv1.AppWrapper)) + klog.V(2).Infof("[Informer-addQJ] Finished requeing AW to determine completion status") + } + } + } + }() + } + //} } func (cc *XController) updateQueueJob(oldObj, newObj interface{}) { @@ -1685,40 +1719,6 @@ func (cc *XController) updateQueueJob(oldObj, newObj interface{}) { // cc.eventQueue.Delete(oldObj) if notBackedoff { cc.enqueue(newQJ) - - // Requeue the item to be processed again in 30 seconds. - //TODO: tune the frequency of reprocessing an AW - hasCompletionStatus := false - for _, genericItem := range newQJ.Spec.AggrResources.GenericItems { - if len(genericItem.CompletionStatus) > 0 { - hasCompletionStatus = true - } - } - //updatequeuejobs now runs as a part of informer machinery. optimization here is to not use etcd to pullout submitted AWs and operate - //on stale AWs. This has potential to improve performance at scale. - if newQJ.Status.State != arbv1.AppWrapperStateCompleted && newQJ.Status.State != arbv1.AppWrapperStateFailed && newQJ.Status.State != "" { - requeueInterval := 30 * time.Second - key, err := cache.MetaNamespaceKeyFunc(newQJ) - if err == nil { - go func() { - for { - time.Sleep(requeueInterval) - latestAw, exists, err := cc.appwrapperInformer.Informer().GetStore().GetByKey(key) - if latestAw.(*arbv1.AppWrapper).Status.State != arbv1.AppWrapperStateActive && latestAw.(*arbv1.AppWrapper).Status.State != arbv1.AppWrapperStateEnqueued && latestAw.(*arbv1.AppWrapper).Status.State != arbv1.AppWrapperStateRunningHoldCompletion || !exists { - klog.V(2).Infof("[Informer-updateQJ] Stopping requeue for AW %s with status %s", latestAw.(*arbv1.AppWrapper).Name, latestAw.(*arbv1.AppWrapper).Status.State) - break //Exit the loop - } - if err == nil && exists { - // Enqueue the latest copy of the AW. - if (newQJ.Status.State != arbv1.AppWrapperStateCompleted && newQJ.Status.State != arbv1.AppWrapperStateFailed) && hasCompletionStatus { - cc.UpdateQueueJobs(latestAw.(*arbv1.AppWrapper)) - klog.V(2).Infof("[Informer-updateQJ] Finished requeing AW to determine completion status") - } - } - } - }() - } - } } } From f821d184531dceaedfaae81811c33fb33e8053cd Mon Sep 17 00:00:00 2001 From: Abhishek Malvankar Date: Fri, 8 Sep 2023 12:39:45 -0400 Subject: [PATCH 09/11] address review --- .../queuejob/queuejob_controller_ex.go | 218 +++++++++--------- 1 file changed, 112 insertions(+), 106 deletions(-) diff --git a/pkg/controller/queuejob/queuejob_controller_ex.go b/pkg/controller/queuejob/queuejob_controller_ex.go index b74b0d83..90d7b6fe 100644 --- a/pkg/controller/queuejob/queuejob_controller_ex.go +++ b/pkg/controller/queuejob/queuejob_controller_ex.go @@ -348,13 +348,14 @@ func NewJobController(config *rest.Config, serverOption *options.ServerOption) * // TODO: We can use informer to filter AWs that do not meet the minScheduling spec. // we still need a thread for dispatch duration but minScheduling spec can definetly be moved to an informer -func (qjm *XController) PreemptQueueJobs() { +func (qjm *XController) PreemptQueueJobs(inspectAw *arbv1.AppWrapper) { ctx := context.Background() + aw := qjm.GetQueueJobEligibleForPreemption(inspectAw) + if aw != nil { - qjobs := qjm.GetQueueJobsEligibleForPreemption() - for _, aw := range qjobs { + //for _, aw := range qjobs { if aw.Status.State == arbv1.AppWrapperStateCompleted || aw.Status.State == arbv1.AppWrapperStateDeleted || aw.Status.State == arbv1.AppWrapperStateFailed { - continue + return } var updateNewJob *arbv1.AppWrapper @@ -362,7 +363,7 @@ func (qjm *XController) PreemptQueueJobs() { newjob, err := qjm.getAppWrapper(aw.Namespace, aw.Name, "[PreemptQueueJobs] get fresh app wrapper") if err != nil { klog.Warningf("[PreemptQueueJobs] failed in retrieving a fresh copy of the app wrapper '%s/%s', err=%v. Will try to preempt on the next run.", aw.Namespace, aw.Name, err) - continue + return } //we need to update AW before analyzing it as a candidate for preemption updateErr := qjm.UpdateQueueJobStatus(newjob) @@ -394,13 +395,11 @@ func (qjm *XController) PreemptQueueJobs() { err := qjm.updateStatusInEtcdWithRetry(ctx, updateNewJob, "PreemptQueueJobs - CanRun: false -- DispatchDeadlineExceeded") if err != nil { klog.Warningf("[PreemptQueueJobs] status update CanRun: false -- DispatchDeadlineExceeded for '%s/%s' failed", newjob.Namespace, newjob.Name) - continue + return } // cannot use cleanup AW, since it puts AW back in running state qjm.qjqueue.AddUnschedulableIfNotPresent(updateNewJob) - // Move to next AW - continue } } @@ -462,7 +461,7 @@ func (qjm *XController) PreemptQueueJobs() { err = qjm.updateStatusInEtcdWithRetry(ctx, updateNewJob, "PreemptQueueJobs - CanRun: false -- MinPodsNotRunning") if err != nil { klog.Warningf("[PreemptQueueJobs] status update for '%s/%s' failed, skipping app wrapper err =%v", newjob.Namespace, newjob.Name, err) - continue + return } if cleanAppWrapper { @@ -506,98 +505,83 @@ func (qjm *XController) preemptAWJobs(ctx context.Context, preemptAWs []*arbv1.A } } -func (qjm *XController) GetQueueJobsEligibleForPreemption() []*arbv1.AppWrapper { - qjobs := make([]*arbv1.AppWrapper, 0) - - queueJobs, err := qjm.appWrapperLister.AppWrappers("").List(labels.Everything()) - if err != nil { - klog.Errorf("List of queueJobs %+v", qjobs) - return qjobs - } +func (qjm *XController) GetQueueJobEligibleForPreemption(value *arbv1.AppWrapper) *arbv1.AppWrapper { if !qjm.isDispatcher { // Agent Mode - for _, value := range queueJobs { - - // Skip if AW Pending or just entering the system and does not have a state yet. - if (value.Status.State == arbv1.AppWrapperStateEnqueued) || (value.Status.State == "") { - continue - } - if value.Status.State == arbv1.AppWrapperStateActive && value.Spec.SchedSpec.DispatchDuration.Limit > 0 { - awDispatchDurationLimit := value.Spec.SchedSpec.DispatchDuration.Limit - dispatchDuration := value.Status.ControllerFirstDispatchTimestamp.Add(time.Duration(awDispatchDurationLimit) * time.Second) - currentTime := time.Now() - dispatchTimeExceeded := !currentTime.Before(dispatchDuration) + if value.Status.State == arbv1.AppWrapperStateActive && value.Spec.SchedSpec.DispatchDuration.Limit > 0 { + awDispatchDurationLimit := value.Spec.SchedSpec.DispatchDuration.Limit + dispatchDuration := value.Status.ControllerFirstDispatchTimestamp.Add(time.Duration(awDispatchDurationLimit) * time.Second) + currentTime := time.Now() + dispatchTimeExceeded := !currentTime.Before(dispatchDuration) - if dispatchTimeExceeded { - klog.V(8).Infof("Appwrapper Dispatch limit exceeded, currentTime %v, dispatchTimeInSeconds %v", currentTime, dispatchDuration) - value.Spec.SchedSpec.DispatchDuration.Overrun = true - qjobs = append(qjobs, value) - // Got AW which exceeded dispatch runtime limit, move to next AW - continue - } + if dispatchTimeExceeded { + klog.V(8).Infof("Appwrapper Dispatch limit exceeded, currentTime %v, dispatchTimeInSeconds %v", currentTime, dispatchDuration) + value.Spec.SchedSpec.DispatchDuration.Overrun = true + // Got AW which exceeded dispatch runtime limit, move to next AW + return value } - replicas := value.Spec.SchedSpec.MinAvailable + } + replicas := value.Spec.SchedSpec.MinAvailable - if (int(value.Status.Running) + int(value.Status.Succeeded)) < replicas { + if (int(value.Status.Running) + int(value.Status.Succeeded)) < replicas { - // Find the dispatched condition if there is any - numConditions := len(value.Status.Conditions) - var dispatchedCondition arbv1.AppWrapperCondition - dispatchedConditionExists := false + // Find the dispatched condition if there is any + numConditions := len(value.Status.Conditions) + var dispatchedCondition arbv1.AppWrapperCondition + dispatchedConditionExists := false - for i := numConditions - 1; i > 0; i-- { - dispatchedCondition = value.Status.Conditions[i] - if dispatchedCondition.Type != arbv1.AppWrapperCondDispatched { - continue - } - dispatchedConditionExists = true - break + for i := numConditions - 1; i > 0; i-- { + dispatchedCondition = value.Status.Conditions[i] + if dispatchedCondition.Type != arbv1.AppWrapperCondDispatched { + continue } + dispatchedConditionExists = true + break + } - // Check for the minimum age and then skip preempt if current time is not beyond minimum age - // The minimum age is controlled by the requeuing.TimeInSeconds stanza - // For preemption, the time is compared to the last condition or the dispatched condition in the AppWrapper, whichever happened later - lastCondition := value.Status.Conditions[numConditions-1] - var condition arbv1.AppWrapperCondition + // Check for the minimum age and then skip preempt if current time is not beyond minimum age + // The minimum age is controlled by the requeuing.TimeInSeconds stanza + // For preemption, the time is compared to the last condition or the dispatched condition in the AppWrapper, whichever happened later + lastCondition := value.Status.Conditions[numConditions-1] + var condition arbv1.AppWrapperCondition - if dispatchedConditionExists && dispatchedCondition.LastTransitionMicroTime.After(lastCondition.LastTransitionMicroTime.Time) { - condition = dispatchedCondition - } else { - condition = lastCondition - } - var requeuingTimeInSeconds int - if value.Status.RequeueingTimeInSeconds > 0 { - requeuingTimeInSeconds = value.Status.RequeueingTimeInSeconds - } else if value.Spec.SchedSpec.Requeuing.InitialTimeInSeconds == 0 { - requeuingTimeInSeconds = value.Spec.SchedSpec.Requeuing.TimeInSeconds - } else { - requeuingTimeInSeconds = value.Spec.SchedSpec.Requeuing.InitialTimeInSeconds - } + if dispatchedConditionExists && dispatchedCondition.LastTransitionMicroTime.After(lastCondition.LastTransitionMicroTime.Time) { + condition = dispatchedCondition + } else { + condition = lastCondition + } + var requeuingTimeInSeconds int + if value.Status.RequeueingTimeInSeconds > 0 { + requeuingTimeInSeconds = value.Status.RequeueingTimeInSeconds + } else if value.Spec.SchedSpec.Requeuing.InitialTimeInSeconds == 0 { + requeuingTimeInSeconds = value.Spec.SchedSpec.Requeuing.TimeInSeconds + } else { + requeuingTimeInSeconds = value.Spec.SchedSpec.Requeuing.InitialTimeInSeconds + } - minAge := condition.LastTransitionMicroTime.Add(time.Duration(requeuingTimeInSeconds) * time.Second) - currentTime := time.Now() + minAge := condition.LastTransitionMicroTime.Add(time.Duration(requeuingTimeInSeconds) * time.Second) + currentTime := time.Now() - if currentTime.Before(minAge) { - continue - } + if currentTime.Before(minAge) { + return nil + } - if replicas > 0 { - klog.V(3).Infof("AppWrapper '%s/%s' is eligible for preemption Running: %d - minAvailable: %d , Succeeded: %d !!!", value.Namespace, value.Name, value.Status.Running, replicas, value.Status.Succeeded) - qjobs = append(qjobs, value) - } - } else { - // Preempt when schedulingSpec stanza is not set but pods fails scheduling. - // ignore co-scheduler pods - if len(value.Status.PendingPodConditions) > 0 { - klog.V(3).Infof("AppWrapper '%s/%s' is eligible for preemption Running: %d , Succeeded: %d due to failed scheduling !!!", value.Namespace, value.Status.Running, value.Status.Succeeded) - qjobs = append(qjobs, value) - } + if replicas > 0 { + klog.V(3).Infof("AppWrapper '%s/%s' is eligible for preemption Running: %d - minAvailable: %d , Succeeded: %d !!!", value.Namespace, value.Name, value.Status.Running, replicas, value.Status.Succeeded) + return value + } + } else { + // Preempt when schedulingSpec stanza is not set but pods fails scheduling. + // ignore co-scheduler pods + if len(value.Status.PendingPodConditions) > 0 { + klog.V(3).Infof("AppWrapper '%s/%s' is eligible for preemption Running: %d , Succeeded: %d due to failed scheduling !!!", value.Namespace, value.Status.Running, value.Status.Succeeded) + return value } } } - return qjobs + return nil } func (qjm *XController) GetAggregatedResourcesPerGenericItem(cqj *arbv1.AppWrapper) []*clusterstateapi.Resource { @@ -1500,20 +1484,8 @@ func (qjm *XController) backoff(ctx context.Context, q *arbv1.AppWrapper, reason func (cc *XController) Run(stopCh <-chan struct{}) { go cc.appwrapperInformer.Informer().Run(stopCh) - // go cc.qjobResControls[arbv1.ResourceTypePod].Run(stopCh) - cache.WaitForCacheSync(stopCh, cc.appWrapperSynced) - // cache is turned off, issue: https://github.com/project-codeflare/multi-cluster-app-dispatcher/issues/588 - // update snapshot of ClientStateCache every second - // cc.cache.Run(stopCh) - - // start preempt thread is used to preempt AWs that have partial pods or have reached dispatch duration - go wait.Until(cc.PreemptQueueJobs, 60*time.Second, stopCh) - - // This thread is used to update AW that has completionstatus set to Complete or RunningHoldCompletion - //go wait.Until(cc.UpdateQueueJobs, 5*time.Second, stopCh) - if cc.isDispatcher { go wait.Until(cc.UpdateAgent, 2*time.Second, stopCh) // In the Agent? for _, jobClusterAgent := range cc.agentMap { @@ -1653,29 +1625,63 @@ func (cc *XController) addQueueJob(obj interface{}) { //When an AW entrs a system with completionstatus keep checking the AW until completed //updatequeuejobs now runs as a part of informer machinery. optimization here is to not use etcd to pullout submitted AWs and operate //on stale AWs. This has potential to improve performance at scale. - //if qj.Status.State != arbv1.AppWrapperStateCompleted && qj.Status.State != arbv1.AppWrapperStateFailed && qj.Status.State != "" { - requeueInterval := 30 * time.Second - key, err := cache.MetaNamespaceKeyFunc(qj) - if err == nil { + if hasCompletionStatus { + requeueInterval := 5 * time.Second + key, err := cache.MetaNamespaceKeyFunc(qj) + if err != nil { + klog.Warningf("[Informer-addQJ] Error getting AW %s from cache cannot determine completion status", qj.Name) + //TODO: should we return from this loop? + } go func() { for { time.Sleep(requeueInterval) latestAw, exists, err := cc.appwrapperInformer.Informer().GetStore().GetByKey(key) - if latestAw.(*arbv1.AppWrapper).Status.State != arbv1.AppWrapperStateActive && latestAw.(*arbv1.AppWrapper).Status.State != arbv1.AppWrapperStateEnqueued && latestAw.(*arbv1.AppWrapper).Status.State != arbv1.AppWrapperStateRunningHoldCompletion { - klog.V(2).Infof("[Informer-addQJ] Stopping requeue for AW %s with status %s", latestAw.(*arbv1.AppWrapper).Name, latestAw.(*arbv1.AppWrapper).Status.State) - break //Exit the loop - } - if err == nil && exists { + if err != nil && !exists { + klog.Warningf("[Informer-addQJ] Recent copy of AW %s not found in cache", qj.Name) + } else { + if latestAw.(*arbv1.AppWrapper).Status.State != arbv1.AppWrapperStateActive && latestAw.(*arbv1.AppWrapper).Status.State != arbv1.AppWrapperStateEnqueued && latestAw.(*arbv1.AppWrapper).Status.State != arbv1.AppWrapperStateRunningHoldCompletion { + klog.V(2).Infof("[Informer-addQJ] Stopping requeue for AW %s with status %s", latestAw.(*arbv1.AppWrapper).Name, latestAw.(*arbv1.AppWrapper).Status.State) + break //Exit the loop + } // Enqueue the latest copy of the AW. if (qj.Status.State != arbv1.AppWrapperStateCompleted && qj.Status.State != arbv1.AppWrapperStateFailed) && hasCompletionStatus { cc.UpdateQueueJobs(latestAw.(*arbv1.AppWrapper)) - klog.V(2).Infof("[Informer-addQJ] Finished requeing AW to determine completion status") + klog.V(2).Infof("[Informer-addQJ] requeing AW to determine completion status for AW", qj.Name) + } + + } + + } + }() + } + + if qj.Spec.SchedSpec.MinAvailable > 0 { + requeueInterval := 60 * time.Second + key, err := cache.MetaNamespaceKeyFunc(qj) + if err != nil { + klog.Errorf("[Informer-addQJ] Error getting AW %s from cache cannot preempt AW", qj.Name) + //TODO: should we return from this loop? + } + go func() { + for { + time.Sleep(requeueInterval) + latestAw, exists, err := cc.appwrapperInformer.Informer().GetStore().GetByKey(key) + if err != nil && !exists { + klog.Warningf("[Informer-addQJ] Recent copy of AW %s not found in cache", qj.Name) + } else { + if latestAw.(*arbv1.AppWrapper).Status.State != arbv1.AppWrapperStateActive && latestAw.(*arbv1.AppWrapper).Status.State != arbv1.AppWrapperStateEnqueued && latestAw.(*arbv1.AppWrapper).Status.State != arbv1.AppWrapperStateRunningHoldCompletion { + klog.V(2).Infof("[Informer-addQJ] Stopping requeue for AW %s with status %s", latestAw.(*arbv1.AppWrapper).Name, latestAw.(*arbv1.AppWrapper).Status.State) + break //Exit the loop + } + // Enqueue the latest copy of the AW. + if (qj.Status.State != arbv1.AppWrapperStateCompleted && qj.Status.State != arbv1.AppWrapperStateFailed) && (qj.Spec.SchedSpec.MinAvailable > 0) { + cc.PreemptQueueJobs(latestAw.(*arbv1.AppWrapper)) + klog.V(2).Infof("[Informer-addQJ] requeing AW to check minScheduling spec for AW", qj.Name) } } } }() } - //} } func (cc *XController) updateQueueJob(oldObj, newObj interface{}) { From 333b6991fa3d3c58a986b5f0970171157924e148 Mon Sep 17 00:00:00 2001 From: Abhishek Malvankar Date: Fri, 8 Sep 2023 19:06:07 -0400 Subject: [PATCH 10/11] handle nil AW --- .../queuejob/queuejob_controller_ex.go | 31 +++++++++++++------ 1 file changed, 22 insertions(+), 9 deletions(-) diff --git a/pkg/controller/queuejob/queuejob_controller_ex.go b/pkg/controller/queuejob/queuejob_controller_ex.go index 90d7b6fe..f17ad058 100644 --- a/pkg/controller/queuejob/queuejob_controller_ex.go +++ b/pkg/controller/queuejob/queuejob_controller_ex.go @@ -1613,7 +1613,7 @@ func (cc *XController) addQueueJob(obj interface{}) { qj.Name, time.Now().Sub(qj.Status.ControllerFirstTimestamp.Time).Seconds(), qj.CreationTimestamp, qj.Status.ControllerFirstTimestamp) klog.V(6).Infof("[Informer-addQJ] enqueue %s &qj=%p Version=%s Status=%+v", qj.Name, qj, qj.ResourceVersion, qj.Status) - cc.enqueue(qj) + // Requeue the item to be processed again in 30 seconds. //TODO: tune the frequency of reprocessing an AW hasCompletionStatus := false @@ -1635,17 +1635,23 @@ func (cc *XController) addQueueJob(obj interface{}) { go func() { for { time.Sleep(requeueInterval) - latestAw, exists, err := cc.appwrapperInformer.Informer().GetStore().GetByKey(key) + latestObj, exists, err := cc.appwrapperInformer.Informer().GetStore().GetByKey(key) if err != nil && !exists { klog.Warningf("[Informer-addQJ] Recent copy of AW %s not found in cache", qj.Name) } else { - if latestAw.(*arbv1.AppWrapper).Status.State != arbv1.AppWrapperStateActive && latestAw.(*arbv1.AppWrapper).Status.State != arbv1.AppWrapperStateEnqueued && latestAw.(*arbv1.AppWrapper).Status.State != arbv1.AppWrapperStateRunningHoldCompletion { - klog.V(2).Infof("[Informer-addQJ] Stopping requeue for AW %s with status %s", latestAw.(*arbv1.AppWrapper).Name, latestAw.(*arbv1.AppWrapper).Status.State) + var latestAw *arbv1.AppWrapper + if latestObj != nil { + latestAw = latestObj.(*arbv1.AppWrapper) + } else { + latestAw = qj + } + if latestAw.Status.State != arbv1.AppWrapperStateActive && latestAw.Status.State != arbv1.AppWrapperStateEnqueued && latestAw.Status.State != arbv1.AppWrapperStateRunningHoldCompletion { + klog.V(2).Infof("[Informer-addQJ] Stopping requeue for AW %s with status %s", latestAw.Name, latestAw.Status.State) break //Exit the loop } // Enqueue the latest copy of the AW. if (qj.Status.State != arbv1.AppWrapperStateCompleted && qj.Status.State != arbv1.AppWrapperStateFailed) && hasCompletionStatus { - cc.UpdateQueueJobs(latestAw.(*arbv1.AppWrapper)) + cc.UpdateQueueJobs(latestAw) klog.V(2).Infof("[Informer-addQJ] requeing AW to determine completion status for AW", qj.Name) } @@ -1665,23 +1671,30 @@ func (cc *XController) addQueueJob(obj interface{}) { go func() { for { time.Sleep(requeueInterval) - latestAw, exists, err := cc.appwrapperInformer.Informer().GetStore().GetByKey(key) + latestObj, exists, err := cc.appwrapperInformer.Informer().GetStore().GetByKey(key) if err != nil && !exists { klog.Warningf("[Informer-addQJ] Recent copy of AW %s not found in cache", qj.Name) } else { - if latestAw.(*arbv1.AppWrapper).Status.State != arbv1.AppWrapperStateActive && latestAw.(*arbv1.AppWrapper).Status.State != arbv1.AppWrapperStateEnqueued && latestAw.(*arbv1.AppWrapper).Status.State != arbv1.AppWrapperStateRunningHoldCompletion { - klog.V(2).Infof("[Informer-addQJ] Stopping requeue for AW %s with status %s", latestAw.(*arbv1.AppWrapper).Name, latestAw.(*arbv1.AppWrapper).Status.State) + var latestAw *arbv1.AppWrapper + if latestObj != nil { + latestAw = latestObj.(*arbv1.AppWrapper) + } else { + latestAw = qj + } + if latestAw.Status.State != arbv1.AppWrapperStateActive && latestAw.Status.State != arbv1.AppWrapperStateEnqueued && latestAw.Status.State != arbv1.AppWrapperStateRunningHoldCompletion { + klog.V(2).Infof("[Informer-addQJ] Stopping requeue for AW %s with status %s", latestAw.Name, latestAw.Status.State) break //Exit the loop } // Enqueue the latest copy of the AW. if (qj.Status.State != arbv1.AppWrapperStateCompleted && qj.Status.State != arbv1.AppWrapperStateFailed) && (qj.Spec.SchedSpec.MinAvailable > 0) { - cc.PreemptQueueJobs(latestAw.(*arbv1.AppWrapper)) + cc.PreemptQueueJobs(latestAw) klog.V(2).Infof("[Informer-addQJ] requeing AW to check minScheduling spec for AW", qj.Name) } } } }() } + cc.enqueue(qj) } func (cc *XController) updateQueueJob(oldObj, newObj interface{}) { From 43e2f6d7fbff4373ffdbbf98589a751c6467e423 Mon Sep 17 00:00:00 2001 From: Abhishek Malvankar Date: Fri, 8 Sep 2023 20:14:34 -0400 Subject: [PATCH 11/11] fix tests --- test/e2e/queue.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/e2e/queue.go b/test/e2e/queue.go index 2809f721..2fd6f1b3 100644 --- a/test/e2e/queue.go +++ b/test/e2e/queue.go @@ -141,12 +141,12 @@ var _ = Describe("AppWrapper E2E Test", func() { aw := createJobAWWithInitContainer(context, "aw-job-3-init-container-1", 60, "exponential", 0) appwrappers = append(appwrappers, aw) - err := waitAWPodsCompleted(context, aw, 12*time.Minute) // This test waits for 12 minutes to make sure all PODs complete + err := waitAWPodsCompleted(context, aw, 14*time.Minute) // This test waits for 14 minutes to make sure all PODs complete Expect(err).NotTo(HaveOccurred(), "Waiting for the pods to be completed") }) It("MCAD CPU Requeuing - Deletion After Maximum Requeuing Times Test", func() { - fmt.Fprintf(os.Stdout, "[e2e] MCAD CPU Requeuing Test - Started.\n") + fmt.Fprintf(os.Stdout, "[e2e] MCAD CPU Requeuing - Deletion After Maximum Requeuing Times Test - Started.\n") context := initTestContext() var appwrappers []*arbv1.AppWrapper @@ -513,7 +513,7 @@ var _ = Describe("AppWrapper E2E Test", func() { defer cleanupTestObjectsPtr(context, appwrappersPtr) // This should fill up the worker node and most of the master node - aw := createDeploymentAWwith550CPU(context, appendRandomString("aw-deployment-2-550cpu")) + aw := createDeploymentAWwith550CPU(context, appendRandomString("aw-deployment-2-550cpu-2")) appwrappers = append(appwrappers, aw) err := waitAWPodsReady(context, aw)