From e7b55335db2124d8755d905b28eb4466a6ed7811 Mon Sep 17 00:00:00 2001 From: Kaxil Naik Date: Thu, 30 Oct 2025 23:39:43 +0000 Subject: [PATCH] Fix executor field not populated for tasks without explicit executor When a task doesn't specify an executor, the executor field in the database remained `NULL`, causing it to not display in the UI. This fix resolves the executor to the default configured executor name at task instance creation and refresh time, following the same pattern as other fields like pool and queue. The fix modifies TaskInstance.insert_mapping() and TaskInstance.refresh_from_task() to automatically populate the executor field with the default executor when task.executor is None, ensuring the field always displays correctly in the UI. closes #57526 --- .../src/airflow/models/taskinstance.py | 14 +++- .../ui/src/pages/TaskInstance/Details.tsx | 4 ++ .../tests/unit/models/test_taskinstance.py | 66 +++++++++++++++++++ 3 files changed, 82 insertions(+), 2 deletions(-) diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py index 0a080ca4ca731..662b4266f83a4 100644 --- a/airflow-core/src/airflow/models/taskinstance.py +++ b/airflow-core/src/airflow/models/taskinstance.py @@ -69,6 +69,7 @@ from airflow._shared.timezones import timezone from airflow.assets.manager import asset_manager from airflow.configuration import conf +from airflow.executors.executor_loader import ExecutorLoader from airflow.listeners.listener import get_listener_manager from airflow.models.asset import AssetEvent, AssetModel from airflow.models.base import Base, StringID, TaskInstanceDependencies @@ -556,6 +557,11 @@ def insert_mapping( TaskInstance(task=task, run_id=run_id, map_index=map_index, dag_version_id=dag_version_id) ) + executor = task.executor + if executor is None: + executor_name = ExecutorLoader.get_default_executor_name() + executor = executor_name.alias or executor_name.module_path + return { "dag_id": task.dag_id, "task_id": task.task_id, @@ -569,7 +575,7 @@ def insert_mapping( "priority_weight": priority_weight, "run_as_user": task.run_as_user, "max_tries": task.retries, - "executor": task.executor, + "executor": executor, "executor_config": task.executor_config, "operator": task.task_type, "custom_operator_name": getattr(task, "operator_name", None), @@ -753,7 +759,11 @@ def refresh_from_task(self, task: Operator, pool_override: str | None = None) -> self.run_as_user = task.run_as_user # Do not set max_tries to task.retries here because max_tries is a cumulative # value that needs to be stored in the db. - self.executor = task.executor + if task.executor is None: + executor_name = ExecutorLoader.get_default_executor_name() + self.executor = executor_name.alias or executor_name.module_path + else: + self.executor = task.executor self.executor_config = task.executor_config self.operator = task.task_type op_name = getattr(task, "operator_name", None) diff --git a/airflow-core/src/airflow/ui/src/pages/TaskInstance/Details.tsx b/airflow-core/src/airflow/ui/src/pages/TaskInstance/Details.tsx index a769498f6d05e..1038fc140e04d 100644 --- a/airflow-core/src/airflow/ui/src/pages/TaskInstance/Details.tsx +++ b/airflow-core/src/airflow/ui/src/pages/TaskInstance/Details.tsx @@ -207,6 +207,10 @@ export const Details = () => { {translate("taskInstance.executor")} + {tryInstance?.executor} + + + {translate("taskInstance.executorConfig")} {tryInstance?.executor_config} diff --git a/airflow-core/tests/unit/models/test_taskinstance.py b/airflow-core/tests/unit/models/test_taskinstance.py index 3a166e1433061..a2de9185d1e7f 100644 --- a/airflow-core/tests/unit/models/test_taskinstance.py +++ b/airflow-core/tests/unit/models/test_taskinstance.py @@ -40,6 +40,7 @@ AirflowFailException, AirflowSkipException, ) +from airflow.executors.executor_utils import ExecutorName from airflow.models.asset import AssetActive, AssetAliasModel, AssetEvent, AssetModel from airflow.models.connection import Connection from airflow.models.dag_version import DagVersion @@ -2821,6 +2822,71 @@ def mock_policy(task_instance: TaskInstance): assert ti.max_tries == expected_max_tries +@pytest.mark.parametrize( + "task_executor,expected_executor", + [ + (None, "LocalExecutor"), # Default executor should be resolved + ("LocalExecutor", "LocalExecutor"), # Explicit executor should be preserved + ("CeleryExecutor", "CeleryExecutor"), # Explicit executor should be preserved + ], +) +def test_refresh_from_task_resolves_executor(task_executor, expected_executor, monkeypatch): + """Test that refresh_from_task resolves None executor to default executor name.""" + # Mock the default executor + mock_executor_name = ExecutorName( + module_path="airflow.executors.local_executor.LocalExecutor", alias="LocalExecutor" + ) + + with mock.patch( + "airflow.executors.executor_loader.ExecutorLoader.get_default_executor_name", + return_value=mock_executor_name, + ): + task = EmptyOperator(task_id="test_executor", executor=task_executor) + ti = TI(task, run_id=None, dag_version_id=mock.MagicMock()) + ti.refresh_from_task(task) + + assert ti.executor == expected_executor + + +def test_insert_mapping_resolves_executor_to_default(): + """Test that insert_mapping resolves None executor to default executor name.""" + mock_executor_name = ExecutorName( + module_path="airflow.executors.local_executor.LocalExecutor", alias="LocalExecutor" + ) + + with mock.patch( + "airflow.executors.executor_loader.ExecutorLoader.get_default_executor_name", + return_value=mock_executor_name, + ): + task = EmptyOperator( + task_id="test_task", + executor=None, # No executor specified + ) + + mapping = TI.insert_mapping( + run_id="test_run", + task=task, + map_index=-1, + dag_version_id=mock.MagicMock(), + ) + + assert mapping["executor"] == "LocalExecutor" + + +def test_insert_mapping_preserves_explicit_executor(): + """Test that insert_mapping preserves explicitly set executor.""" + task = EmptyOperator(task_id="test_task", executor="CeleryExecutor") + + mapping = TI.insert_mapping( + run_id="test_run", + task=task, + map_index=-1, + dag_version_id=mock.MagicMock(), + ) + + assert mapping["executor"] == "CeleryExecutor" + + class TestRunRawTaskQueriesCount: """ These tests are designed to detect changes in the number of queries executed