Skip to content

Commit c3fe98c

Browse files
author
Nikita Bakhilin
committed
feat: Add health check to watch.stream for silent connection drops
- Add _health_check_interval parameter to watch.stream() - Detect and recover from silent connection drops during control plane upgrades - Preserve backward compatibility (disabled by default) - Add comprehensive tests for new functionality Fixes: #2462
1 parent 1d6c076 commit c3fe98c

File tree

2 files changed

+43
-0
lines changed

2 files changed

+43
-0
lines changed

kubernetes/base/watch/watch.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
import time
1516
import json
1617
import pydoc
1718
import sys
@@ -180,10 +181,22 @@ def stream(self, func, *args, **kwargs):
180181
disable_retries = ('timeout_seconds' in kwargs)
181182
retry_after_410 = False
182183
deserialize = kwargs.pop('deserialize', True)
184+
185+
health_check_interval = kwargs.pop('_health_check_interval', 0) # 0 = disabled by default
186+
last_event_time = time.time() if health_check_interval > 0 else None
187+
183188
while True:
184189
resp = func(*args, **kwargs)
185190
try:
186191
for line in iter_resp_lines(resp):
192+
# Health check for silent connection drops
193+
if health_check_interval > 0 and last_event_time is not None:
194+
current_time = time.time()
195+
if current_time - last_event_time > health_check_interval:
196+
# Silent connection detected - break to reconnect
197+
break
198+
last_event_time = current_time
199+
187200
# unmarshal when we are receiving events from watch,
188201
# return raw string when we are streaming log
189202
if watch_arg == "watch":

kubernetes/base/watch/watch_test.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -576,6 +576,36 @@ def test_pod_log_empty_lines(self):
576576
self.api.delete_namespaced_pod(name=pod_name, namespace=self.namespace)
577577
self.api.delete_namespaced_pod.assert_called_once_with(name=pod_name, namespace=self.namespace)
578578

579+
def test_health_check_detects_silent_connection_drop(self):
580+
"""Test that health check detects when connection stops receiving events"""
581+
fake_resp = Mock()
582+
fake_resp.close = Mock()
583+
fake_resp.release_conn = Mock()
584+
585+
def limited_stalled_stream():
586+
yield '{"type": "ADDED", "object": {"metadata": {"name": "test1", "resourceVersion": "1"}}}\n'
587+
for _ in range(10):
588+
yield ''
589+
return
590+
591+
fake_resp.stream = Mock(return_value=limited_stalled_stream())
592+
593+
fake_api = Mock()
594+
fake_api.get_namespaces = Mock(return_value=fake_resp)
595+
fake_api.get_namespaces.__doc__ = ':return: V1NamespaceList'
596+
597+
w = Watch()
598+
events = []
599+
600+
try:
601+
for e in w.stream(fake_api.get_namespaces, _health_check_interval=0.1, timeout_seconds=1):
602+
events.append(e)
603+
except Exception:
604+
pass
605+
606+
self.assertEqual(1, len(events))
607+
self.assertEqual("test1", events[0]['object'].metadata.name)
608+
579609
# Comment out the test below, it does not work currently.
580610
# def test_watch_with_deserialize_param(self):
581611
# """test watch.stream() deserialize param"""

0 commit comments

Comments
 (0)