Skip to content

Commit a7306d2

Browse files
jason810496ambika-garg
authored andcommitted
Fix FileTaskHandler only read from default executor (apache#45631)
* Fix FileTaskHandler only read from default executor * Add cached_property back to avoid loading executors * Add test for multi-executors scenario * Allow to call load_executor without init_executors * Refactor by caching necessary executors * Refactor test with default executor case * Fix side effect from executor_loader * Fix KubernetesExecutor test - Previous test failure is cuased by cache state of executor_instances - Should set ti.state = RUNNING after ti.run * Fix side effect from executor_loader - The side effect only show up in postgres as backend environment, as previous fix only resolve side effect in sqlite as backend environment. - Also refactor clean_executor_loader as pytest fixture with setup teardown * Capitalize default executor key * Refactor clean_executor_loader fixture
1 parent 26fe623 commit a7306d2

File tree

8 files changed

+223
-50
lines changed

8 files changed

+223
-50
lines changed

airflow/executors/executor_loader.py

+4
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,10 @@ def init_executors(cls) -> list[BaseExecutor]:
231231
@classmethod
232232
def lookup_executor_name_by_str(cls, executor_name_str: str) -> ExecutorName:
233233
# lookup the executor by alias first, if not check if we're given a module path
234+
if not _classname_to_executors or not _module_to_executors or not _alias_to_executors:
235+
# if we haven't loaded the executors yet, such as directly calling load_executor
236+
cls._get_executor_names()
237+
234238
if executor_name := _alias_to_executors.get(executor_name_str):
235239
return executor_name
236240
elif executor_name := _module_to_executors.get(executor_name_str):

airflow/utils/log/file_task_handler.py

+26-7
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
from collections.abc import Iterable
2525
from contextlib import suppress
2626
from enum import Enum
27-
from functools import cached_property
2827
from pathlib import Path
2928
from typing import TYPE_CHECKING, Any, Callable
3029
from urllib.parse import urljoin
@@ -44,6 +43,7 @@
4443
if TYPE_CHECKING:
4544
from pendulum import DateTime
4645

46+
from airflow.executors.base_executor import BaseExecutor
4747
from airflow.models.taskinstance import TaskInstance
4848
from airflow.models.taskinstancekey import TaskInstanceKey
4949

@@ -179,6 +179,8 @@ class FileTaskHandler(logging.Handler):
179179
inherits_from_empty_operator_log_message = (
180180
"Operator inherits from empty operator and thus does not have logs"
181181
)
182+
executor_instances: dict[str, BaseExecutor] = {}
183+
DEFAULT_EXECUTOR_KEY = "_default_executor"
182184

183185
def __init__(
184186
self,
@@ -314,11 +316,27 @@ def _render_filename(self, ti: TaskInstance, try_number: int, session=NEW_SESSIO
314316
def _read_grouped_logs(self):
315317
return False
316318

317-
@cached_property
318-
def _executor_get_task_log(self) -> Callable[[TaskInstance, int], tuple[list[str], list[str]]]:
319-
"""This cached property avoids loading executor repeatedly."""
320-
executor = ExecutorLoader.get_default_executor()
321-
return executor.get_task_log
319+
def _get_executor_get_task_log(
320+
self, ti: TaskInstance
321+
) -> Callable[[TaskInstance, int], tuple[list[str], list[str]]]:
322+
"""
323+
Get the get_task_log method from executor of current task instance.
324+
325+
Since there might be multiple executors, so we need to get the executor of current task instance instead of getting from default executor.
326+
327+
:param ti: task instance object
328+
:return: get_task_log method of the executor
329+
"""
330+
executor_name = ti.executor or self.DEFAULT_EXECUTOR_KEY
331+
executor = self.executor_instances.get(executor_name)
332+
if executor is not None:
333+
return executor.get_task_log
334+
335+
if executor_name == self.DEFAULT_EXECUTOR_KEY:
336+
self.executor_instances[executor_name] = ExecutorLoader.get_default_executor()
337+
else:
338+
self.executor_instances[executor_name] = ExecutorLoader.load_executor(executor_name)
339+
return self.executor_instances[executor_name].get_task_log
322340

323341
def _read(
324342
self,
@@ -360,7 +378,8 @@ def _read(
360378
messages_list.extend(remote_messages)
361379
has_k8s_exec_pod = False
362380
if ti.state == TaskInstanceState.RUNNING:
363-
response = self._executor_get_task_log(ti, try_number)
381+
executor_get_task_log = self._get_executor_get_task_log(ti)
382+
response = executor_get_task_log(ti, try_number)
364383
if response:
365384
executor_messages, executor_logs = response
366385
if executor_messages:

providers/tests/cncf/kubernetes/log_handlers/test_log_handlers.py

+7-1
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ def teardown_method(self):
7474
"airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor.get_task_log"
7575
)
7676
@pytest.mark.parametrize("state", [TaskInstanceState.RUNNING, TaskInstanceState.SUCCESS])
77+
@pytest.mark.usefixtures("clean_executor_loader")
7778
def test__read_for_k8s_executor(self, mock_k8s_get_task_log, create_task_instance, state):
7879
"""Test for k8s executor, the log is read from get_task_log method"""
7980
mock_k8s_get_task_log.return_value = ([], [])
@@ -86,6 +87,7 @@ def test__read_for_k8s_executor(self, mock_k8s_get_task_log, create_task_instanc
8687
)
8788
ti.state = state
8889
ti.triggerer_job = None
90+
ti.executor = executor_name
8991
with conf_vars({("core", "executor"): executor_name}):
9092
reload(executor_loader)
9193
fth = FileTaskHandler("")
@@ -105,11 +107,12 @@ def test__read_for_k8s_executor(self, mock_k8s_get_task_log, create_task_instanc
105107
pytest.param(k8s.V1Pod(metadata=k8s.V1ObjectMeta(name="pod-name-xxx")), "default"),
106108
],
107109
)
108-
@patch.dict("os.environ", AIRFLOW__CORE__EXECUTOR="KubernetesExecutor")
110+
@conf_vars({("core", "executor"): "KubernetesExecutor"})
109111
@patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client")
110112
def test_read_from_k8s_under_multi_namespace_mode(
111113
self, mock_kube_client, pod_override, namespace_to_call
112114
):
115+
reload(executor_loader)
113116
mock_read_log = mock_kube_client.return_value.read_namespaced_pod_log
114117
mock_list_pod = mock_kube_client.return_value.list_namespaced_pod
115118

@@ -139,6 +142,7 @@ def task_callable(ti):
139142
)
140143
ti = TaskInstance(task=task, run_id=dagrun.run_id)
141144
ti.try_number = 3
145+
ti.executor = "KubernetesExecutor"
142146

143147
logger = ti.log
144148
ti.log.disabled = False
@@ -147,6 +151,8 @@ def task_callable(ti):
147151
set_context(logger, ti)
148152
ti.run(ignore_ti_state=True)
149153
ti.state = TaskInstanceState.RUNNING
154+
# clear executor_instances cache
155+
file_handler.executor_instances = {}
150156
file_handler.read(ti, 2)
151157

152158
# first we find pod name

tests/executors/test_executor_loader.py

+48-41
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,13 @@
1616
# under the License.
1717
from __future__ import annotations
1818

19-
from importlib import reload
2019
from unittest import mock
2120

2221
import pytest
2322

2423
from airflow.exceptions import AirflowConfigException
2524
from airflow.executors import executor_loader
26-
from airflow.executors.executor_loader import ConnectorSource, ExecutorLoader, ExecutorName
25+
from airflow.executors.executor_loader import ConnectorSource, ExecutorName
2726
from airflow.executors.local_executor import LocalExecutor
2827
from airflow.providers.amazon.aws.executors.ecs.ecs_executor import AwsEcsExecutor
2928
from airflow.providers.celery.executors.celery_executor import CeleryExecutor
@@ -35,24 +34,12 @@ class FakeExecutor:
3534
pass
3635

3736

37+
@pytest.mark.usefixtures("clean_executor_loader")
3838
class TestExecutorLoader:
39-
def setup_method(self) -> None:
40-
from airflow.executors import executor_loader
41-
42-
reload(executor_loader)
43-
global ExecutorLoader
44-
ExecutorLoader = executor_loader.ExecutorLoader # type: ignore
45-
46-
def teardown_method(self) -> None:
47-
from airflow.executors import executor_loader
48-
49-
reload(executor_loader)
50-
ExecutorLoader.init_executors()
51-
5239
def test_no_executor_configured(self):
5340
with conf_vars({("core", "executor"): None}):
5441
with pytest.raises(AirflowConfigException, match=r".*not found in config$"):
55-
ExecutorLoader.get_default_executor()
42+
executor_loader.ExecutorLoader.get_default_executor()
5643

5744
@pytest.mark.parametrize(
5845
"executor_name",
@@ -66,16 +53,18 @@ def test_no_executor_configured(self):
6653
)
6754
def test_should_support_executor_from_core(self, executor_name):
6855
with conf_vars({("core", "executor"): executor_name}):
69-
executor = ExecutorLoader.get_default_executor()
56+
executor = executor_loader.ExecutorLoader.get_default_executor()
7057
assert executor is not None
7158
assert executor_name == executor.__class__.__name__
7259
assert executor.name is not None
73-
assert executor.name == ExecutorName(ExecutorLoader.executors[executor_name], alias=executor_name)
60+
assert executor.name == ExecutorName(
61+
executor_loader.ExecutorLoader.executors[executor_name], alias=executor_name
62+
)
7463
assert executor.name.connector_source == ConnectorSource.CORE
7564

7665
def test_should_support_custom_path(self):
7766
with conf_vars({("core", "executor"): "tests.executors.test_executor_loader.FakeExecutor"}):
78-
executor = ExecutorLoader.get_default_executor()
67+
executor = executor_loader.ExecutorLoader.get_default_executor()
7968
assert executor is not None
8069
assert executor.__class__.__name__ == "FakeExecutor"
8170
assert executor.name is not None
@@ -249,17 +238,17 @@ def test_get_hybrid_executors_from_config(
249238
"airflow.executors.executor_loader.ExecutorLoader._get_team_executor_configs",
250239
return_value=team_executor_config,
251240
):
252-
executors = ExecutorLoader._get_executor_names()
241+
executors = executor_loader.ExecutorLoader._get_executor_names()
253242
assert executors == expected_executors_list
254243

255244
def test_init_executors(self):
256245
with conf_vars({("core", "executor"): "CeleryExecutor"}):
257-
executors = ExecutorLoader.init_executors()
258-
executor_name = ExecutorLoader.get_default_executor_name()
246+
executors = executor_loader.ExecutorLoader.init_executors()
247+
executor_name = executor_loader.ExecutorLoader.get_default_executor_name()
259248
assert len(executors) == 1
260249
assert isinstance(executors[0], CeleryExecutor)
261-
assert "CeleryExecutor" in ExecutorLoader.executors
262-
assert ExecutorLoader.executors["CeleryExecutor"] == executor_name.module_path
250+
assert "CeleryExecutor" in executor_loader.ExecutorLoader.executors
251+
assert executor_loader.ExecutorLoader.executors["CeleryExecutor"] == executor_name.module_path
263252

264253
@pytest.mark.parametrize(
265254
"executor_config",
@@ -276,7 +265,7 @@ def test_get_hybrid_executors_from_config_duplicates_should_fail(self, executor_
276265
with pytest.raises(
277266
AirflowConfigException, match=r".+Duplicate executors are not yet supported.+"
278267
):
279-
ExecutorLoader._get_executor_names()
268+
executor_loader.ExecutorLoader._get_executor_names()
280269

281270
@pytest.mark.parametrize(
282271
"executor_config",
@@ -292,7 +281,7 @@ def test_get_hybrid_executors_from_config_duplicates_should_fail(self, executor_
292281
def test_get_hybrid_executors_from_config_core_executors_bad_config_format(self, executor_config):
293282
with conf_vars({("core", "executor"): executor_config}):
294283
with pytest.raises(AirflowConfigException):
295-
ExecutorLoader._get_executor_names()
284+
executor_loader.ExecutorLoader._get_executor_names()
296285

297286
@pytest.mark.parametrize(
298287
("executor_config", "expected_value"),
@@ -308,7 +297,7 @@ def test_get_hybrid_executors_from_config_core_executors_bad_config_format(self,
308297
)
309298
def test_should_support_import_executor_from_core(self, executor_config, expected_value):
310299
with conf_vars({("core", "executor"): executor_config}):
311-
executor, import_source = ExecutorLoader.import_default_executor_cls()
300+
executor, import_source = executor_loader.ExecutorLoader.import_default_executor_cls()
312301
assert expected_value == executor.__name__
313302
assert import_source == ConnectorSource.CORE
314303

@@ -322,26 +311,43 @@ def test_should_support_import_executor_from_core(self, executor_config, expecte
322311
)
323312
def test_should_support_import_custom_path(self, executor_config):
324313
with conf_vars({("core", "executor"): executor_config}):
325-
executor, import_source = ExecutorLoader.import_default_executor_cls()
314+
executor, import_source = executor_loader.ExecutorLoader.import_default_executor_cls()
326315
assert executor.__name__ == "FakeExecutor"
327316
assert import_source == ConnectorSource.CUSTOM_PATH
328317

329318
def test_load_executor(self):
330319
with conf_vars({("core", "executor"): "LocalExecutor"}):
331-
ExecutorLoader.init_executors()
332-
assert isinstance(ExecutorLoader.load_executor("LocalExecutor"), LocalExecutor)
333-
assert isinstance(ExecutorLoader.load_executor(executor_loader._executor_names[0]), LocalExecutor)
334-
assert isinstance(ExecutorLoader.load_executor(None), LocalExecutor)
320+
executor_loader.ExecutorLoader.init_executors()
321+
assert isinstance(executor_loader.ExecutorLoader.load_executor("LocalExecutor"), LocalExecutor)
322+
assert isinstance(
323+
executor_loader.ExecutorLoader.load_executor(executor_loader._executor_names[0]),
324+
LocalExecutor,
325+
)
326+
assert isinstance(executor_loader.ExecutorLoader.load_executor(None), LocalExecutor)
335327

336328
def test_load_executor_alias(self):
337329
with conf_vars({("core", "executor"): "local_exec:airflow.executors.local_executor.LocalExecutor"}):
338-
ExecutorLoader.init_executors()
339-
assert isinstance(ExecutorLoader.load_executor("local_exec"), LocalExecutor)
330+
executor_loader.ExecutorLoader.init_executors()
331+
assert isinstance(executor_loader.ExecutorLoader.load_executor("local_exec"), LocalExecutor)
340332
assert isinstance(
341-
ExecutorLoader.load_executor("airflow.executors.local_executor.LocalExecutor"),
333+
executor_loader.ExecutorLoader.load_executor(
334+
"airflow.executors.local_executor.LocalExecutor"
335+
),
336+
LocalExecutor,
337+
)
338+
assert isinstance(
339+
executor_loader.ExecutorLoader.load_executor(executor_loader._executor_names[0]),
342340
LocalExecutor,
343341
)
344-
assert isinstance(ExecutorLoader.load_executor(executor_loader._executor_names[0]), LocalExecutor)
342+
343+
@mock.patch(
344+
"airflow.executors.executor_loader.ExecutorLoader._get_executor_names",
345+
wraps=executor_loader.ExecutorLoader._get_executor_names,
346+
)
347+
def test_call_load_executor_method_without_init_executors(self, mock_get_executor_names):
348+
with conf_vars({("core", "executor"): "LocalExecutor"}):
349+
executor_loader.ExecutorLoader.load_executor("LocalExecutor")
350+
mock_get_executor_names.assert_called_once()
345351

346352
@mock.patch("airflow.providers.amazon.aws.executors.ecs.ecs_executor.AwsEcsExecutor", autospec=True)
347353
def test_load_custom_executor_with_classname(self, mock_executor):
@@ -353,15 +359,16 @@ def test_load_custom_executor_with_classname(self, mock_executor):
353359
): "my_alias:airflow.providers.amazon.aws.executors.ecs.ecs_executor.AwsEcsExecutor"
354360
}
355361
):
356-
ExecutorLoader.init_executors()
357-
assert isinstance(ExecutorLoader.load_executor("my_alias"), AwsEcsExecutor)
358-
assert isinstance(ExecutorLoader.load_executor("AwsEcsExecutor"), AwsEcsExecutor)
362+
executor_loader.ExecutorLoader.init_executors()
363+
assert isinstance(executor_loader.ExecutorLoader.load_executor("my_alias"), AwsEcsExecutor)
364+
assert isinstance(executor_loader.ExecutorLoader.load_executor("AwsEcsExecutor"), AwsEcsExecutor)
359365
assert isinstance(
360-
ExecutorLoader.load_executor(
366+
executor_loader.ExecutorLoader.load_executor(
361367
"airflow.providers.amazon.aws.executors.ecs.ecs_executor.AwsEcsExecutor"
362368
),
363369
AwsEcsExecutor,
364370
)
365371
assert isinstance(
366-
ExecutorLoader.load_executor(executor_loader._executor_names[0]), AwsEcsExecutor
372+
executor_loader.ExecutorLoader.load_executor(executor_loader._executor_names[0]),
373+
AwsEcsExecutor,
367374
)

tests/ti_deps/deps/test_ready_to_reschedule_dep.py

+1
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ def side_effect(*args, **kwargs):
4949
yield m
5050

5151

52+
@pytest.mark.usefixtures("clean_executor_loader")
5253
class TestNotInReschedulePeriodDep:
5354
@pytest.fixture(autouse=True)
5455
def setup_test_cases(self, request, create_task_instance):

0 commit comments

Comments
 (0)