1
1
import asyncio
2
2
import random
3
3
import weakref
4
- from typing import AsyncIterator , Iterable , Mapping , Optional , Sequence , Tuple , Type
4
+ from typing import (
5
+ AsyncIterator ,
6
+ Iterable ,
7
+ Mapping ,
8
+ Optional ,
9
+ Sequence ,
10
+ Tuple ,
11
+ Type ,
12
+ Union ,
13
+ )
5
14
6
15
from redis .asyncio .client import Redis
7
16
from redis .asyncio .connection import (
17
+ BlockingConnectionPool ,
8
18
Connection ,
9
19
ConnectionPool ,
10
20
EncodableT ,
@@ -203,12 +213,38 @@ async def get_master_address(self):
203
213
def rotate_slaves (self ) -> AsyncIterator :
204
214
"""Round-robin slave balancer"""
205
215
return self .proxy .rotate_slaves ()
216
+
217
+
218
+ class SentinelBlockingConnectionPool (BlockingConnectionPool ):
219
+ """
220
+ Sentinel blocking connection pool.
221
+
222
+ If ``check_connection`` flag is set to True, SentinelManagedConnection
223
+ sends a PING command right after establishing the connection.
224
+ """
225
+
226
+ def __init__ (self , service_name , sentinel_manager , ** kwargs ):
227
+ kwargs ["connection_class" ] = kwargs .get (
228
+ "connection_class" ,
229
+ (
230
+ SentinelManagedSSLConnection
231
+ if kwargs .pop ("ssl" , False )
232
+ else SentinelManagedConnection
233
+ ),
234
+ )
235
+ self .is_master = kwargs .pop ("is_master" , True )
236
+ self .check_connection = kwargs .pop ("check_connection" , False )
237
+ self .proxy = SentinelConnectionPoolProxy (
238
+ connection_pool = self ,
239
+ is_master = self .is_master ,
240
+ check_connection = self .check_connection ,
241
+ service_name = service_name ,
242
+ sentinel_manager = sentinel_manager ,
243
+ )
206
244
super ().__init__ (** kwargs )
207
- self .connection_kwargs ["connection_pool" ] = weakref .proxy ( self )
245
+ self .connection_kwargs ["connection_pool" ] = self .proxy
208
246
self .service_name = service_name
209
247
self .sentinel_manager = sentinel_manager
210
- self .master_address = None
211
- self .slave_rr_counter = None
212
248
213
249
def __repr__ (self ):
214
250
return (
@@ -218,8 +254,11 @@ def __repr__(self):
218
254
219
255
def reset (self ):
220
256
super ().reset ()
221
- self .master_address = None
222
- self .slave_rr_counter = None
257
+ self .proxy .reset ()
258
+
259
+ @property
260
+ def master_address (self ):
261
+ return self .proxy .master_address
223
262
224
263
def owns_connection (self , connection : Connection ):
225
264
check = not self .is_master or (
@@ -228,31 +267,11 @@ def owns_connection(self, connection: Connection):
228
267
return check and super ().owns_connection (connection )
229
268
230
269
async def get_master_address (self ):
231
- master_address = await self .sentinel_manager .discover_master (self .service_name )
232
- if self .is_master :
233
- if self .master_address != master_address :
234
- self .master_address = master_address
235
- # disconnect any idle connections so that they reconnect
236
- # to the new master the next time that they are used.
237
- await self .disconnect (inuse_connections = False )
238
- return master_address
270
+ return await self .proxy .get_master_address ()
239
271
240
- async def rotate_slaves (self ) -> AsyncIterator :
272
+ def rotate_slaves (self ) -> AsyncIterator :
241
273
"""Round-robin slave balancer"""
242
- slaves = await self .sentinel_manager .discover_slaves (self .service_name )
243
- if slaves :
244
- if self .slave_rr_counter is None :
245
- self .slave_rr_counter = random .randint (0 , len (slaves ) - 1 )
246
- for _ in range (len (slaves )):
247
- self .slave_rr_counter = (self .slave_rr_counter + 1 ) % len (slaves )
248
- slave = slaves [self .slave_rr_counter ]
249
- yield slave
250
- # Fallback to the master connection
251
- try :
252
- yield await self .get_master_address ()
253
- except MasterNotFoundError :
254
- pass
255
- raise SlaveNotFoundError (f"No slave found for { self .service_name !r} " )
274
+ return self .proxy .rotate_slaves ()
256
275
257
276
258
277
class Sentinel (AsyncSentinelCommands ):
@@ -405,7 +424,10 @@ def master_for(
405
424
self ,
406
425
service_name : str ,
407
426
redis_class : Type [Redis ] = Redis ,
408
- connection_pool_class : Type [SentinelConnectionPool ] = SentinelConnectionPool ,
427
+ connection_pool_class : Union [
428
+ Type [SentinelConnectionPool ],
429
+ Type [SentinelBlockingConnectionPool ],
430
+ ] = SentinelConnectionPool ,
409
431
** kwargs ,
410
432
):
411
433
"""
@@ -442,7 +464,10 @@ def slave_for(
442
464
self ,
443
465
service_name : str ,
444
466
redis_class : Type [Redis ] = Redis ,
445
- connection_pool_class : Type [SentinelConnectionPool ] = SentinelConnectionPool ,
467
+ connection_pool_class : Union [
468
+ Type [SentinelConnectionPool ],
469
+ Type [SentinelBlockingConnectionPool ],
470
+ ] = SentinelConnectionPool ,
446
471
** kwargs ,
447
472
):
448
473
"""
0 commit comments