kubernetes executor cleanup_stuck_queued_tasks optimization #41220
Conversation
Force-pushed from 0a03529 to bef1e02
Force-pushed from b10bf25 to 4459e0f
@jedcunningham / @hussein-awala
airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py (outdated review thread, resolved)
Force-pushed from 691f142 to 5b8e059
@jedcunningham / @hussein-awala
Force-pushed from 4538ec4 to 0b1d4a5
@dirrao can you add some details in the description? Just repeating the commit message/title isn't very useful, and having to grok 100+ lines of change to know what the goal is isn't great for reviewing now, or next year when someone is doing git blame :) e.g. things like what is done now, what you are doing instead, and the expected impact.
Sorry for not putting the details around the problem. I have updated the details in the description of the PR.
airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py (outdated review thread, resolved)
Problem: Airflow runs the cleanup_stuck_queued_tasks function at a regular frequency. On a large Kubernetes cluster (more than 5K pods), the function loops through each queued task that has breached the task-queued timeout and checks whether the corresponding worker pod still exists in the cluster. Right now, each existence check is a separate list-pods Kubernetes API call, and each call takes more than 1s. With 120 queued tasks, that is ~120 seconds (1s * 120), so the scheduler spends most of its time in this function rather than scheduling tasks, leading to no jobs being scheduled or badly degraded scheduler performance.
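For illustration, the per-task pattern described above looks roughly like the following minimal sketch, assuming the official `kubernetes` Python client; `queued_tis`, `scheduler_job_id`, and `fail_stuck_task` are hypothetical stand-ins, not the actual Airflow code:

```python
from kubernetes import client, config

config.load_incluster_config()  # assumes the scheduler runs inside the cluster
kube = client.CoreV1Api()

# Hypothetical stand-ins for the executor's real state, defined only so
# the sketch is self-contained.
scheduler_job_id = "123"
queued_tis = []  # task instances that breached the task-queued timeout

def fail_stuck_task(ti):
    print(f"failing stuck task {ti.dag_id}.{ti.task_id}")

for ti in queued_tis:  # one list-pods API call per queued task -> O(n) calls
    selector = (
        f"dag_id={ti.dag_id},task_id={ti.task_id},"
        f"airflow-worker={scheduler_job_id}"
    )
    pods = kube.list_namespaced_pod("airflow", label_selector=selector)
    if not pods.items:
        # No worker pod found for this task: it is stuck in QUEUED.
        fail_stuck_task(ti)
```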
Solution: Use a single Kubernetes list-pods API call to fetch, in one batch, all worker pods owned by the scheduler. Build a set of searchable strings from the pod labels, then use that set to determine in O(1) per task whether the pod associated with a queued task still exists. This reduces the number of Kubernetes API server calls from one per queued task to one in total.
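A rough sketch of the batched approach, under the same assumptions and hypothetical names as the sketch above (the key format is spelled out just below):

```python
# One list-pods call for all worker pods owned by this scheduler.
pods = kube.list_namespaced_pod(
    "airflow", label_selector=f"airflow-worker={scheduler_job_id}"
)

def build_key(labels: dict) -> str:
    # Searchable string built from pod labels; optional labels are
    # appended only when present (format described below).
    key = (
        f"dag_id={labels['dag_id']},task_id={labels['task_id']},"
        f"airflow-worker={labels['airflow-worker']}"
    )
    if "map_index" in labels:
        key += f",map_index={labels['map_index']}"
    if "run_id" in labels:
        key += f",run_id={labels['run_id']}"
    return key

existing = {build_key(pod.metadata.labels) for pod in pods.items}

for ti in queued_tis:
    ti_labels = {
        "dag_id": ti.dag_id,
        "task_id": ti.task_id,
        "airflow-worker": scheduler_job_id,
        "run_id": ti.run_id,
    }
    if build_key(ti_labels) not in existing:  # O(1) set membership check
        fail_stuck_task(ti)
```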
Set element string format (parts in square brackets are optional):
dag_id=<dag_id>,task_id=<task_id>,airflow-worker=<airflow_worker>[,map_index=<map_index>][,run_id=<run_id>]
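For instance, an entry for a mapped task might look like the following (all values hypothetical, for illustration only):

```
dag_id=example_dag,task_id=transform,airflow-worker=42,map_index=3,run_id=scheduled__2024-08-01T00:00:00+00:00
```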