Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,11 @@ def _change_state(
if state is None:
from airflow.models.taskinstance import TaskInstance

state = session.scalar(select(TaskInstance.state).where(TaskInstance.filter_for_tis([key])))
filter_for_tis = TaskInstance.filter_for_tis([key])
if filter_for_tis is not None:
state = session.scalar(select(TaskInstance.state).where(filter_for_tis))
else:
state = None
state = TaskInstanceState(state) if state else None

self.event_buffer[key] = state, None
Expand All @@ -515,7 +519,8 @@ def _get_pod_namespace(ti: TaskInstance):
pod_override = ti.executor_config.get("pod_override")
namespace = None
with suppress(Exception):
namespace = pod_override.metadata.namespace
if pod_override is not None:
namespace = pod_override.metadata.namespace
return namespace or conf.get("kubernetes_executor", "namespace")

def get_task_log(self, ti: TaskInstance, try_number: int) -> tuple[list[str], list[str]]:
Expand Down Expand Up @@ -569,7 +574,7 @@ def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[Task
tis_to_flush_by_key = {ti.key: ti for ti in tis if ti.queued_by_job_id}
kube_client: client.CoreV1Api = self.kube_client
for scheduler_job_id in scheduler_job_ids:
scheduler_job_id = self._make_safe_label_value(str(scheduler_job_id))
scheduler_job_id_safe_label = self._make_safe_label_value(str(scheduler_job_id))
# We will look for any pods owned by the no-longer-running scheduler,
# but will exclude only successful pods, as those TIs will have a terminal state
# and not be up for adoption!
Expand All @@ -579,7 +584,7 @@ def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[Task
"field_selector": "status.phase!=Succeeded",
"label_selector": (
"kubernetes_executor=True,"
f"airflow-worker={scheduler_job_id},{POD_EXECUTOR_DONE_KEY}!=True"
f"airflow-worker={scheduler_job_id_safe_label},{POD_EXECUTOR_DONE_KEY}!=True"
),
}
pod_list = self._list_pods(query_kwargs)
Expand Down