Skip to content

Commit 770ec7d

Browse files
committed
Fix: Improve PubSub resilience and exception handling
This commit introduces several improvements to the Pub/Sub system to make it more resilient to connection issues and to handle exceptions more gracefully. The following changes are included: - A `health_check_interval` is added to the Redis connection to keep it alive and prevent timeouts. - Reconnection logic is implemented in the `listen` method. If the connection to Redis is lost, the client will now attempt to reconnect and re-subscribe to the channel automatically. - The exception handling in the `listen` method is made more specific, catching `ConnectionError` for reconnection and other `RedisError` exceptions for graceful exit. These changes improve the overall stability and robustness of the Pub/Sub system, especially in environments where the Redis server might be restarted or the network connection is not perfectly stable. Signed-off-by: Denys Fedoryshchenko <denys.f@collabora.com>
1 parent 14ddd7a commit 770ec7d

File tree

1 file changed

+24
-4
lines changed

1 file changed

+24
-4
lines changed

api/pubsub.py

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
"""Pub/Sub implementation"""
77

8+
import logging
89
import asyncio
910

1011
import json
@@ -14,6 +15,8 @@
1415
from .models import Subscription, SubscriptionStats
1516
from .config import PubSubSettings
1617

18+
logger = logging.getLogger(__name__)
19+
1720

1821
class PubSub:
1922
"""Pub/Sub implementation class
@@ -39,7 +42,9 @@ def __init__(self, host=None, db_number=None):
3942
host = self._settings.redis_host
4043
if db_number is None:
4144
db_number = self._settings.redis_db_number
42-
self._redis = aioredis.from_url(f'redis://{host}/{db_number}')
45+
self._redis = aioredis.from_url(
46+
f'redis://{host}/{db_number}', health_check_interval=30
47+
)
4348
# self._subscriptions is a dict that matches a subscription id
4449
# (key) with a Subscription object ('sub') and a redis
4550
# PubSub object ('redis_sub'). For instance:
@@ -135,9 +140,24 @@ async def listen(self, sub_id, user=None):
135140
f"not owned by {user}")
136141
while True:
137142
self._subscriptions[sub_id]['last_poll'] = datetime.utcnow()
138-
msg = await sub['redis_sub'].get_message(
139-
ignore_subscribe_messages=True, timeout=1.0
140-
)
143+
msg = None
144+
try:
145+
msg = await sub['redis_sub'].get_message(
146+
ignore_subscribe_messages=True, timeout=1.0
147+
)
148+
except aioredis.ConnectionError:
149+
async with self._lock:
150+
channel = self._subscriptions[sub_id]['sub'].channel
151+
new_redis_sub = self._redis.pubsub()
152+
await new_redis_sub.subscribe(channel)
153+
self._subscriptions[sub_id]['redis_sub'] = new_redis_sub
154+
sub['redis_sub'] = new_redis_sub
155+
continue
156+
except aioredis.RedisError as exc:
157+
# log the error and continue
158+
logger.error("Redis error occurred: %s", exc)
159+
return None # Handle any exceptions gracefully
160+
141161
if msg is None:
142162
continue
143163
msg_data = json.loads(msg['data'])

0 commit comments

Comments
 (0)