Skip to content

Commit a745633

Browse files
Prab-27vincbeck
andauthored
Fix MyPy type errors in providers in cncf/kubernetes provider (#57563)
* Remove SQLA 1 limit in Fab provider * fix mypy errors in cncf-kubernetes-provider * resolved conflicts * use filter_for_tis as variable * remove cast and use new variable --------- Co-authored-by: vincbeck <[email protected]>
1 parent 45678e8 commit a745633

File tree

1 file changed

+9
-4
lines changed

1 file changed

+9
-4
lines changed

providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -505,7 +505,11 @@ def _change_state(
505505
if state is None:
506506
from airflow.models.taskinstance import TaskInstance
507507

508-
state = session.scalar(select(TaskInstance.state).where(TaskInstance.filter_for_tis([key])))
508+
filter_for_tis = TaskInstance.filter_for_tis([key])
509+
if filter_for_tis is not None:
510+
state = session.scalar(select(TaskInstance.state).where(filter_for_tis))
511+
else:
512+
state = None
509513
state = TaskInstanceState(state) if state else None
510514

511515
self.event_buffer[key] = state, None
@@ -515,7 +519,8 @@ def _get_pod_namespace(ti: TaskInstance):
515519
pod_override = ti.executor_config.get("pod_override")
516520
namespace = None
517521
with suppress(Exception):
518-
namespace = pod_override.metadata.namespace
522+
if pod_override is not None:
523+
namespace = pod_override.metadata.namespace
519524
return namespace or conf.get("kubernetes_executor", "namespace")
520525

521526
def get_task_log(self, ti: TaskInstance, try_number: int) -> tuple[list[str], list[str]]:
@@ -569,7 +574,7 @@ def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[Task
569574
tis_to_flush_by_key = {ti.key: ti for ti in tis if ti.queued_by_job_id}
570575
kube_client: client.CoreV1Api = self.kube_client
571576
for scheduler_job_id in scheduler_job_ids:
572-
scheduler_job_id = self._make_safe_label_value(str(scheduler_job_id))
577+
scheduler_job_id_safe_label = self._make_safe_label_value(str(scheduler_job_id))
573578
# We will look for any pods owned by the no-longer-running scheduler,
574579
# but will exclude only successful pods, as those TIs will have a terminal state
575580
# and not be up for adoption!
@@ -579,7 +584,7 @@ def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[Task
579584
"field_selector": "status.phase!=Succeeded",
580585
"label_selector": (
581586
"kubernetes_executor=True,"
582-
f"airflow-worker={scheduler_job_id},{POD_EXECUTOR_DONE_KEY}!=True"
587+
f"airflow-worker={scheduler_job_id_safe_label},{POD_EXECUTOR_DONE_KEY}!=True"
583588
),
584589
}
585590
pod_list = self._list_pods(query_kwargs)

0 commit comments

Comments
 (0)