diff --git a/.github/wordlist.txt b/.github/wordlist.txt index ca2102b825..76cb929b33 100644 --- a/.github/wordlist.txt +++ b/.github/wordlist.txt @@ -44,6 +44,7 @@ RedisJSON RedisTimeSeries SHA SearchCommands +SentinelBlockingConnectionPool SentinelCommands SentinelConnectionPool Sharded diff --git a/CHANGES b/CHANGES index 8750128b05..a72eab801a 100644 --- a/CHANGES +++ b/CHANGES @@ -1,3 +1,5 @@ + * Add redis.asyncio.SentinelBlockingConnectionPool + * Use SentinelConnectionPoolProxy in asyncio.sentinel module * Move doctests (doc code examples) to main branch * Update `ResponseT` type hint * Allow to control the minimum SSL version diff --git a/docs/connections.rst b/docs/connections.rst index b547a7659e..098b441986 100644 --- a/docs/connections.rst +++ b/docs/connections.rst @@ -63,6 +63,24 @@ This client is used for communicating with Redis, asynchronously. :members: +Async Sentinel Client +********************* + +Sentinel (Async) +================ +.. autoclass:: redis.asyncio.sentinel.Sentinel + :members: + +SentinelConnectionPool (Async) +============================== +.. autoclass:: redis.asyncio.sentinel.SentinelConnectionPool + :members: + +SentinelBlockingConnectionPool (Async) +====================================== +.. autoclass:: redis.asyncio.sentinel.SentinelBlockingConnectionPool + :members: + Async Cluster Client ******************** diff --git a/docs/examples/asyncio_examples.ipynb b/docs/examples/asyncio_examples.ipynb index d2b11b56be..7013145879 100644 --- a/docs/examples/asyncio_examples.ipynb +++ b/docs/examples/asyncio_examples.ipynb @@ -328,6 +328,29 @@ "assert val == b\"value\"" ] }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import asyncio\n", + "\n", + "from redis.asyncio.sentinel import Sentinel, SentinelBlockingConnectionPool\n", + "\n", + "\n", + "sentinel = Sentinel([(\"localhost\", 26379), (\"sentinel2\", 26379)])\n", + "r = sentinel.master_for(\n", + " \"mymaster\",\n", + " connection_pool_class=SentinelBlockingConnectionPool,\n", + " max_connections=5, # Max connections in pool\n", + " timeout=3, # Connection acquiring timeout in seconds. To turn off the timeout, set None\n", + ")\n", + "\n", + "# This code doesn't raise a MaxConectionsError exception\n", + "await asyncio.gather(*[r.get(\"key\") for _ in range(10)])" + ] + }, { "cell_type": "markdown", "metadata": {}, diff --git a/redis/asyncio/__init__.py b/redis/asyncio/__init__.py index 3545ab44c2..651472a8e3 100644 --- a/redis/asyncio/__init__.py +++ b/redis/asyncio/__init__.py @@ -9,6 +9,7 @@ ) from redis.asyncio.sentinel import ( Sentinel, + SentinelBlockingConnectionPool, SentinelConnectionPool, SentinelManagedConnection, SentinelManagedSSLConnection, @@ -53,6 +54,7 @@ "RedisError", "ResponseError", "Sentinel", + "SentinelBlockingConnectionPool", "SentinelConnectionPool", "SentinelManagedConnection", "SentinelManagedSSLConnection", diff --git a/redis/asyncio/sentinel.py b/redis/asyncio/sentinel.py index 6fd233adc8..407b28e06a 100644 --- a/redis/asyncio/sentinel.py +++ b/redis/asyncio/sentinel.py @@ -1,10 +1,20 @@ import asyncio import random import weakref -from typing import AsyncIterator, Iterable, Mapping, Optional, Sequence, Tuple, Type +from typing import ( + AsyncIterator, + Iterable, + Mapping, + Optional, + Sequence, + Tuple, + Type, + Union, +) from redis.asyncio.client import Redis from redis.asyncio.connection import ( + BlockingConnectionPool, Connection, ConnectionPool, EncodableT, @@ -97,6 +107,55 @@ class SentinelManagedSSLConnection(SentinelManagedConnection, SSLConnection): pass +class SentinelConnectionPoolProxy: + def __init__( + self, + connection_pool, + is_master, + check_connection, + service_name, + sentinel_manager, + ): + self.connection_pool_ref = weakref.ref(connection_pool) + self.is_master = is_master + self.check_connection = check_connection + self.service_name = service_name + self.sentinel_manager = sentinel_manager + self.reset() + + def reset(self): + self.master_address = None + self.slave_rr_counter = None + + async def get_master_address(self): + master_address = await self.sentinel_manager.discover_master(self.service_name) + if self.is_master and self.master_address != master_address: + self.master_address = master_address + # disconnect any idle connections so that they reconnect + # to the new master the next time that they are used. + connection_pool = self.connection_pool_ref() + if connection_pool is not None: + await connection_pool.disconnect(inuse_connections=False) + return master_address + + async def rotate_slaves(self) -> AsyncIterator: + """Round-robin slave balancer""" + slaves = await self.sentinel_manager.discover_slaves(self.service_name) + if slaves: + if self.slave_rr_counter is None: + self.slave_rr_counter = random.randint(0, len(slaves) - 1) + for _ in range(len(slaves)): + self.slave_rr_counter = (self.slave_rr_counter + 1) % len(slaves) + slave = slaves[self.slave_rr_counter] + yield slave + # Fallback to the master connection + try: + yield await self.get_master_address() + except MasterNotFoundError: + pass + raise SlaveNotFoundError(f"No slave found for {self.service_name!r}") + + class SentinelConnectionPool(ConnectionPool): """ Sentinel backed connection pool. @@ -116,12 +175,17 @@ def __init__(self, service_name, sentinel_manager, **kwargs): ) self.is_master = kwargs.pop("is_master", True) self.check_connection = kwargs.pop("check_connection", False) + self.proxy = SentinelConnectionPoolProxy( + connection_pool=self, + is_master=self.is_master, + check_connection=self.check_connection, + service_name=service_name, + sentinel_manager=sentinel_manager, + ) super().__init__(**kwargs) - self.connection_kwargs["connection_pool"] = weakref.proxy(self) + self.connection_kwargs["connection_pool"] = self.proxy self.service_name = service_name self.sentinel_manager = sentinel_manager - self.master_address = None - self.slave_rr_counter = None def __repr__(self): return ( @@ -131,8 +195,11 @@ def __repr__(self): def reset(self): super().reset() - self.master_address = None - self.slave_rr_counter = None + self.proxy.reset() + + @property + def master_address(self): + return self.proxy.master_address def owns_connection(self, connection: Connection): check = not self.is_master or ( @@ -141,31 +208,70 @@ def owns_connection(self, connection: Connection): return check and super().owns_connection(connection) async def get_master_address(self): - master_address = await self.sentinel_manager.discover_master(self.service_name) - if self.is_master: - if self.master_address != master_address: - self.master_address = master_address - # disconnect any idle connections so that they reconnect - # to the new master the next time that they are used. - await self.disconnect(inuse_connections=False) - return master_address + return await self.proxy.get_master_address() - async def rotate_slaves(self) -> AsyncIterator: + def rotate_slaves(self) -> AsyncIterator: """Round-robin slave balancer""" - slaves = await self.sentinel_manager.discover_slaves(self.service_name) - if slaves: - if self.slave_rr_counter is None: - self.slave_rr_counter = random.randint(0, len(slaves) - 1) - for _ in range(len(slaves)): - self.slave_rr_counter = (self.slave_rr_counter + 1) % len(slaves) - slave = slaves[self.slave_rr_counter] - yield slave - # Fallback to the master connection - try: - yield await self.get_master_address() - except MasterNotFoundError: - pass - raise SlaveNotFoundError(f"No slave found for {self.service_name!r}") + return self.proxy.rotate_slaves() + + +class SentinelBlockingConnectionPool(BlockingConnectionPool): + """ + Sentinel blocking connection pool. + + If ``check_connection`` flag is set to True, SentinelManagedConnection + sends a PING command right after establishing the connection. + """ + + def __init__(self, service_name, sentinel_manager, **kwargs): + kwargs["connection_class"] = kwargs.get( + "connection_class", + ( + SentinelManagedSSLConnection + if kwargs.pop("ssl", False) + else SentinelManagedConnection + ), + ) + self.is_master = kwargs.pop("is_master", True) + self.check_connection = kwargs.pop("check_connection", False) + self.proxy = SentinelConnectionPoolProxy( + connection_pool=self, + is_master=self.is_master, + check_connection=self.check_connection, + service_name=service_name, + sentinel_manager=sentinel_manager, + ) + super().__init__(**kwargs) + self.connection_kwargs["connection_pool"] = self.proxy + self.service_name = service_name + self.sentinel_manager = sentinel_manager + + def __repr__(self): + return ( + f"<{self.__class__.__module__}.{self.__class__.__name__}" + f"(service={self.service_name}({self.is_master and 'master' or 'slave'}))>" + ) + + def reset(self): + super().reset() + self.proxy.reset() + + @property + def master_address(self): + return self.proxy.master_address + + def owns_connection(self, connection: Connection): + check = not self.is_master or ( + self.is_master and self.master_address == (connection.host, connection.port) + ) + return check and super().owns_connection(connection) + + async def get_master_address(self): + return await self.proxy.get_master_address() + + def rotate_slaves(self) -> AsyncIterator: + """Round-robin slave balancer""" + return self.proxy.rotate_slaves() class Sentinel(AsyncSentinelCommands): @@ -318,7 +424,10 @@ def master_for( self, service_name: str, redis_class: Type[Redis] = Redis, - connection_pool_class: Type[SentinelConnectionPool] = SentinelConnectionPool, + connection_pool_class: Union[ + Type[SentinelConnectionPool], + Type[SentinelBlockingConnectionPool], + ] = SentinelConnectionPool, **kwargs, ): """ @@ -355,7 +464,10 @@ def slave_for( self, service_name: str, redis_class: Type[Redis] = Redis, - connection_pool_class: Type[SentinelConnectionPool] = SentinelConnectionPool, + connection_pool_class: Union[ + Type[SentinelConnectionPool], + Type[SentinelBlockingConnectionPool], + ] = SentinelConnectionPool, **kwargs, ): """ diff --git a/tests/test_asyncio/test_sentinel.py b/tests/test_asyncio/test_sentinel.py index 51e59d69d0..340d358b1d 100644 --- a/tests/test_asyncio/test_sentinel.py +++ b/tests/test_asyncio/test_sentinel.py @@ -8,6 +8,7 @@ from redis.asyncio.sentinel import ( MasterNotFoundError, Sentinel, + SentinelBlockingConnectionPool, SentinelConnectionPool, SlaveNotFoundError, ) @@ -182,40 +183,77 @@ async def test_discover_slaves(cluster, sentinel): @pytest.mark.onlynoncluster -async def test_master_for(cluster, sentinel, master_ip): - async with sentinel.master_for("mymaster", db=9) as master: +@pytest.mark.parametrize( + "connection_pool_class", + [SentinelConnectionPool, SentinelBlockingConnectionPool], +) +async def test_master_for(cluster, sentinel, master_ip, connection_pool_class): + async with sentinel.master_for( + "mymaster", + db=9, + connection_pool_class=connection_pool_class, + ) as master: assert await master.ping() assert master.connection_pool.master_address == (master_ip, 6379) # Use internal connection check - async with sentinel.master_for("mymaster", db=9, check_connection=True) as master: + async with sentinel.master_for( + "mymaster", + db=9, + check_connection=True, + connection_pool_class=connection_pool_class, + ) as master: assert await master.ping() @pytest.mark.onlynoncluster -async def test_slave_for(cluster, sentinel): +@pytest.mark.parametrize( + "connection_pool_class", + [SentinelConnectionPool, SentinelBlockingConnectionPool], +) +async def test_slave_for(cluster, sentinel, connection_pool_class): cluster.slaves = [ {"ip": "127.0.0.1", "port": 6379, "is_odown": False, "is_sdown": False} ] - async with sentinel.slave_for("mymaster", db=9) as slave: + async with sentinel.slave_for( + "mymaster", + db=9, + connection_pool_class=connection_pool_class, + ) as slave: assert await slave.ping() @pytest.mark.onlynoncluster -async def test_slave_for_slave_not_found_error(cluster, sentinel): +@pytest.mark.parametrize( + "connection_pool_class", + [SentinelConnectionPool, SentinelBlockingConnectionPool], +) +async def test_slave_for_slave_not_found_error( + cluster, + sentinel, + connection_pool_class, +): cluster.master["is_odown"] = True - async with sentinel.slave_for("mymaster", db=9) as slave: + async with sentinel.slave_for( + "mymaster", + db=9, + connection_pool_class=connection_pool_class, + ) as slave: with pytest.raises(SlaveNotFoundError): await slave.ping() @pytest.mark.onlynoncluster -async def test_slave_round_robin(cluster, sentinel, master_ip): +@pytest.mark.parametrize( + "connection_pool_class", + [SentinelConnectionPool, SentinelBlockingConnectionPool], +) +async def test_slave_round_robin(cluster, sentinel, master_ip, connection_pool_class): cluster.slaves = [ {"ip": "slave0", "port": 6379, "is_odown": False, "is_sdown": False}, {"ip": "slave1", "port": 6379, "is_odown": False, "is_sdown": False}, ] - pool = SentinelConnectionPool("mymaster", sentinel) + pool = connection_pool_class("mymaster", sentinel) rotator = pool.rotate_slaves() assert await rotator.__anext__() in (("slave0", 6379), ("slave1", 6379)) assert await rotator.__anext__() in (("slave0", 6379), ("slave1", 6379)) @@ -242,15 +280,39 @@ async def test_reset(cluster, sentinel): @pytest.mark.onlynoncluster -@pytest.mark.parametrize("method_name", ["master_for", "slave_for"]) -async def test_auto_close_pool(cluster, sentinel, method_name): +@pytest.mark.parametrize( + "method_name,connection_pool_class", + [ + pytest.param( + "master_for", + SentinelConnectionPool, + id="master_for__SentinelConnectionPool", + ), + pytest.param( + "slave_for", + SentinelConnectionPool, + id="slave_for__SentinelConnectionPool", + ), + pytest.param( + "master_for", + SentinelBlockingConnectionPool, + id="master_for__SentinelBlockingConnectionPool", + ), + pytest.param( + "slave_for", + SentinelBlockingConnectionPool, + id="slave_for__SentinelBlockingConnectionPool", + ), + ], +) +async def test_auto_close_pool(cluster, sentinel, method_name, connection_pool_class): """ Check that the connection pool created by the sentinel client is automatically closed """ method = getattr(sentinel, method_name) - client = method("mymaster", db=9) + client = method("mymaster", db=9, connection_pool_class=connection_pool_class) pool = client.connection_pool assert client.auto_close_connection_pool is True calls = 0