Skip to content

Commit f4d446a

Browse files
niklasr22ambika-garg
authored andcommitted
Update DockerSwarmOperator auto_remove to align with DockerOperator (apache#45745)
* Update DockerSwarmOperator auto_remove to align with DockerOperator * add docker swarm auto remove test
1 parent 0501cac commit f4d446a

File tree

2 files changed

+44
-10
lines changed

2 files changed

+44
-10
lines changed

providers/docker/src/airflow/providers/docker/operators/docker_swarm.py

+8-8
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,11 @@ class DockerSwarmOperator(DockerOperator):
5959
If image tag is omitted, "latest" will be used.
6060
:param api_version: Remote API version. Set to ``auto`` to automatically
6161
detect the server's version.
62-
:param auto_remove: Auto-removal of the container on daemon side when the
63-
container's process exits.
64-
The default is False.
62+
:param auto_remove: Enable removal of the service when the service has terminated. Possible values:
63+
64+
- ``never``: (default) do not remove service
65+
- ``success``: remove on success
66+
- ``force``: always remove service
6567
:param command: Command to be run in the container. (templated)
6668
:param args: Arguments to the command.
6769
:param docker_url: URL of the host running the docker daemon.
@@ -214,18 +216,16 @@ def _run_service(self) -> None:
214216
container_id = task["Status"]["ContainerStatus"]["ContainerID"]
215217
container = self.cli.inspect_container(container_id)
216218
self.containers.append(container)
217-
else:
218-
raise AirflowException(f"Service did not complete: {self.service!r}")
219219

220220
if self.retrieve_output:
221221
return self._attempt_to_retrieve_results()
222222

223-
self.log.info("auto_removeauto_removeauto_removeauto_removeauto_remove : %s", str(self.auto_remove))
223+
self.log.info("auto_remove: %s", str(self.auto_remove))
224224
if self.service and self._service_status() != "complete":
225-
if self.auto_remove == "success":
225+
if self.auto_remove == "force":
226226
self.cli.remove_service(self.service["ID"])
227227
raise AirflowException(f"Service did not complete: {self.service!r}")
228-
elif self.auto_remove == "success":
228+
elif self.auto_remove in ["success", "force"]:
229229
if not self.service:
230230
raise RuntimeError("The 'service' should be initialized before!")
231231
self.cli.remove_service(self.service["ID"])

providers/docker/tests/provider_tests/docker/operators/test_docker_swarm.py

+36-2
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,8 @@ def _client_service_logs_effect():
130130
client_mock.remove_service.assert_called_once_with("some_id")
131131

132132
@mock.patch("airflow.providers.docker.operators.docker_swarm.types")
133-
def test_auto_remove(self, types_mock, docker_api_client_patcher):
133+
@pytest.mark.parametrize("auto_remove", ["success", "force"])
134+
def test_auto_remove(self, types_mock, docker_api_client_patcher, auto_remove):
134135
mock_obj = mock.Mock()
135136

136137
client_mock = mock.Mock(spec=APIClient)
@@ -148,12 +149,45 @@ def test_auto_remove(self, types_mock, docker_api_client_patcher):
148149
docker_api_client_patcher.return_value = client_mock
149150

150151
operator = DockerSwarmOperator(
151-
image="", auto_remove="success", task_id="unittest", enable_logging=False
152+
image="", auto_remove=auto_remove, task_id="unittest", enable_logging=False
152153
)
153154
operator.execute(None)
154155

155156
client_mock.remove_service.assert_called_once_with("some_id")
156157

158+
@mock.patch("airflow.providers.docker.operators.docker_swarm.types")
159+
@pytest.mark.parametrize(
160+
"auto_remove,expected_remove_call", [("success", False), ("force", True), ("never", False)]
161+
)
162+
def test_auto_remove_failed(
163+
self, types_mock, docker_api_client_patcher, auto_remove, expected_remove_call
164+
):
165+
mock_obj = mock.Mock()
166+
167+
client_mock = mock.Mock(spec=APIClient)
168+
client_mock.create_service.return_value = {"ID": "some_id"}
169+
client_mock.images.return_value = []
170+
client_mock.pull.return_value = [b'{"status":"pull log"}']
171+
client_mock.tasks.return_value = [
172+
{"Status": {"State": "failed", "ContainerStatus": {"ContainerID": "some_id"}}}
173+
]
174+
types_mock.TaskTemplate.return_value = mock_obj
175+
types_mock.ContainerSpec.return_value = mock_obj
176+
types_mock.RestartPolicy.return_value = mock_obj
177+
types_mock.Resources.return_value = mock_obj
178+
179+
docker_api_client_patcher.return_value = client_mock
180+
181+
operator = DockerSwarmOperator(
182+
image="", auto_remove=auto_remove, task_id="unittest", enable_logging=False
183+
)
184+
try:
185+
operator.execute(None)
186+
except AirflowException:
187+
pass
188+
189+
assert (client_mock.remove_service.call_count > 0) == expected_remove_call
190+
157191
@mock.patch("airflow.providers.docker.operators.docker_swarm.types")
158192
def test_no_auto_remove(self, types_mock, docker_api_client_patcher):
159193
mock_obj = mock.Mock()

0 commit comments

Comments
 (0)