From 3092737318b20a0295adf87f3297dc4d683e59cc Mon Sep 17 00:00:00 2001 From: Guodong Date: Wed, 25 Sep 2024 18:11:29 +0800 Subject: [PATCH] rm redundant errors --- nacos/client.py | 59 ++++++++++++++++++++++++++----------------------- 1 file changed, 31 insertions(+), 28 deletions(-) diff --git a/nacos/client.py b/nacos/client.py index e5248b9..7161be7 100644 --- a/nacos/client.py +++ b/nacos/client.py @@ -813,35 +813,38 @@ def _init_pulling(self): logger.info("[init-pulling] init completed") def _process_polling_result(self): - while True: - cache_key, content, md5 = self.notify_queue.get() - logger.info("[process-polling-result] receive an event:%s" % cache_key) - wl = self.watcher_mapping.get(cache_key) - if not wl: - logger.warning("[process-polling-result] no watcher on %s, ignored" % cache_key) - continue + try: + while True: + cache_key, content, md5 = self.notify_queue.get() + logger.info("[process-polling-result] receive an event:%s" % cache_key) + wl = self.watcher_mapping.get(cache_key) + if not wl: + logger.warning("[process-polling-result] no watcher on %s, ignored" % cache_key) + continue - data_id, group, namespace = parse_key(cache_key) - plain_content = content - - params = { - "data_id": data_id, - "group": group, - "namespace": namespace, - "raw_content": content, - "content": plain_content, - } - for watcher in wl: - if not watcher.last_md5 == md5: - logger.info( - "[process-polling-result] md5 changed since last call, calling %s with changed md5: %s ,params: %s" - % (watcher.callback.__name__, md5, params)) - try: - self.callback_tread_pool.apply(watcher.callback, (params,)) - except Exception as e: - logger.exception("[process-polling-result] exception %s occur while calling %s " % ( - str(e), watcher.callback.__name__)) - watcher.last_md5 = md5 + data_id, group, namespace = parse_key(cache_key) + plain_content = content + + params = { + "data_id": data_id, + "group": group, + "namespace": namespace, + "raw_content": content, + "content": plain_content, + } + for watcher in wl: + if not watcher.last_md5 == md5: + logger.info( + "[process-polling-result] md5 changed since last call, calling %s with changed md5: %s ,params: %s" + % (watcher.callback.__name__, md5, params)) + try: + self.callback_tread_pool.apply(watcher.callback, (params,)) + except Exception as e: + logger.exception("[process-polling-result] exception %s occur while calling %s " % ( + str(e), watcher.callback.__name__)) + watcher.last_md5 = md5 + except (EOFError, OSError) as e: + logger.exception(f"[process-polling-result] break when receive exception of {type(e)}") @staticmethod def _inject_version_info(headers):