Skip to content

Commit c7be733

Browse files
committed
Fix executor field not populated for tasks without explicit executor
When a task doesn't specify an executor, the executor field in the database remained `NULL`, causing it to not display in the UI. This fix resolves the executor to the default configured executor name at task instance creation and refresh time, following the same pattern as other fields like pool and queue. The fix modifies TaskInstance.insert_mapping() and TaskInstance.refresh_from_task() to automatically populate the executor field with the default executor when task.executor is None, ensuring the field always displays correctly in the UI. closes #57526
1 parent ae2a4fd commit c7be733

File tree

3 files changed

+82
-2
lines changed

3 files changed

+82
-2
lines changed

airflow-core/src/airflow/models/taskinstance.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@
6969
from airflow._shared.timezones import timezone
7070
from airflow.assets.manager import asset_manager
7171
from airflow.configuration import conf
72+
from airflow.executors.executor_loader import ExecutorLoader
7273
from airflow.listeners.listener import get_listener_manager
7374
from airflow.models.asset import AssetEvent, AssetModel
7475
from airflow.models.base import Base, StringID, TaskInstanceDependencies
@@ -556,6 +557,11 @@ def insert_mapping(
556557
TaskInstance(task=task, run_id=run_id, map_index=map_index, dag_version_id=dag_version_id)
557558
)
558559

560+
executor = task.executor
561+
if executor is None:
562+
executor_name = ExecutorLoader.get_default_executor_name()
563+
executor = executor_name.alias or executor_name.module_path
564+
559565
return {
560566
"dag_id": task.dag_id,
561567
"task_id": task.task_id,
@@ -569,7 +575,7 @@ def insert_mapping(
569575
"priority_weight": priority_weight,
570576
"run_as_user": task.run_as_user,
571577
"max_tries": task.retries,
572-
"executor": task.executor,
578+
"executor": executor,
573579
"executor_config": task.executor_config,
574580
"operator": task.task_type,
575581
"custom_operator_name": getattr(task, "operator_name", None),
@@ -753,7 +759,11 @@ def refresh_from_task(self, task: Operator, pool_override: str | None = None) ->
753759
self.run_as_user = task.run_as_user
754760
# Do not set max_tries to task.retries here because max_tries is a cumulative
755761
# value that needs to be stored in the db.
756-
self.executor = task.executor
762+
if task.executor is None:
763+
executor_name = ExecutorLoader.get_default_executor_name()
764+
self.executor = executor_name.alias or executor_name.module_path
765+
else:
766+
self.executor = task.executor
757767
self.executor_config = task.executor_config
758768
self.operator = task.task_type
759769
op_name = getattr(task, "operator_name", None)

airflow-core/src/airflow/ui/src/pages/TaskInstance/Details.tsx

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,10 @@ export const Details = () => {
207207
</Table.Row>
208208
<Table.Row>
209209
<Table.Cell>{translate("taskInstance.executor")}</Table.Cell>
210+
<Table.Cell>{tryInstance?.executor}</Table.Cell>
211+
</Table.Row>
212+
<Table.Row>
213+
<Table.Cell>{translate("taskInstance.executorConfig")}</Table.Cell>
210214
<Table.Cell>{tryInstance?.executor_config}</Table.Cell>
211215
</Table.Row>
212216
</Table.Body>

airflow-core/tests/unit/models/test_taskinstance.py

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
AirflowFailException,
4141
AirflowSkipException,
4242
)
43+
from airflow.executors.executor_utils import ExecutorName
4344
from airflow.models.asset import AssetActive, AssetAliasModel, AssetEvent, AssetModel
4445
from airflow.models.connection import Connection
4546
from airflow.models.dag_version import DagVersion
@@ -2821,6 +2822,71 @@ def mock_policy(task_instance: TaskInstance):
28212822
assert ti.max_tries == expected_max_tries
28222823

28232824

2825+
@pytest.mark.parametrize(
2826+
"task_executor,expected_executor",
2827+
[
2828+
(None, "LocalExecutor"), # Default executor should be resolved
2829+
("LocalExecutor", "LocalExecutor"), # Explicit executor should be preserved
2830+
("CeleryExecutor", "CeleryExecutor"), # Explicit executor should be preserved
2831+
],
2832+
)
2833+
def test_refresh_from_task_resolves_executor(task_executor, expected_executor, monkeypatch):
2834+
"""Test that refresh_from_task resolves None executor to default executor name."""
2835+
# Mock the default executor
2836+
mock_executor_name = ExecutorName(
2837+
module_path="airflow.executors.local_executor.LocalExecutor", alias="LocalExecutor"
2838+
)
2839+
2840+
with mock.patch(
2841+
"airflow.executors.executor_loader.ExecutorLoader.get_default_executor_name",
2842+
return_value=mock_executor_name,
2843+
):
2844+
task = EmptyOperator(task_id="test_executor", executor=task_executor)
2845+
ti = TI(task, run_id=None, dag_version_id=mock.MagicMock())
2846+
ti.refresh_from_task(task)
2847+
2848+
assert ti.executor == expected_executor
2849+
2850+
2851+
def test_insert_mapping_resolves_executor_to_default():
2852+
"""Test that insert_mapping resolves None executor to default executor name."""
2853+
mock_executor_name = ExecutorName(
2854+
module_path="airflow.executors.local_executor.LocalExecutor", alias="LocalExecutor"
2855+
)
2856+
2857+
with mock.patch(
2858+
"airflow.executors.executor_loader.ExecutorLoader.get_default_executor_name",
2859+
return_value=mock_executor_name,
2860+
):
2861+
task = EmptyOperator(
2862+
task_id="test_task",
2863+
executor=None, # No executor specified
2864+
)
2865+
2866+
mapping = TI.insert_mapping(
2867+
run_id="test_run",
2868+
task=task,
2869+
map_index=-1,
2870+
dag_version_id=mock.MagicMock(),
2871+
)
2872+
2873+
assert mapping["executor"] == "LocalExecutor"
2874+
2875+
2876+
def test_insert_mapping_preserves_explicit_executor():
2877+
"""Test that insert_mapping preserves explicitly set executor."""
2878+
task = EmptyOperator(task_id="test_task", executor="CeleryExecutor")
2879+
2880+
mapping = TI.insert_mapping(
2881+
run_id="test_run",
2882+
task=task,
2883+
map_index=-1,
2884+
dag_version_id=mock.MagicMock(),
2885+
)
2886+
2887+
assert mapping["executor"] == "CeleryExecutor"
2888+
2889+
28242890
class TestRunRawTaskQueriesCount:
28252891
"""
28262892
These tests are designed to detect changes in the number of queries executed

0 commit comments

Comments
 (0)