From 6e536ba761284d200a55f47736dfd8fea955cfef Mon Sep 17 00:00:00 2001 From: Nicholas Hansen Date: Wed, 18 Jun 2025 17:20:33 +0100 Subject: [PATCH 1/5] Fix bookmarks of real resources --- kubernetes_asyncio/watch/watch.py | 18 +- kubernetes_asyncio/watch/watch_test.py | 352 ++++++++++++++----------- 2 files changed, 212 insertions(+), 158 deletions(-) diff --git a/kubernetes_asyncio/watch/watch.py b/kubernetes_asyncio/watch/watch.py index ab09f2e3..16ce2ff0 100644 --- a/kubernetes_asyncio/watch/watch.py +++ b/kubernetes_asyncio/watch/watch.py @@ -102,15 +102,15 @@ def unmarshal_event(self, data: str, response_type): reason = "{}: {}".format(obj['reason'], obj['message']) raise client.exceptions.ApiException(status=obj['code'], reason=reason) - # If possible, compile the JSON response into a Python native response - # type, eg `V1Namespace` or `V1Pod`,`ExtensionsV1beta1Deployment`, ... - if response_type: - js['object'] = self._api_client.deserialize( - response=SimpleNamespace(data=json.dumps(js['raw_object'])), - response_type=response_type - ) - if js['type'].lower() != 'bookmark': + # If possible, compile the JSON response into a Python native response + # type, eg `V1Namespace` or `V1Pod`,`ExtensionsV1beta1Deployment`, ... + if response_type: + js['object'] = self._api_client.deserialize( + response=SimpleNamespace(data=json.dumps(js['raw_object'])), + response_type=response_type + ) + # decode and save resource_version to continue watching if hasattr(js['object'], 'metadata'): self.resource_version = js['object'].metadata.resource_version @@ -123,7 +123,7 @@ def unmarshal_event(self, data: str, response_type): self.resource_version = js['object']['metadata']['resourceVersion'] elif js['type'].lower() == 'bookmark': - self.resource_version = js['object']['metadata']['resourceVersion'] + self.resource_version = js['raw_object']['metadata']['resourceVersion'] return js diff --git a/kubernetes_asyncio/watch/watch_test.py b/kubernetes_asyncio/watch/watch_test.py index 8b513793..c991beba 100644 --- a/kubernetes_asyncio/watch/watch_test.py +++ b/kubernetes_asyncio/watch/watch_test.py @@ -22,7 +22,6 @@ class WatchTest(IsolatedAsyncioTestCase): - async def test_watch_with_decode(self): fake_resp = AsyncMock() fake_resp.content.readline = AsyncMock() @@ -31,30 +30,35 @@ async def test_watch_with_decode(self): { "type": "ADDED", "object": { - "metadata": {"name": "test{}".format(uid), - "resourceVersion": str(uid)}, - "spec": {}, "status": {} - } + "metadata": { + "name": "test{}".format(uid), + "resourceVersion": str(uid), + }, + "spec": {}, + "status": {}, + }, } for uid in range(3) ] - side_effects = [json.dumps(_).encode('utf8') for _ in side_effects] - side_effects.extend([AssertionError('Should not have been called')]) + side_effects = [json.dumps(_).encode("utf8") for _ in side_effects] + side_effects.extend([AssertionError("Should not have been called")]) fake_resp.content.readline.side_effect = side_effects fake_api = Mock() fake_api.get_namespaces = AsyncMock(return_value=fake_resp) - fake_api.get_namespaces.__doc__ = ':rtype: V1NamespaceList' + fake_api.get_namespaces.__doc__ = ":rtype: V1NamespaceList" watch = kubernetes_asyncio.watch.Watch() count = 0 async with watch: - async for e in watch.stream(fake_api.get_namespaces, resource_version='123'): - self.assertEqual("ADDED", e['type']) + async for e in watch.stream( + fake_api.get_namespaces, resource_version="123" + ): + self.assertEqual("ADDED", e["type"]) # make sure decoder worked and we got a model with the right name - self.assertEqual("test%d" % count, e['object'].metadata.name) + self.assertEqual("test%d" % count, e["object"].metadata.name) # make sure decoder worked and updated Watch.resource_version - self.assertEqual(e['object'].metadata.resource_version, str(count)) + self.assertEqual(e["object"].metadata.resource_version, str(count)) self.assertEqual(watch.resource_version, str(count)) # Stop the watch. This must not return the next event which would @@ -64,24 +68,27 @@ async def test_watch_with_decode(self): watch.stop() fake_api.get_namespaces.assert_called_once_with( - _preload_content=False, watch=True, resource_version='123') + _preload_content=False, watch=True, resource_version="123" + ) fake_resp.release.assert_called_once_with() # last resource_version has to be stored in the object - self.assertEqual(watch.resource_version, '2') + self.assertEqual(watch.resource_version, "2") async def test_watch_for_follow(self): fake_resp = AsyncMock() fake_resp.content.readline = AsyncMock() fake_resp.release = Mock() - side_effects = ['log_line_1', 'log_line_2', ''] - side_effects = [_.encode('utf8') for _ in side_effects] - side_effects.extend([AssertionError('Should not have been called')]) + side_effects = ["log_line_1", "log_line_2", ""] + side_effects = [_.encode("utf8") for _ in side_effects] + side_effects.extend([AssertionError("Should not have been called")]) fake_resp.content.readline.side_effect = side_effects fake_api = Mock() fake_api.read_namespaced_pod_log = AsyncMock(return_value=fake_resp) - fake_api.read_namespaced_pod_log.__doc__ = ':param follow:\n:type follow: bool\n:rtype: str' + fake_api.read_namespaced_pod_log.__doc__ = ( + ":param follow:\n:type follow: bool\n:rtype: str" + ) watch = kubernetes_asyncio.watch.Watch() logs = [] @@ -89,9 +96,10 @@ async def test_watch_for_follow(self): async for e in watch.stream(fake_api.read_namespaced_pod_log): logs.append(e) - self.assertListEqual(logs, ['log_line_1', 'log_line_2']) + self.assertListEqual(logs, ["log_line_1", "log_line_2"]) fake_api.read_namespaced_pod_log.assert_called_once_with( - _preload_content=False, follow=True) + _preload_content=False, follow=True + ) fake_resp.release.assert_called_once_with() async def test_watch_k8s_empty_response(self): @@ -105,49 +113,53 @@ async def test_watch_k8s_empty_response(self): fake_resp = AsyncMock() fake_resp.content.readline = AsyncMock() side_effects = [ - {"type": "ADDED", "object": {"metadata": {"name": "test0"}, "spec": {}, "status": {}}}, - {"type": "ADDED", "object": {"metadata": {"name": "test1"}, "spec": {}, "status": {}}}, + { + "type": "ADDED", + "object": {"metadata": {"name": "test0"}, "spec": {}, "status": {}}, + }, + { + "type": "ADDED", + "object": {"metadata": {"name": "test1"}, "spec": {}, "status": {}}, + }, ] - side_effects = [json.dumps(_).encode('utf8') for _ in side_effects] - fake_resp.content.readline.side_effect = side_effects + [b''] + side_effects = [json.dumps(_).encode("utf8") for _ in side_effects] + fake_resp.content.readline.side_effect = side_effects + [b""] # Fake the K8s resource object to watch. fake_api = Mock() fake_api.get_namespaces = AsyncMock(return_value=fake_resp) - fake_api.get_namespaces.__doc__ = ':rtype: V1NamespaceList' + fake_api.get_namespaces.__doc__ = ":rtype: V1NamespaceList" # Iteration must cease after all valid responses were received. watch = kubernetes_asyncio.watch.Watch() cnt = 0 - async for _ in watch.stream(fake_api.get_namespaces): # noqa + async for _ in watch.stream(fake_api.get_namespaces): # noqa cnt += 1 self.assertEqual(cnt, len(side_effects)) async def test_unmarshal_with_float_object(self): w = Watch() - event = w.unmarshal_event('{"type": "ADDED", "object": 1}', 'float') - self.assertEqual("ADDED", event['type']) - self.assertEqual(1.0, event['object']) - self.assertTrue(isinstance(event['object'], float)) - self.assertEqual(1, event['raw_object']) + event = w.unmarshal_event('{"type": "ADDED", "object": 1}', "float") + self.assertEqual("ADDED", event["type"]) + self.assertEqual(1.0, event["object"]) + self.assertTrue(isinstance(event["object"], float)) + self.assertEqual(1, event["raw_object"]) async def test_unmarshal_without_return_type(self): w = Watch() - event = w.unmarshal_event( - '{"type": "ADDED", "object": ["test1"]}', None) - self.assertEqual("ADDED", event['type']) - self.assertEqual(["test1"], event['object']) - self.assertEqual(["test1"], event['raw_object']) + event = w.unmarshal_event('{"type": "ADDED", "object": ["test1"]}', None) + self.assertEqual("ADDED", event["type"]) + self.assertEqual(["test1"], event["object"]) + self.assertEqual(["test1"], event["raw_object"]) async def test_unmarshal_with_empty_return_type(self): # empty string as a return_type is a default value # if watch can't detect object by function's name w = Watch() - event = w.unmarshal_event( - '{"type": "ADDED", "object": ["test1"]}', '') - self.assertEqual("ADDED", event['type']) - self.assertEqual(["test1"], event['object']) - self.assertEqual(["test1"], event['raw_object']) + event = w.unmarshal_event('{"type": "ADDED", "object": ["test1"]}', "") + self.assertEqual("ADDED", event["type"]) + self.assertEqual(["test1"], event["object"]) + self.assertEqual(["test1"], event["raw_object"]) async def test_unmarshall_k8s_error_response(self): """Never parse messages of type ERROR. @@ -158,21 +170,22 @@ async def test_unmarshall_k8s_error_response(self): """ # An actual error response sent by K8s during testing. k8s_err = { - 'type': 'ERROR', - 'object': { - 'kind': 'Status', - 'apiVersion': 'v1', - 'metadata': {}, - 'status': 'Failure', - 'message': 'too old resource version: 1 (8146471)', - 'reason': 'Gone', - 'code': 410 - } + "type": "ERROR", + "object": { + "kind": "Status", + "apiVersion": "v1", + "metadata": {}, + "status": "Failure", + "message": "too old resource version: 1 (8146471)", + "reason": "Gone", + "code": 410, + }, } with self.assertRaisesRegex( - kubernetes_asyncio.client.exceptions.ApiException, - r'\(410\)\nReason: Gone: too old resource version: 1 \(8146471\)'): + kubernetes_asyncio.client.exceptions.ApiException, + r"\(410\)\nReason: Gone: too old resource version: 1 \(8146471\)", + ): Watch().unmarshal_event(json.dumps(k8s_err), None) async def test_unmarshall_k8s_error_response_401_gke(self): @@ -183,31 +196,34 @@ async def test_unmarshall_k8s_error_response_401_gke(self): """ # An actual error response sent by K8s during testing. k8s_err = { - 'kind': 'Status', - 'apiVersion': 'v1', - 'metadata': {}, - 'status': 'Failure', - 'message': 'Unauthorized', - 'reason': 'Unauthorized', - 'code': 401 + "kind": "Status", + "apiVersion": "v1", + "metadata": {}, + "status": "Failure", + "message": "Unauthorized", + "reason": "Unauthorized", + "code": 401, } with self.assertRaisesRegex( - kubernetes_asyncio.client.exceptions.ApiException, - r'\(401\)\nReason: Unauthorized: Unauthorized'): + kubernetes_asyncio.client.exceptions.ApiException, + r"\(401\)\nReason: Unauthorized: Unauthorized", + ): Watch().unmarshal_event(json.dumps(k8s_err), None) async def test_unmarshal_with_custom_object(self): w = Watch() - event = w.unmarshal_event('{"type": "ADDED", "object": {"apiVersion":' - '"test.com/v1beta1","kind":"foo","metadata":' - '{"name": "bar", "resourceVersion": "1"}}}', - 'object') - self.assertEqual("ADDED", event['type']) + event = w.unmarshal_event( + '{"type": "ADDED", "object": {"apiVersion":' + '"test.com/v1beta1","kind":"foo","metadata":' + '{"name": "bar", "resourceVersion": "1"}}}', + "object", + ) + self.assertEqual("ADDED", event["type"]) # make sure decoder deserialized json into dictionary and updated # Watch.resource_version - self.assertTrue(isinstance(event['object'], dict)) - self.assertEqual("1", event['object']['metadata']['resourceVersion']) + self.assertTrue(isinstance(event["object"], dict)) + self.assertEqual("1", event["object"]["metadata"]["resourceVersion"]) self.assertEqual("1", w.resource_version) async def test_watch_with_exception(self): @@ -216,11 +232,11 @@ async def test_watch_with_exception(self): fake_resp.content.readline.side_effect = KeyError("expected") fake_api = Mock() fake_api.get_namespaces = AsyncMock(return_value=fake_resp) - fake_api.get_namespaces.__doc__ = ':rtype: V1NamespaceList' + fake_api.get_namespaces.__doc__ = ":rtype: V1NamespaceList" with self.assertRaises(KeyError): watch = kubernetes_asyncio.watch.Watch() - async for e in watch.stream(fake_api.get_namespaces, timeout_seconds=10): # noqa + async for e in watch.stream(fake_api.get_namespaces, timeout_seconds=10): # noqa pass async def test_watch_retry_timeout(self): @@ -228,28 +244,36 @@ async def test_watch_retry_timeout(self): fake_resp.content.readline = AsyncMock() fake_resp.release = Mock() - mock_event = {"type": "ADDED", - "object": {"metadata": {"name": "test1555", - "resourceVersion": "1555"}, - "spec": {}, - "status": {}}} + mock_event = { + "type": "ADDED", + "object": { + "metadata": {"name": "test1555", "resourceVersion": "1555"}, + "spec": {}, + "status": {}, + }, + } - fake_resp.content.readline.side_effect = [json.dumps(mock_event).encode('utf8'), - asyncio.TimeoutError(), - b""] + fake_resp.content.readline.side_effect = [ + json.dumps(mock_event).encode("utf8"), + asyncio.TimeoutError(), + b"", + ] fake_api = Mock() fake_api.get_namespaces = AsyncMock(return_value=fake_resp) - fake_api.get_namespaces.__doc__ = ':rtype: V1NamespaceList' + fake_api.get_namespaces.__doc__ = ":rtype: V1NamespaceList" watch = kubernetes_asyncio.watch.Watch() async with watch.stream(fake_api.get_namespaces) as stream: - async for e in stream: # noqa + async for e in stream: # noqa pass fake_api.get_namespaces.assert_has_calls( - [call(_preload_content=False, watch=True), - call(_preload_content=False, watch=True, resource_version='1555')]) + [ + call(_preload_content=False, watch=True), + call(_preload_content=False, watch=True, resource_version="1555"), + ] + ) fake_resp.release.assert_called_once_with() async def test_watch_retry_410(self): @@ -260,98 +284,103 @@ async def test_watch_retry_410(self): mock_event1 = { "type": "ADDED", "object": { - "metadata": - { - "name": "test1555", - "resourceVersion": "1555" - }, + "metadata": {"name": "test1555", "resourceVersion": "1555"}, "spec": {}, - "status": {} - } + "status": {}, + }, } mock_event2 = { "type": "ADDED", "object": { - "metadata": - { - "name": "test1555", - "resourceVersion": "1555" - }, + "metadata": {"name": "test1555", "resourceVersion": "1555"}, "spec": {}, - "status": {} - } + "status": {}, + }, } mock_410 = { - 'type': 'ERROR', - 'object': { - 'kind': 'Status', - 'apiVersion': 'v1', - 'metadata': {}, - 'status': 'Failure', - 'message': 'too old resource version: 1 (8146471)', - 'reason': 'Gone', - 'code': 410 - } + "type": "ERROR", + "object": { + "kind": "Status", + "apiVersion": "v1", + "metadata": {}, + "status": "Failure", + "message": "too old resource version: 1 (8146471)", + "reason": "Gone", + "code": 410, + }, } # retry 410 - fake_resp.content.readline.side_effect = [json.dumps(mock_event1).encode('utf8'), - json.dumps(mock_410).encode('utf8'), - json.dumps(mock_event2).encode('utf8'), - json.dumps(mock_410).encode('utf8'), - b""] + fake_resp.content.readline.side_effect = [ + json.dumps(mock_event1).encode("utf8"), + json.dumps(mock_410).encode("utf8"), + json.dumps(mock_event2).encode("utf8"), + json.dumps(mock_410).encode("utf8"), + b"", + ] fake_api = Mock() fake_api.get_namespaces = AsyncMock(return_value=fake_resp) - fake_api.get_namespaces.__doc__ = ':rtype: V1NamespaceList' + fake_api.get_namespaces.__doc__ = ":rtype: V1NamespaceList" watch = kubernetes_asyncio.watch.Watch() async with watch.stream(fake_api.get_namespaces) as stream: - async for e in stream: # noqa + async for e in stream: # noqa pass fake_api.get_namespaces.assert_has_calls( - [call(_preload_content=False, watch=True), - call(_preload_content=False, watch=True, resource_version='1555')]) + [ + call(_preload_content=False, watch=True), + call(_preload_content=False, watch=True, resource_version="1555"), + ] + ) fake_resp.release.assert_called_once_with() # retry 410 only once - fake_resp.content.readline.side_effect = [json.dumps(mock_event1).encode('utf8'), - json.dumps(mock_410).encode('utf8'), - json.dumps(mock_event2).encode('utf8'), - json.dumps(mock_410).encode('utf8'), - json.dumps(mock_410).encode('utf8'), - b""] + fake_resp.content.readline.side_effect = [ + json.dumps(mock_event1).encode("utf8"), + json.dumps(mock_410).encode("utf8"), + json.dumps(mock_event2).encode("utf8"), + json.dumps(mock_410).encode("utf8"), + json.dumps(mock_410).encode("utf8"), + b"", + ] fake_api = Mock() fake_api.get_namespaces = AsyncMock(return_value=fake_resp) - fake_api.get_namespaces.__doc__ = ':rtype: V1NamespaceList' + fake_api.get_namespaces.__doc__ = ":rtype: V1NamespaceList" with self.assertRaisesRegex( - kubernetes_asyncio.client.exceptions.ApiException, - r'\(410\)\nReason: Gone: too old resource version: 1 \(8146471\)'): + kubernetes_asyncio.client.exceptions.ApiException, + r"\(410\)\nReason: Gone: too old resource version: 1 \(8146471\)", + ): watch = kubernetes_asyncio.watch.Watch() async with watch.stream(fake_api.get_namespaces) as stream: - async for e in stream: # noqa + async for e in stream: # noqa pass # no retry 410 if timeout is passed - fake_resp.content.readline.side_effect = [json.dumps(mock_event1).encode('utf8'), - json.dumps(mock_410).encode('utf8'), - b""] + fake_resp.content.readline.side_effect = [ + json.dumps(mock_event1).encode("utf8"), + json.dumps(mock_410).encode("utf8"), + b"", + ] fake_api = Mock() fake_api.get_namespaces = AsyncMock(return_value=fake_resp) - fake_api.get_namespaces.__doc__ = ':rtype: V1NamespaceList' + fake_api.get_namespaces.__doc__ = ":rtype: V1NamespaceList" with self.assertRaisesRegex( - kubernetes_asyncio.client.exceptions.ApiException, - r'\(410\)\nReason: Gone: too old resource version: 1 \(8146471\)'): + kubernetes_asyncio.client.exceptions.ApiException, + r"\(410\)\nReason: Gone: too old resource version: 1 \(8146471\)", + ): watch = kubernetes_asyncio.watch.Watch() - async with watch.stream(fake_api.get_namespaces, timeout_seconds=10) as stream: - async for e in stream: # noqa + async with watch.stream( + fake_api.get_namespaces, timeout_seconds=10 + ) as stream: + async for e in stream: # noqa pass async def test_watch_timeout_with_resource_version(self): @@ -359,36 +388,61 @@ async def test_watch_timeout_with_resource_version(self): fake_resp.content.readline = AsyncMock() fake_resp.release = Mock() - fake_resp.content.readline.side_effect = [asyncio.TimeoutError(), - b""] + fake_resp.content.readline.side_effect = [asyncio.TimeoutError(), b""] fake_api = Mock() fake_api.get_namespaces = AsyncMock(return_value=fake_resp) - fake_api.get_namespaces.__doc__ = ':rtype: V1NamespaceList' + fake_api.get_namespaces.__doc__ = ":rtype: V1NamespaceList" watch = kubernetes_asyncio.watch.Watch() - async with watch.stream(fake_api.get_namespaces, resource_version='10') as stream: - async for e in stream: # noqa + async with watch.stream( + fake_api.get_namespaces, resource_version="10" + ) as stream: + async for e in stream: # noqa pass # all calls use the passed resource version fake_api.get_namespaces.assert_has_calls( - [call(_preload_content=False, watch=True, resource_version='10'), - call(_preload_content=False, watch=True, resource_version='10')]) + [ + call(_preload_content=False, watch=True, resource_version="10"), + call(_preload_content=False, watch=True, resource_version="10"), + ] + ) fake_resp.release.assert_called_once_with() - self.assertEqual(watch.resource_version, '10') + self.assertEqual(watch.resource_version, "10") async def test_unmarshal_bookmark_succeeds_and_preserves_resource_version(self): w = Watch() - event = w.unmarshal_event('{"type": "BOOKMARK", "object": {"apiVersion":' - '"test.com/v1beta1","kind":"foo","metadata":' - '{"name": "bar", "resourceVersion": "1"}}}', - 'object') - self.assertEqual("BOOKMARK", event['type']) + event = w.unmarshal_event( + '{"type": "BOOKMARK", "object": {"apiVersion":' + '"test.com/v1beta1","kind":"foo","metadata":' + '{"name": "bar", "resourceVersion": "1"}}}', + "object", + ) + self.assertEqual("BOOKMARK", event["type"]) + + # make sure the resource version is preserved, + # and the watcher's resource_version is updated + self.assertTrue(isinstance(event["raw_object"], dict)) + self.assertEqual("1", event["raw_object"]["metadata"]["resourceVersion"]) + self.assertEqual("1", w.resource_version) + + async def test_unmarshal_job_bookmark_succeeds_and_preserves_resource_version(self): + w = Watch() + event = w.unmarshal_event( + '{"type": "BOOKMARK", "object": {"apiVersion":' + '"batch/v1","kind":"Job","metadata":' + '{"name": "bar", "resourceVersion": "1"},' + '"spec": {"template": {"metadata": ' + '{"creationTimestamp":null}, "spec": ' + '{"containers":null}}}}}', + "object", + ) + self.assertEqual("BOOKMARK", event["type"]) # make sure the resource version is preserved, # and the watcher's resource_version is updated - self.assertTrue(isinstance(event['object'], dict)) - self.assertEqual("1", event['object']['metadata']['resourceVersion']) + self.assertTrue(isinstance(event["raw_object"], dict)) + self.assertEqual("1", event["raw_object"]["metadata"]["resourceVersion"]) self.assertEqual("1", w.resource_version) From e1e77a949c7a00129258d7a85594e8d462618682 Mon Sep 17 00:00:00 2001 From: Nicholas Hansen Date: Wed, 18 Jun 2025 17:21:23 +0100 Subject: [PATCH 2/5] Undo automated formatting --- kubernetes_asyncio/watch/watch_test.py | 355 +++++++++++-------------- 1 file changed, 159 insertions(+), 196 deletions(-) diff --git a/kubernetes_asyncio/watch/watch_test.py b/kubernetes_asyncio/watch/watch_test.py index c991beba..1a040901 100644 --- a/kubernetes_asyncio/watch/watch_test.py +++ b/kubernetes_asyncio/watch/watch_test.py @@ -22,6 +22,7 @@ class WatchTest(IsolatedAsyncioTestCase): + async def test_watch_with_decode(self): fake_resp = AsyncMock() fake_resp.content.readline = AsyncMock() @@ -30,35 +31,30 @@ async def test_watch_with_decode(self): { "type": "ADDED", "object": { - "metadata": { - "name": "test{}".format(uid), - "resourceVersion": str(uid), - }, - "spec": {}, - "status": {}, - }, + "metadata": {"name": "test{}".format(uid), + "resourceVersion": str(uid)}, + "spec": {}, "status": {} + } } for uid in range(3) ] - side_effects = [json.dumps(_).encode("utf8") for _ in side_effects] - side_effects.extend([AssertionError("Should not have been called")]) + side_effects = [json.dumps(_).encode('utf8') for _ in side_effects] + side_effects.extend([AssertionError('Should not have been called')]) fake_resp.content.readline.side_effect = side_effects fake_api = Mock() fake_api.get_namespaces = AsyncMock(return_value=fake_resp) - fake_api.get_namespaces.__doc__ = ":rtype: V1NamespaceList" + fake_api.get_namespaces.__doc__ = ':rtype: V1NamespaceList' watch = kubernetes_asyncio.watch.Watch() count = 0 async with watch: - async for e in watch.stream( - fake_api.get_namespaces, resource_version="123" - ): - self.assertEqual("ADDED", e["type"]) + async for e in watch.stream(fake_api.get_namespaces, resource_version='123'): + self.assertEqual("ADDED", e['type']) # make sure decoder worked and we got a model with the right name - self.assertEqual("test%d" % count, e["object"].metadata.name) + self.assertEqual("test%d" % count, e['object'].metadata.name) # make sure decoder worked and updated Watch.resource_version - self.assertEqual(e["object"].metadata.resource_version, str(count)) + self.assertEqual(e['object'].metadata.resource_version, str(count)) self.assertEqual(watch.resource_version, str(count)) # Stop the watch. This must not return the next event which would @@ -68,27 +64,24 @@ async def test_watch_with_decode(self): watch.stop() fake_api.get_namespaces.assert_called_once_with( - _preload_content=False, watch=True, resource_version="123" - ) + _preload_content=False, watch=True, resource_version='123') fake_resp.release.assert_called_once_with() # last resource_version has to be stored in the object - self.assertEqual(watch.resource_version, "2") + self.assertEqual(watch.resource_version, '2') async def test_watch_for_follow(self): fake_resp = AsyncMock() fake_resp.content.readline = AsyncMock() fake_resp.release = Mock() - side_effects = ["log_line_1", "log_line_2", ""] - side_effects = [_.encode("utf8") for _ in side_effects] - side_effects.extend([AssertionError("Should not have been called")]) + side_effects = ['log_line_1', 'log_line_2', ''] + side_effects = [_.encode('utf8') for _ in side_effects] + side_effects.extend([AssertionError('Should not have been called')]) fake_resp.content.readline.side_effect = side_effects fake_api = Mock() fake_api.read_namespaced_pod_log = AsyncMock(return_value=fake_resp) - fake_api.read_namespaced_pod_log.__doc__ = ( - ":param follow:\n:type follow: bool\n:rtype: str" - ) + fake_api.read_namespaced_pod_log.__doc__ = ':param follow:\n:type follow: bool\n:rtype: str' watch = kubernetes_asyncio.watch.Watch() logs = [] @@ -96,10 +89,9 @@ async def test_watch_for_follow(self): async for e in watch.stream(fake_api.read_namespaced_pod_log): logs.append(e) - self.assertListEqual(logs, ["log_line_1", "log_line_2"]) + self.assertListEqual(logs, ['log_line_1', 'log_line_2']) fake_api.read_namespaced_pod_log.assert_called_once_with( - _preload_content=False, follow=True - ) + _preload_content=False, follow=True) fake_resp.release.assert_called_once_with() async def test_watch_k8s_empty_response(self): @@ -113,53 +105,49 @@ async def test_watch_k8s_empty_response(self): fake_resp = AsyncMock() fake_resp.content.readline = AsyncMock() side_effects = [ - { - "type": "ADDED", - "object": {"metadata": {"name": "test0"}, "spec": {}, "status": {}}, - }, - { - "type": "ADDED", - "object": {"metadata": {"name": "test1"}, "spec": {}, "status": {}}, - }, + {"type": "ADDED", "object": {"metadata": {"name": "test0"}, "spec": {}, "status": {}}}, + {"type": "ADDED", "object": {"metadata": {"name": "test1"}, "spec": {}, "status": {}}}, ] - side_effects = [json.dumps(_).encode("utf8") for _ in side_effects] - fake_resp.content.readline.side_effect = side_effects + [b""] + side_effects = [json.dumps(_).encode('utf8') for _ in side_effects] + fake_resp.content.readline.side_effect = side_effects + [b''] # Fake the K8s resource object to watch. fake_api = Mock() fake_api.get_namespaces = AsyncMock(return_value=fake_resp) - fake_api.get_namespaces.__doc__ = ":rtype: V1NamespaceList" + fake_api.get_namespaces.__doc__ = ':rtype: V1NamespaceList' # Iteration must cease after all valid responses were received. watch = kubernetes_asyncio.watch.Watch() cnt = 0 - async for _ in watch.stream(fake_api.get_namespaces): # noqa + async for _ in watch.stream(fake_api.get_namespaces): # noqa cnt += 1 self.assertEqual(cnt, len(side_effects)) async def test_unmarshal_with_float_object(self): w = Watch() - event = w.unmarshal_event('{"type": "ADDED", "object": 1}', "float") - self.assertEqual("ADDED", event["type"]) - self.assertEqual(1.0, event["object"]) - self.assertTrue(isinstance(event["object"], float)) - self.assertEqual(1, event["raw_object"]) + event = w.unmarshal_event('{"type": "ADDED", "object": 1}', 'float') + self.assertEqual("ADDED", event['type']) + self.assertEqual(1.0, event['object']) + self.assertTrue(isinstance(event['object'], float)) + self.assertEqual(1, event['raw_object']) async def test_unmarshal_without_return_type(self): w = Watch() - event = w.unmarshal_event('{"type": "ADDED", "object": ["test1"]}', None) - self.assertEqual("ADDED", event["type"]) - self.assertEqual(["test1"], event["object"]) - self.assertEqual(["test1"], event["raw_object"]) + event = w.unmarshal_event( + '{"type": "ADDED", "object": ["test1"]}', None) + self.assertEqual("ADDED", event['type']) + self.assertEqual(["test1"], event['object']) + self.assertEqual(["test1"], event['raw_object']) async def test_unmarshal_with_empty_return_type(self): # empty string as a return_type is a default value # if watch can't detect object by function's name w = Watch() - event = w.unmarshal_event('{"type": "ADDED", "object": ["test1"]}', "") - self.assertEqual("ADDED", event["type"]) - self.assertEqual(["test1"], event["object"]) - self.assertEqual(["test1"], event["raw_object"]) + event = w.unmarshal_event( + '{"type": "ADDED", "object": ["test1"]}', '') + self.assertEqual("ADDED", event['type']) + self.assertEqual(["test1"], event['object']) + self.assertEqual(["test1"], event['raw_object']) async def test_unmarshall_k8s_error_response(self): """Never parse messages of type ERROR. @@ -170,22 +158,21 @@ async def test_unmarshall_k8s_error_response(self): """ # An actual error response sent by K8s during testing. k8s_err = { - "type": "ERROR", - "object": { - "kind": "Status", - "apiVersion": "v1", - "metadata": {}, - "status": "Failure", - "message": "too old resource version: 1 (8146471)", - "reason": "Gone", - "code": 410, - }, + 'type': 'ERROR', + 'object': { + 'kind': 'Status', + 'apiVersion': 'v1', + 'metadata': {}, + 'status': 'Failure', + 'message': 'too old resource version: 1 (8146471)', + 'reason': 'Gone', + 'code': 410 + } } with self.assertRaisesRegex( - kubernetes_asyncio.client.exceptions.ApiException, - r"\(410\)\nReason: Gone: too old resource version: 1 \(8146471\)", - ): + kubernetes_asyncio.client.exceptions.ApiException, + r'\(410\)\nReason: Gone: too old resource version: 1 \(8146471\)'): Watch().unmarshal_event(json.dumps(k8s_err), None) async def test_unmarshall_k8s_error_response_401_gke(self): @@ -196,34 +183,31 @@ async def test_unmarshall_k8s_error_response_401_gke(self): """ # An actual error response sent by K8s during testing. k8s_err = { - "kind": "Status", - "apiVersion": "v1", - "metadata": {}, - "status": "Failure", - "message": "Unauthorized", - "reason": "Unauthorized", - "code": 401, + 'kind': 'Status', + 'apiVersion': 'v1', + 'metadata': {}, + 'status': 'Failure', + 'message': 'Unauthorized', + 'reason': 'Unauthorized', + 'code': 401 } with self.assertRaisesRegex( - kubernetes_asyncio.client.exceptions.ApiException, - r"\(401\)\nReason: Unauthorized: Unauthorized", - ): + kubernetes_asyncio.client.exceptions.ApiException, + r'\(401\)\nReason: Unauthorized: Unauthorized'): Watch().unmarshal_event(json.dumps(k8s_err), None) async def test_unmarshal_with_custom_object(self): w = Watch() - event = w.unmarshal_event( - '{"type": "ADDED", "object": {"apiVersion":' - '"test.com/v1beta1","kind":"foo","metadata":' - '{"name": "bar", "resourceVersion": "1"}}}', - "object", - ) - self.assertEqual("ADDED", event["type"]) + event = w.unmarshal_event('{"type": "ADDED", "object": {"apiVersion":' + '"test.com/v1beta1","kind":"foo","metadata":' + '{"name": "bar", "resourceVersion": "1"}}}', + 'object') + self.assertEqual("ADDED", event['type']) # make sure decoder deserialized json into dictionary and updated # Watch.resource_version - self.assertTrue(isinstance(event["object"], dict)) - self.assertEqual("1", event["object"]["metadata"]["resourceVersion"]) + self.assertTrue(isinstance(event['object'], dict)) + self.assertEqual("1", event['object']['metadata']['resourceVersion']) self.assertEqual("1", w.resource_version) async def test_watch_with_exception(self): @@ -232,11 +216,11 @@ async def test_watch_with_exception(self): fake_resp.content.readline.side_effect = KeyError("expected") fake_api = Mock() fake_api.get_namespaces = AsyncMock(return_value=fake_resp) - fake_api.get_namespaces.__doc__ = ":rtype: V1NamespaceList" + fake_api.get_namespaces.__doc__ = ':rtype: V1NamespaceList' with self.assertRaises(KeyError): watch = kubernetes_asyncio.watch.Watch() - async for e in watch.stream(fake_api.get_namespaces, timeout_seconds=10): # noqa + async for e in watch.stream(fake_api.get_namespaces, timeout_seconds=10): # noqa pass async def test_watch_retry_timeout(self): @@ -244,36 +228,28 @@ async def test_watch_retry_timeout(self): fake_resp.content.readline = AsyncMock() fake_resp.release = Mock() - mock_event = { - "type": "ADDED", - "object": { - "metadata": {"name": "test1555", "resourceVersion": "1555"}, - "spec": {}, - "status": {}, - }, - } + mock_event = {"type": "ADDED", + "object": {"metadata": {"name": "test1555", + "resourceVersion": "1555"}, + "spec": {}, + "status": {}}} - fake_resp.content.readline.side_effect = [ - json.dumps(mock_event).encode("utf8"), - asyncio.TimeoutError(), - b"", - ] + fake_resp.content.readline.side_effect = [json.dumps(mock_event).encode('utf8'), + asyncio.TimeoutError(), + b""] fake_api = Mock() fake_api.get_namespaces = AsyncMock(return_value=fake_resp) - fake_api.get_namespaces.__doc__ = ":rtype: V1NamespaceList" + fake_api.get_namespaces.__doc__ = ':rtype: V1NamespaceList' watch = kubernetes_asyncio.watch.Watch() async with watch.stream(fake_api.get_namespaces) as stream: - async for e in stream: # noqa + async for e in stream: # noqa pass fake_api.get_namespaces.assert_has_calls( - [ - call(_preload_content=False, watch=True), - call(_preload_content=False, watch=True, resource_version="1555"), - ] - ) + [call(_preload_content=False, watch=True), + call(_preload_content=False, watch=True, resource_version='1555')]) fake_resp.release.assert_called_once_with() async def test_watch_retry_410(self): @@ -284,103 +260,98 @@ async def test_watch_retry_410(self): mock_event1 = { "type": "ADDED", "object": { - "metadata": {"name": "test1555", "resourceVersion": "1555"}, + "metadata": + { + "name": "test1555", + "resourceVersion": "1555" + }, "spec": {}, - "status": {}, - }, + "status": {} + } } mock_event2 = { "type": "ADDED", "object": { - "metadata": {"name": "test1555", "resourceVersion": "1555"}, + "metadata": + { + "name": "test1555", + "resourceVersion": "1555" + }, "spec": {}, - "status": {}, - }, + "status": {} + } } mock_410 = { - "type": "ERROR", - "object": { - "kind": "Status", - "apiVersion": "v1", - "metadata": {}, - "status": "Failure", - "message": "too old resource version: 1 (8146471)", - "reason": "Gone", - "code": 410, - }, + 'type': 'ERROR', + 'object': { + 'kind': 'Status', + 'apiVersion': 'v1', + 'metadata': {}, + 'status': 'Failure', + 'message': 'too old resource version: 1 (8146471)', + 'reason': 'Gone', + 'code': 410 + } } # retry 410 - fake_resp.content.readline.side_effect = [ - json.dumps(mock_event1).encode("utf8"), - json.dumps(mock_410).encode("utf8"), - json.dumps(mock_event2).encode("utf8"), - json.dumps(mock_410).encode("utf8"), - b"", - ] + fake_resp.content.readline.side_effect = [json.dumps(mock_event1).encode('utf8'), + json.dumps(mock_410).encode('utf8'), + json.dumps(mock_event2).encode('utf8'), + json.dumps(mock_410).encode('utf8'), + b""] fake_api = Mock() fake_api.get_namespaces = AsyncMock(return_value=fake_resp) - fake_api.get_namespaces.__doc__ = ":rtype: V1NamespaceList" + fake_api.get_namespaces.__doc__ = ':rtype: V1NamespaceList' watch = kubernetes_asyncio.watch.Watch() async with watch.stream(fake_api.get_namespaces) as stream: - async for e in stream: # noqa + async for e in stream: # noqa pass fake_api.get_namespaces.assert_has_calls( - [ - call(_preload_content=False, watch=True), - call(_preload_content=False, watch=True, resource_version="1555"), - ] - ) + [call(_preload_content=False, watch=True), + call(_preload_content=False, watch=True, resource_version='1555')]) fake_resp.release.assert_called_once_with() # retry 410 only once - fake_resp.content.readline.side_effect = [ - json.dumps(mock_event1).encode("utf8"), - json.dumps(mock_410).encode("utf8"), - json.dumps(mock_event2).encode("utf8"), - json.dumps(mock_410).encode("utf8"), - json.dumps(mock_410).encode("utf8"), - b"", - ] + fake_resp.content.readline.side_effect = [json.dumps(mock_event1).encode('utf8'), + json.dumps(mock_410).encode('utf8'), + json.dumps(mock_event2).encode('utf8'), + json.dumps(mock_410).encode('utf8'), + json.dumps(mock_410).encode('utf8'), + b""] fake_api = Mock() fake_api.get_namespaces = AsyncMock(return_value=fake_resp) - fake_api.get_namespaces.__doc__ = ":rtype: V1NamespaceList" + fake_api.get_namespaces.__doc__ = ':rtype: V1NamespaceList' with self.assertRaisesRegex( - kubernetes_asyncio.client.exceptions.ApiException, - r"\(410\)\nReason: Gone: too old resource version: 1 \(8146471\)", - ): + kubernetes_asyncio.client.exceptions.ApiException, + r'\(410\)\nReason: Gone: too old resource version: 1 \(8146471\)'): watch = kubernetes_asyncio.watch.Watch() async with watch.stream(fake_api.get_namespaces) as stream: - async for e in stream: # noqa + async for e in stream: # noqa pass # no retry 410 if timeout is passed - fake_resp.content.readline.side_effect = [ - json.dumps(mock_event1).encode("utf8"), - json.dumps(mock_410).encode("utf8"), - b"", - ] + fake_resp.content.readline.side_effect = [json.dumps(mock_event1).encode('utf8'), + json.dumps(mock_410).encode('utf8'), + b""] fake_api = Mock() fake_api.get_namespaces = AsyncMock(return_value=fake_resp) - fake_api.get_namespaces.__doc__ = ":rtype: V1NamespaceList" + fake_api.get_namespaces.__doc__ = ':rtype: V1NamespaceList' with self.assertRaisesRegex( - kubernetes_asyncio.client.exceptions.ApiException, - r"\(410\)\nReason: Gone: too old resource version: 1 \(8146471\)", - ): + kubernetes_asyncio.client.exceptions.ApiException, + r'\(410\)\nReason: Gone: too old resource version: 1 \(8146471\)'): watch = kubernetes_asyncio.watch.Watch() - async with watch.stream( - fake_api.get_namespaces, timeout_seconds=10 - ) as stream: - async for e in stream: # noqa + async with watch.stream(fake_api.get_namespaces, timeout_seconds=10) as stream: + async for e in stream: # noqa pass async def test_watch_timeout_with_resource_version(self): @@ -388,61 +359,53 @@ async def test_watch_timeout_with_resource_version(self): fake_resp.content.readline = AsyncMock() fake_resp.release = Mock() - fake_resp.content.readline.side_effect = [asyncio.TimeoutError(), b""] + fake_resp.content.readline.side_effect = [asyncio.TimeoutError(), + b""] fake_api = Mock() fake_api.get_namespaces = AsyncMock(return_value=fake_resp) - fake_api.get_namespaces.__doc__ = ":rtype: V1NamespaceList" + fake_api.get_namespaces.__doc__ = ':rtype: V1NamespaceList' watch = kubernetes_asyncio.watch.Watch() - async with watch.stream( - fake_api.get_namespaces, resource_version="10" - ) as stream: - async for e in stream: # noqa + async with watch.stream(fake_api.get_namespaces, resource_version='10') as stream: + async for e in stream: # noqa pass # all calls use the passed resource version fake_api.get_namespaces.assert_has_calls( - [ - call(_preload_content=False, watch=True, resource_version="10"), - call(_preload_content=False, watch=True, resource_version="10"), - ] - ) + [call(_preload_content=False, watch=True, resource_version='10'), + call(_preload_content=False, watch=True, resource_version='10')]) fake_resp.release.assert_called_once_with() - self.assertEqual(watch.resource_version, "10") + self.assertEqual(watch.resource_version, '10') async def test_unmarshal_bookmark_succeeds_and_preserves_resource_version(self): w = Watch() - event = w.unmarshal_event( - '{"type": "BOOKMARK", "object": {"apiVersion":' - '"test.com/v1beta1","kind":"foo","metadata":' - '{"name": "bar", "resourceVersion": "1"}}}', - "object", - ) - self.assertEqual("BOOKMARK", event["type"]) + event = w.unmarshal_event('{"type": "BOOKMARK", "object": {"apiVersion":' + '"test.com/v1beta1","kind":"foo","metadata":' + '{"name": "bar", "resourceVersion": "1"}}}', + 'object') + self.assertEqual("BOOKMARK", event['type']) # make sure the resource version is preserved, # and the watcher's resource_version is updated - self.assertTrue(isinstance(event["raw_object"], dict)) - self.assertEqual("1", event["raw_object"]["metadata"]["resourceVersion"]) + self.assertTrue(isinstance(event['raw_object'], dict)) + self.assertEqual("1", event['raw_object']['metadata']['resourceVersion']) self.assertEqual("1", w.resource_version) async def test_unmarshal_job_bookmark_succeeds_and_preserves_resource_version(self): w = Watch() - event = w.unmarshal_event( - '{"type": "BOOKMARK", "object": {"apiVersion":' - '"batch/v1","kind":"Job","metadata":' - '{"name": "bar", "resourceVersion": "1"},' - '"spec": {"template": {"metadata": ' - '{"creationTimestamp":null}, "spec": ' - '{"containers":null}}}}}', - "object", - ) - self.assertEqual("BOOKMARK", event["type"]) + event = w.unmarshal_event('{"type": "BOOKMARK", "object": {"apiVersion":' + '"batch/v1","kind":"Job","metadata":' + '{"name": "bar", "resourceVersion": "1"},' + '"spec": {"template": {"metadata": ' + '{"creationTimestamp":null}, "spec": ' + '{"containers":null}}}}}', + 'object') + self.assertEqual("BOOKMARK", event['type']) # make sure the resource version is preserved, # and the watcher's resource_version is updated - self.assertTrue(isinstance(event["raw_object"], dict)) - self.assertEqual("1", event["raw_object"]["metadata"]["resourceVersion"]) + self.assertTrue(isinstance(event['raw_object'], dict)) + self.assertEqual("1", event['raw_object']['metadata']['resourceVersion']) self.assertEqual("1", w.resource_version) From 796834b52142ad8fcf697ffedaff070e63b2ad72 Mon Sep 17 00:00:00 2001 From: Nicholas Hansen Date: Mon, 14 Jul 2025 11:23:48 +0100 Subject: [PATCH 3/5] Check for metadata and resourceVersion before trying to extract --- kubernetes_asyncio/watch/watch.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/kubernetes_asyncio/watch/watch.py b/kubernetes_asyncio/watch/watch.py index 16ce2ff0..413a9378 100644 --- a/kubernetes_asyncio/watch/watch.py +++ b/kubernetes_asyncio/watch/watch.py @@ -123,7 +123,10 @@ def unmarshal_event(self, data: str, response_type): self.resource_version = js['object']['metadata']['resourceVersion'] elif js['type'].lower() == 'bookmark': - self.resource_version = js['raw_object']['metadata']['resourceVersion'] + if (isinstance(js['raw_object'], dict) + and 'metadata' in js['raw_object'] + and 'resourceVersion' in js['raw_object']['metadata']): + self.resource_version = js['raw_object']['metadata']['resourceVersion'] return js From 6793fc5f9a06e9e18ed93415da3d0a4c67ef87c8 Mon Sep 17 00:00:00 2001 From: Nicholas Hansen Date: Mon, 14 Jul 2025 11:38:26 +0100 Subject: [PATCH 4/5] Added test for malformed bookmark --- kubernetes_asyncio/watch/watch.py | 4 ++++ kubernetes_asyncio/watch/watch_test.py | 14 ++++++++++++++ 2 files changed, 18 insertions(+) diff --git a/kubernetes_asyncio/watch/watch.py b/kubernetes_asyncio/watch/watch.py index 413a9378..05dcadb9 100644 --- a/kubernetes_asyncio/watch/watch.py +++ b/kubernetes_asyncio/watch/watch.py @@ -127,6 +127,10 @@ def unmarshal_event(self, data: str, response_type): and 'metadata' in js['raw_object'] and 'resourceVersion' in js['raw_object']['metadata']): self.resource_version = js['raw_object']['metadata']['resourceVersion'] + else: + raise Exception(("Malformed JSON response for bookmark event, " + "'metadata' or 'resourceVersion' field is missing. " + "JSON: {}").format(js)) return js diff --git a/kubernetes_asyncio/watch/watch_test.py b/kubernetes_asyncio/watch/watch_test.py index 1a040901..7c5aac7d 100644 --- a/kubernetes_asyncio/watch/watch_test.py +++ b/kubernetes_asyncio/watch/watch_test.py @@ -409,3 +409,17 @@ async def test_unmarshal_job_bookmark_succeeds_and_preserves_resource_version(se self.assertTrue(isinstance(event['raw_object'], dict)) self.assertEqual("1", event['raw_object']['metadata']['resourceVersion']) self.assertEqual("1", w.resource_version) + + async def test_unmarshall_job_bookmark_malformed_object_fails(self): + # An actual error response sent by K8s during testing. + k8s_err = { + 'type': 'BOOKMARK', + 'object': { + 'kind': 'Job', + 'apiVersion': 'batch/v1', + 'metadata': {}, + } + } + + with self.assertRaises(Exception): + Watch().unmarshal_event(json.dumps(k8s_err), None) From e1934fe0b07421bc6a901d9d9bb42af7515f3eae Mon Sep 17 00:00:00 2001 From: Nicholas Hansen Date: Mon, 14 Jul 2025 11:43:37 +0100 Subject: [PATCH 5/5] Fixed flake8 complaint --- kubernetes_asyncio/watch/watch.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kubernetes_asyncio/watch/watch.py b/kubernetes_asyncio/watch/watch.py index 05dcadb9..f8b26a33 100644 --- a/kubernetes_asyncio/watch/watch.py +++ b/kubernetes_asyncio/watch/watch.py @@ -129,8 +129,8 @@ def unmarshal_event(self, data: str, response_type): self.resource_version = js['raw_object']['metadata']['resourceVersion'] else: raise Exception(("Malformed JSON response for bookmark event, " - "'metadata' or 'resourceVersion' field is missing. " - "JSON: {}").format(js)) + "'metadata' or 'resourceVersion' field is missing. " + "JSON: {}").format(js)) return js