Almost-starved pools preventing scheduling in pools with free slots (possible bug or improvement idea) #25139
-
I think you have the wrong configuration: you should give your small tasks a higher priority. That should solve the problem in this case.
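A minimal sketch (plain Python, not Airflow internals) of why this advice works: the scheduler examines candidate task instances in descending `priority_weight` order, capped at `max_tis`. If the small-pool tasks carry a higher `priority_weight` than the big-pool tasks, they land at the front of that capped window instead of being crowded out. The pool names, weights, and task counts below are illustrative assumptions, not values from the discussion.

```python
def examined_order(tasks, max_tis):
    """Return the pools of the first max_tis tasks, by descending priority_weight."""
    # tasks: list of (pool_name, priority_weight)
    return [pool for pool, _ in sorted(tasks, key=lambda t: -t[1])[:max_tis]]

# small_pool tasks bumped to priority_weight=3, above big_pool's 2
backlog = [("big_pool", 2)] * 1000 + [("small_pool", 3)] * 5
print(examined_order(backlog, max_tis=11))
# the 5 small_pool tasks come first, so their free slots actually get used
```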
-
Correct. If you have an idea how to solve it, with tests covering it, you are most welcome to make a PR. I think it is impossible to solve all possible cases, and Airflow focuses on "realistic" ones. But if you think you can propose a PR, a set of test cases, and the reasoning behind better scheduling, you are absolutely welcome.
-
Perhaps this can help you a bit. A minimal diff that should work for you and passes existing tests. The basic idea is that if we fill up a pool, we run another query iteration. I do not think this needs an extra parameter, but it definitely needs some benchmarking on your case and on "regular" cases. Feel free to use it if it looks like it could work.

```diff
index 1889285899..8094401219 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -336,7 +336,7 @@ class SchedulerJob(BaseJob):
                 task_filter = tuple_in_condition((TaskInstance.dag_id, TaskInstance.task_id), starved_tasks)
                 query = query.filter(not_(task_filter))

-            query = query.limit(max_tis)
+            query = query.limit(max_tis - len(executable_tis))

             task_instances_to_examine: List[TI] = with_row_locks(
                 query,
@@ -477,8 +477,10 @@ class SchedulerJob(BaseJob):
                 task_concurrency_map[(task_instance.dag_id, task_instance.task_id)] += 1

             pool_stats["open"] = open_slots
+            if open_slots <= 0:
+                starved_pools.add(pool_name)

-            is_done = executable_tis or len(task_instances_to_examine) < max_tis
+            is_done = len(task_instances_to_examine) < max_tis
             # Check this to avoid accidental infinite loops
             found_new_filters = (
                 len(starved_pools) > num_starved_pools
```

I'm 100% sure that there are some bugs here :) .
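The idea in that diff can be illustrated with a toy re-implementation (plain Python, not the real Airflow loop): after a pass fills a pool, mark it starved and query again with the remaining budget, so lower-priority pools with free slots get a chance. The pool names, slot counts, and weights below are made-up illustrative values.

```python
def schedule_with_requery(tasks, pool_slots, max_tis):
    """tasks: list of (pool_name, priority_weight); returns pools of queued tasks."""
    open_slots = dict(pool_slots)
    starved_pools = set()
    executable = []
    while True:
        budget = max_tis - len(executable)  # mirrors query.limit(max_tis - len(executable_tis))
        candidates = [t for t in sorted(tasks, key=lambda t: -t[1])
                      if t[0] not in starved_pools][:budget]
        num_starved = len(starved_pools)
        for pool, _ in candidates:
            if open_slots[pool] > 0:
                open_slots[pool] -= 1
                executable.append(pool)
            if open_slots[pool] <= 0:
                starved_pools.add(pool)  # key change: a filled pool counts as starved
        # stop if the query came back short, or no new filter was discovered
        if len(candidates) < budget or len(starved_pools) == num_starved:
            break
    return executable

# big_pool has 1 open slot, small_pool has 10; 11 candidates fit in max_tis
backlog = [("big_pool", 2)] * 1000 + [("small_pool", 1)] * 5
queued = schedule_with_requery(backlog, {"big_pool": 1, "small_pool": 10}, max_tis=11)
print(queued.count("big_pool"), queued.count("small_pool"))  # 1 1 big + 5 small
```

On the second iteration the requery excludes `big_pool`, so the 5 `small_pool` tasks get queued instead of being lost behind higher-priority `big_pool` tasks.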
-
Not sure if this is a bug or an improvement; I wanted to check my understanding of the mechanism for finding task instances ready for execution. The relevant code is in `airflow/airflow/jobs/scheduler_job.py`, line 249 at commit `3dbca5e`.
Imagine the following setup with two pools:

- `parallelism=100`
- `big_pool` (90 slots): runs ~1k hourly tasks that each take a few seconds at `priority_weight=2`. It is often near capacity.
- `small_pool` (10 slots): runs just a few short tasks each hour at `priority_weight=1`. It is often empty with all slots free.

**What I expect to happen**

Since `big_pool` only has 90 slots, `small_pool` has 10 slots, and `parallelism=100`, we should always be able to schedule tasks in `small_pool` as long as it has available slots.

**What actually happens**

Because `big_pool` is churning through lots of tasks, it is rarely if ever actually starving. Often there are just a few open slots (let's say 1 open slot for this example). This means that `max_tis` is often ~11, because `small_pool` (10) is generally free and `big_pool` has just 1 slot. The task query does not exclude `big_pool`, because it is not starving, and limits to `max_tis=11`. Because `big_pool` is higher priority, all 11 tasks returned by the task query are for `big_pool`, and so we schedule just 1 new `big_pool` task, when we could also be scheduling 10 `small_pool` tasks. And since `big_pool` has high throughput, on the next scheduler loop we are likely back in this situation again.

**How could this be improved?**

One idea is to run the task query again after filling `big_pool`. On the second loop, it would exclude `big_pool` and find it can run 10 TIs in `small_pool`. The number of extra query loops could be capped by a config option such as `max_executable_ti_queries`.

Happy to talk details or to be corrected on any of this. Please note that it's not really possible to provide a minimal reproducible example here, because it requires a large-ish cluster setup.
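The failure mode described above can at least be sketched in plain Python (a simplified model of one scheduler pass, not Airflow code): tasks are examined in `priority_weight` order, the query is capped at `max_tis`, and a task is queued only if its pool still has an open slot. The task counts and slot numbers are the illustrative ones from the scenario.

```python
def schedule_once(tasks, pool_slots, max_tis):
    """tasks: list of (pool_name, priority_weight); returns pools queued this pass."""
    examined = sorted(tasks, key=lambda t: -t[1])[:max_tis]  # priority-ordered, capped
    open_slots = dict(pool_slots)
    queued = []
    for pool, _ in examined:
        if open_slots[pool] > 0:  # only queue if the pool still has a free slot
            open_slots[pool] -= 1
            queued.append(pool)
    return queued

# big_pool: 90 slots with 89 busy -> 1 open; small_pool: 10 slots, all free.
# max_tis works out to ~11 (the total open slots).
backlog = [("big_pool", 2)] * 1000 + [("small_pool", 1)] * 5
queued = schedule_once(backlog, {"big_pool": 1, "small_pool": 10}, max_tis=11)
print(queued.count("big_pool"), queued.count("small_pool"))  # 1 0
```

All 11 examined tasks are `big_pool` tasks, so only 1 is queued and `small_pool`'s 10 free slots go unused for this loop.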
Thanks!