Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KubernetesPodOperator never stops if credentials are refreshed #42361

Merged
merged 4 commits into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions airflow/providers/cncf/kubernetes/operators/pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@
PodNotFoundException,
PodOperatorHookProtocol,
PodPhase,
check_exception_is_kubernetes_api_unauthorized,
container_is_succeeded,
get_container_termination_message,
)
Expand Down Expand Up @@ -113,6 +112,10 @@ class PodReattachFailure(AirflowException):
"""When we expect to be able to find a pod but cannot."""


class PodCredentialsExpiredFailure(AirflowException):
"""When pod fails to refresh credentials."""


class KubernetesPodOperator(BaseOperator):
"""
Execute a task in a Kubernetes Pod.
Expand Down Expand Up @@ -652,9 +655,8 @@ def execute_sync(self, context: Context):
return result

@tenacity.retry(
stop=tenacity.stop_after_attempt(3),
wait=tenacity.wait_exponential(max=15),
retry=tenacity.retry_if_exception(lambda exc: check_exception_is_kubernetes_api_unauthorized(exc)),
retry=tenacity.retry_if_exception_type(PodCredentialsExpiredFailure),
reraise=True,
)
def await_pod_completion(self, pod: k8s.V1Pod):
Expand All @@ -675,6 +677,10 @@ def await_pod_completion(self, pod: k8s.V1Pod):
"Failed to check container status due to permission error. Refreshing credentials and retrying."
)
self._refresh_cached_properties()
self.pod_manager.read_pod(
pod=pod
) # attempt using refreshed credentials, raises if still invalid
raise PodCredentialsExpiredFailure("Kubernetes credentials expired, retrying after refresh.")
raise exc

def _refresh_cached_properties(self):
Expand Down
29 changes: 26 additions & 3 deletions tests/providers/cncf/kubernetes/operators/test_pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -1629,8 +1629,9 @@ def test_execute_async_callbacks(self):
@pytest.mark.parametrize("get_logs", [True, False])
@patch(f"{POD_MANAGER_CLASS}.fetch_requested_container_logs")
@patch(f"{POD_MANAGER_CLASS}.await_container_completion")
@patch(f"{POD_MANAGER_CLASS}.read_pod")
def test_await_container_completion_refreshes_properties_on_exception(
self, mock_await_container_completion, fetch_requested_container_logs, get_logs
self, mock_read_pod, mock_await_container_completion, fetch_requested_container_logs, get_logs
):
k = KubernetesPodOperator(task_id="task", get_logs=get_logs)
pod = self.run_pod(k)
Expand All @@ -1655,6 +1656,28 @@ def test_await_container_completion_refreshes_properties_on_exception(
mock_await_container_completion.assert_has_calls(
[mock.call(pod=pod, container_name=k.base_container_name)] * 3
)
mock_read_pod.assert_called()
assert client != k.client
assert hook != k.hook
assert pod_manager != k.pod_manager

@patch(f"{POD_MANAGER_CLASS}.await_container_completion")
@patch(f"{POD_MANAGER_CLASS}.read_pod")
def test_await_container_completion_raises_unauthorized_if_credentials_still_invalid_after_refresh(
self, mock_read_pod, mock_await_container_completion
):
k = KubernetesPodOperator(task_id="task", get_logs=False)
pod = self.run_pod(k)
client, hook, pod_manager = k.client, k.hook, k.pod_manager

mock_await_container_completion.side_effect = [ApiException(status=401)]
mock_read_pod.side_effect = [ApiException(status=401)]

with pytest.raises(ApiException):
k.await_pod_completion(pod)

mock_read_pod.assert_called()
# assert cache was refreshed
assert client != k.client
assert hook != k.hook
assert pod_manager != k.pod_manager
Expand All @@ -1663,7 +1686,7 @@ def test_await_container_completion_refreshes_properties_on_exception(
"side_effect, exception_type, expect_exc",
[
([ApiException(401), mock.DEFAULT], ApiException, True), # works after one 401
([ApiException(401)] * 10, ApiException, False), # exc after 3 retries on 401
([ApiException(401)] * 3 + [mock.DEFAULT], ApiException, True), # works after 3 retries
([ApiException(402)], ApiException, False), # exc on non-401
([ApiException(500)], ApiException, False), # exc on non-401
([Exception], Exception, False), # exc on different exception
Expand All @@ -1684,7 +1707,7 @@ def test_await_container_completion_retries_on_specific_exception(
else:
with pytest.raises(exception_type):
k.await_pod_completion(pod)
expected_call_count = min(len(side_effect), 3) # retry max 3 times
expected_call_count = len(side_effect)
mock_await_container_completion.assert_has_calls(
[mock.call(pod=pod, container_name=k.base_container_name)] * expected_call_count
)
Expand Down