diff --git a/.github/wordlist.txt b/.github/wordlist.txt index ca2102b825..76cb929b33 100644 --- a/.github/wordlist.txt +++ b/.github/wordlist.txt @@ -44,6 +44,7 @@ RedisJSON RedisTimeSeries SHA SearchCommands +SentinelBlockingConnectionPool SentinelCommands SentinelConnectionPool Sharded diff --git a/CHANGES b/CHANGES index f0d75a45ce..a72eab801a 100644 --- a/CHANGES +++ b/CHANGES @@ -1,3 +1,5 @@ + * Add redis.asyncio.SentinelBlockingConnectionPool + * Use SentinelConnectionPoolProxy in asyncio.sentinel module * Move doctests (doc code examples) to main branch * Update `ResponseT` type hint * Allow to control the minimum SSL version @@ -66,6 +68,7 @@ * Prevent async ClusterPipeline instances from becoming "false-y" in case of empty command stack (#3061) * Close Unix sockets if the connection attempt fails. This prevents `ResourceWarning`s. (#3314) * Close SSL sockets if the connection attempt fails, or if validations fail. (#3317) + * Eliminate mutable default arguments in the `redis.commands.core.Script` class. (#3332) * 4.1.3 (Feb 8, 2022) * Fix flushdb and flushall (#1926) diff --git a/dev_requirements.txt b/dev_requirements.txt index a8da4b49cd..931784cdaf 100644 --- a/dev_requirements.txt +++ b/dev_requirements.txt @@ -8,7 +8,7 @@ invoke==2.2.0 mock packaging>=20.4 pytest -pytest-asyncio +pytest-asyncio>=0.23.0,<0.24.0 pytest-cov pytest-profiling pytest-timeout diff --git a/docs/connections.rst b/docs/connections.rst index b547a7659e..098b441986 100644 --- a/docs/connections.rst +++ b/docs/connections.rst @@ -63,6 +63,24 @@ This client is used for communicating with Redis, asynchronously. :members: +Async Sentinel Client +********************* + +Sentinel (Async) +================ +.. autoclass:: redis.asyncio.sentinel.Sentinel + :members: + +SentinelConnectionPool (Async) +============================== +.. 
autoclass:: redis.asyncio.sentinel.SentinelConnectionPool + :members: + +SentinelBlockingConnectionPool (Async) +====================================== +.. autoclass:: redis.asyncio.sentinel.SentinelBlockingConnectionPool + :members: + Async Cluster Client ******************** diff --git a/docs/examples/asyncio_examples.ipynb b/docs/examples/asyncio_examples.ipynb index d2b11b56be..7013145879 100644 --- a/docs/examples/asyncio_examples.ipynb +++ b/docs/examples/asyncio_examples.ipynb @@ -328,6 +328,29 @@ "assert val == b\"value\"" ] }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import asyncio\n", + "\n", + "from redis.asyncio.sentinel import Sentinel, SentinelBlockingConnectionPool\n", + "\n", + "\n", + "sentinel = Sentinel([(\"localhost\", 26379), (\"sentinel2\", 26379)])\n", + "r = sentinel.master_for(\n", + " \"mymaster\",\n", + " connection_pool_class=SentinelBlockingConnectionPool,\n", + " max_connections=5, # Max connections in pool\n", + " timeout=3, # Connection acquiring timeout in seconds. To turn off the timeout, set None\n", + ")\n", + "\n", + "# This code doesn't raise a MaxConnectionsError exception\n", + "await asyncio.gather(*[r.get(\"key\") for _ in range(10)])" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, diff --git a/pytest.ini b/pytest.ini index 9db630e5b1..990968d6f9 100644 --- a/pytest.ini +++ b/pytest.ini @@ -14,3 +14,5 @@ timeout = 30 filterwarnings = always ignore:RedisGraph support is deprecated as of Redis Stack 7.2:DeprecationWarning + # Ignore a coverage warning when COVERAGE_CORE=sysmon for Pythons < 3.12. 
+ ignore:sys.monitoring isn't available:coverage.exceptions.CoverageWarning diff --git a/redis/_parsers/helpers.py b/redis/_parsers/helpers.py index 85b084dfdf..7494c79210 100644 --- a/redis/_parsers/helpers.py +++ b/redis/_parsers/helpers.py @@ -445,9 +445,11 @@ def parse_cluster_info(response, **options): def _parse_node_line(line): line_items = line.split(" ") node_id, addr, flags, master_id, ping, pong, epoch, connected = line.split(" ")[:8] - addr = addr.split("@")[0] + ip = addr.split("@")[0] + hostname = addr.split("@")[1].split(",")[1] if "@" in addr and "," in addr else "" node_dict = { "node_id": node_id, + "hostname": hostname, "flags": flags, "master_id": master_id, "last_ping_sent": ping, @@ -460,7 +462,7 @@ def _parse_node_line(line): if len(line_items) >= 9: slots, migrations = _parse_slots(line_items[8:]) node_dict["slots"], node_dict["migrations"] = slots, migrations - return addr, node_dict + return ip, node_dict def _parse_slots(slot_ranges): diff --git a/redis/asyncio/__init__.py b/redis/asyncio/__init__.py index 3545ab44c2..651472a8e3 100644 --- a/redis/asyncio/__init__.py +++ b/redis/asyncio/__init__.py @@ -9,6 +9,7 @@ ) from redis.asyncio.sentinel import ( Sentinel, + SentinelBlockingConnectionPool, SentinelConnectionPool, SentinelManagedConnection, SentinelManagedSSLConnection, @@ -53,6 +54,7 @@ "RedisError", "ResponseError", "Sentinel", + "SentinelBlockingConnectionPool", "SentinelConnectionPool", "SentinelManagedConnection", "SentinelManagedSSLConnection", diff --git a/redis/asyncio/client.py b/redis/asyncio/client.py index 1845b7252f..70a5e997ef 100644 --- a/redis/asyncio/client.py +++ b/redis/asyncio/client.py @@ -579,10 +579,12 @@ async def aclose(self, close_connection_pool: Optional[bool] = None) -> None: """ Closes Redis client connection - :param close_connection_pool: decides whether to close the connection pool used - by this Redis client, overriding Redis.auto_close_connection_pool. 
By default, - let Redis.auto_close_connection_pool decide whether to close the connection - pool. + Args: + close_connection_pool: + decides whether to close the connection pool used by this Redis client, + overriding Redis.auto_close_connection_pool. + By default, let Redis.auto_close_connection_pool decide + whether to close the connection pool. """ conn = self.connection if conn: diff --git a/redis/asyncio/sentinel.py b/redis/asyncio/sentinel.py index 6fd233adc8..407b28e06a 100644 --- a/redis/asyncio/sentinel.py +++ b/redis/asyncio/sentinel.py @@ -1,10 +1,20 @@ import asyncio import random import weakref -from typing import AsyncIterator, Iterable, Mapping, Optional, Sequence, Tuple, Type +from typing import ( + AsyncIterator, + Iterable, + Mapping, + Optional, + Sequence, + Tuple, + Type, + Union, +) from redis.asyncio.client import Redis from redis.asyncio.connection import ( + BlockingConnectionPool, Connection, ConnectionPool, EncodableT, @@ -97,6 +107,55 @@ class SentinelManagedSSLConnection(SentinelManagedConnection, SSLConnection): pass +class SentinelConnectionPoolProxy: + def __init__( + self, + connection_pool, + is_master, + check_connection, + service_name, + sentinel_manager, + ): + self.connection_pool_ref = weakref.ref(connection_pool) + self.is_master = is_master + self.check_connection = check_connection + self.service_name = service_name + self.sentinel_manager = sentinel_manager + self.reset() + + def reset(self): + self.master_address = None + self.slave_rr_counter = None + + async def get_master_address(self): + master_address = await self.sentinel_manager.discover_master(self.service_name) + if self.is_master and self.master_address != master_address: + self.master_address = master_address + # disconnect any idle connections so that they reconnect + # to the new master the next time that they are used. 
+ connection_pool = self.connection_pool_ref() + if connection_pool is not None: + await connection_pool.disconnect(inuse_connections=False) + return master_address + + async def rotate_slaves(self) -> AsyncIterator: + """Round-robin slave balancer""" + slaves = await self.sentinel_manager.discover_slaves(self.service_name) + if slaves: + if self.slave_rr_counter is None: + self.slave_rr_counter = random.randint(0, len(slaves) - 1) + for _ in range(len(slaves)): + self.slave_rr_counter = (self.slave_rr_counter + 1) % len(slaves) + slave = slaves[self.slave_rr_counter] + yield slave + # Fallback to the master connection + try: + yield await self.get_master_address() + except MasterNotFoundError: + pass + raise SlaveNotFoundError(f"No slave found for {self.service_name!r}") + + class SentinelConnectionPool(ConnectionPool): """ Sentinel backed connection pool. @@ -116,12 +175,17 @@ def __init__(self, service_name, sentinel_manager, **kwargs): ) self.is_master = kwargs.pop("is_master", True) self.check_connection = kwargs.pop("check_connection", False) + self.proxy = SentinelConnectionPoolProxy( + connection_pool=self, + is_master=self.is_master, + check_connection=self.check_connection, + service_name=service_name, + sentinel_manager=sentinel_manager, + ) super().__init__(**kwargs) - self.connection_kwargs["connection_pool"] = weakref.proxy(self) + self.connection_kwargs["connection_pool"] = self.proxy self.service_name = service_name self.sentinel_manager = sentinel_manager - self.master_address = None - self.slave_rr_counter = None def __repr__(self): return ( @@ -131,8 +195,11 @@ def __repr__(self): def reset(self): super().reset() - self.master_address = None - self.slave_rr_counter = None + self.proxy.reset() + + @property + def master_address(self): + return self.proxy.master_address def owns_connection(self, connection: Connection): check = not self.is_master or ( @@ -141,31 +208,70 @@ def owns_connection(self, connection: Connection): return check and 
super().owns_connection(connection) async def get_master_address(self): - master_address = await self.sentinel_manager.discover_master(self.service_name) - if self.is_master: - if self.master_address != master_address: - self.master_address = master_address - # disconnect any idle connections so that they reconnect - # to the new master the next time that they are used. - await self.disconnect(inuse_connections=False) - return master_address + return await self.proxy.get_master_address() - async def rotate_slaves(self) -> AsyncIterator: + def rotate_slaves(self) -> AsyncIterator: """Round-robin slave balancer""" - slaves = await self.sentinel_manager.discover_slaves(self.service_name) - if slaves: - if self.slave_rr_counter is None: - self.slave_rr_counter = random.randint(0, len(slaves) - 1) - for _ in range(len(slaves)): - self.slave_rr_counter = (self.slave_rr_counter + 1) % len(slaves) - slave = slaves[self.slave_rr_counter] - yield slave - # Fallback to the master connection - try: - yield await self.get_master_address() - except MasterNotFoundError: - pass - raise SlaveNotFoundError(f"No slave found for {self.service_name!r}") + return self.proxy.rotate_slaves() + + +class SentinelBlockingConnectionPool(BlockingConnectionPool): + """ + Sentinel blocking connection pool. + + If ``check_connection`` flag is set to True, SentinelManagedConnection + sends a PING command right after establishing the connection. 
+ """ + + def __init__(self, service_name, sentinel_manager, **kwargs): + kwargs["connection_class"] = kwargs.get( + "connection_class", + ( + SentinelManagedSSLConnection + if kwargs.pop("ssl", False) + else SentinelManagedConnection + ), + ) + self.is_master = kwargs.pop("is_master", True) + self.check_connection = kwargs.pop("check_connection", False) + self.proxy = SentinelConnectionPoolProxy( + connection_pool=self, + is_master=self.is_master, + check_connection=self.check_connection, + service_name=service_name, + sentinel_manager=sentinel_manager, + ) + super().__init__(**kwargs) + self.connection_kwargs["connection_pool"] = self.proxy + self.service_name = service_name + self.sentinel_manager = sentinel_manager + + def __repr__(self): + return ( + f"<{self.__class__.__module__}.{self.__class__.__name__}" + f"(service={self.service_name}({self.is_master and 'master' or 'slave'}))>" + ) + + def reset(self): + super().reset() + self.proxy.reset() + + @property + def master_address(self): + return self.proxy.master_address + + def owns_connection(self, connection: Connection): + check = not self.is_master or ( + self.is_master and self.master_address == (connection.host, connection.port) + ) + return check and super().owns_connection(connection) + + async def get_master_address(self): + return await self.proxy.get_master_address() + + def rotate_slaves(self) -> AsyncIterator: + """Round-robin slave balancer""" + return self.proxy.rotate_slaves() class Sentinel(AsyncSentinelCommands): @@ -318,7 +424,10 @@ def master_for( self, service_name: str, redis_class: Type[Redis] = Redis, - connection_pool_class: Type[SentinelConnectionPool] = SentinelConnectionPool, + connection_pool_class: Union[ + Type[SentinelConnectionPool], + Type[SentinelBlockingConnectionPool], + ] = SentinelConnectionPool, **kwargs, ): """ @@ -355,7 +464,10 @@ def slave_for( self, service_name: str, redis_class: Type[Redis] = Redis, - connection_pool_class: Type[SentinelConnectionPool] = 
SentinelConnectionPool, + connection_pool_class: Union[ + Type[SentinelConnectionPool], + Type[SentinelBlockingConnectionPool], + ] = SentinelConnectionPool, **kwargs, ): """ diff --git a/redis/commands/core.py b/redis/commands/core.py index b356d101ee..d46e55446c 100644 --- a/redis/commands/core.py +++ b/redis/commands/core.py @@ -46,8 +46,8 @@ from .helpers import list_or_args if TYPE_CHECKING: - from redis.asyncio.client import Redis as AsyncRedis - from redis.client import Redis + import redis.asyncio.client + import redis.client class ACLCommands(CommandsProtocol): @@ -731,16 +731,19 @@ def client_pause(self, timeout: int, all: bool = True, **kwargs) -> ResponseT: For more information see https://redis.io/commands/client-pause - :param timeout: milliseconds to pause clients - :param all: If true (default) all client commands are blocked. - otherwise, clients are only blocked if they attempt to execute - a write command. + Args: + timeout: milliseconds to pause clients + all: If true (default) all client commands are blocked. + otherwise, clients are only blocked if they attempt to execute + a write command. + For the WRITE mode, some commands have special behavior: - EVAL/EVALSHA: Will block client for all scripts. - PUBLISH: Will block client. - PFCOUNT: Will block client. - WAIT: Acknowledgments will be delayed, so this command will - appear blocked. + + * EVAL/EVALSHA: Will block client for all scripts. + * PUBLISH: Will block client. + * PFCOUNT: Will block client. + * WAIT: Acknowledgments will be delayed, so this command will + appear blocked. 
""" args = ["CLIENT PAUSE", str(timeout)] if not isinstance(timeout, int): @@ -1439,7 +1442,7 @@ class BitFieldOperation: def __init__( self, - client: Union["Redis", "AsyncRedis"], + client: Union["redis.client.Redis", "redis.asyncio.client.Redis"], key: str, default_overflow: Union[str, None] = None, ): @@ -1583,7 +1586,7 @@ def bitcount( return self.execute_command("BITCOUNT", *params, keys=[key]) def bitfield( - self: Union["Redis", "AsyncRedis"], + self: Union["redis.client.Redis", "redis.asyncio.client.Redis"], key: KeyT, default_overflow: Union[str, None] = None, ) -> BitFieldOperation: @@ -1596,7 +1599,7 @@ def bitfield( return BitFieldOperation(self, key, default_overflow=default_overflow) def bitfield_ro( - self: Union["Redis", "AsyncRedis"], + self: Union["redis.client.Redis", "redis.asyncio.client.Redis"], key: KeyT, encoding: str, offset: BitfieldOffsetT, @@ -5464,7 +5467,7 @@ class Script: An executable Lua script object returned by ``register_script`` """ - def __init__(self, registered_client: "Redis", script: ScriptTextT): + def __init__(self, registered_client: "redis.client.Redis", script: ScriptTextT): self.registered_client = registered_client self.script = script # Precalculate and store the SHA1 hex digest of the script. 
@@ -5472,11 +5475,7 @@ def __init__(self, registered_client: "Redis", script: ScriptTextT): if isinstance(script, str): # We need the encoding from the client in order to generate an # accurate byte representation of the script - try: - encoder = registered_client.connection_pool.get_encoder() - except AttributeError: - # Cluster - encoder = registered_client.get_encoder() + encoder = self.get_encoder() script = encoder.encode(script) self.sha = hashlib.sha1(script).hexdigest() @@ -5484,7 +5483,7 @@ def __call__( self, keys: Union[Sequence[KeyT], None] = None, args: Union[Iterable[EncodableT], None] = None, - client: Union["Redis", None] = None, + client: Union["redis.client.Redis", None] = None, ): """Execute the script, passing any required ``args``""" keys = keys or [] @@ -5507,13 +5506,35 @@ def __call__( self.sha = client.script_load(self.script) return client.evalsha(self.sha, len(keys), *args) + def get_encoder(self): + """Get the encoder to encode string scripts into bytes.""" + try: + return self.registered_client.get_encoder() + except AttributeError: + # DEPRECATED + # In version <=4.1.2, this was the code we used to get the encoder. + # However, after 4.1.2 we added support for scripting in clustered + # redis. ClusteredRedis doesn't have a `.connection_pool` attribute + # so we changed the Script class to use + # `self.registered_client.get_encoder` (see above). + # However, that is technically a breaking change, as consumers who + # use Scripts directly might inject a `registered_client` that + # doesn't have a `.get_encoder` field. This try/except prevents us + # from breaking backward-compatibility. Ideally, it would be + # removed in the next major release. 
+ return self.registered_client.connection_pool.get_encoder() + class AsyncScript: """ An executable Lua script object returned by ``register_script`` """ - def __init__(self, registered_client: "AsyncRedis", script: ScriptTextT): + def __init__( + self, + registered_client: "redis.asyncio.client.Redis", + script: ScriptTextT, + ): self.registered_client = registered_client self.script = script # Precalculate and store the SHA1 hex digest of the script. @@ -5533,7 +5554,7 @@ async def __call__( self, keys: Union[Sequence[KeyT], None] = None, args: Union[Iterable[EncodableT], None] = None, - client: Union["AsyncRedis", None] = None, + client: Union["redis.asyncio.client.Redis", None] = None, ): """Execute the script, passing any required ``args``""" keys = keys or [] @@ -5758,7 +5779,7 @@ def script_load(self, script: ScriptTextT) -> ResponseT: """ return self.execute_command("SCRIPT LOAD", script) - def register_script(self: "Redis", script: ScriptTextT) -> Script: + def register_script(self: "redis.client.Redis", script: ScriptTextT) -> Script: """ Register a Lua ``script`` specifying the ``keys`` it will touch. Returns a Script object that is callable and hides the complexity of @@ -5772,7 +5793,10 @@ class AsyncScriptCommands(ScriptCommands): async def script_debug(self, *args) -> None: return super().script_debug() - def register_script(self: "AsyncRedis", script: ScriptTextT) -> AsyncScript: + def register_script( + self: "redis.asyncio.client.Redis", + script: ScriptTextT, + ) -> AsyncScript: """ Register a Lua ``script`` specifying the ``keys`` it will touch. 
Returns a Script object that is callable and hides the complexity of @@ -6283,62 +6307,6 @@ def command(self) -> ResponseT: return self.execute_command("COMMAND") -class Script: - """ - An executable Lua script object returned by ``register_script`` - """ - - def __init__(self, registered_client, script): - self.registered_client = registered_client - self.script = script - # Precalculate and store the SHA1 hex digest of the script. - - if isinstance(script, str): - # We need the encoding from the client in order to generate an - # accurate byte representation of the script - encoder = self.get_encoder() - script = encoder.encode(script) - self.sha = hashlib.sha1(script).hexdigest() - - def __call__(self, keys=[], args=[], client=None): - "Execute the script, passing any required ``args``" - if client is None: - client = self.registered_client - args = tuple(keys) + tuple(args) - # make sure the Redis server knows about the script - from redis.client import Pipeline - - if isinstance(client, Pipeline): - # Make sure the pipeline can register the script before executing. - client.scripts.add(self) - try: - return client.evalsha(self.sha, len(keys), *args) - except NoScriptError: - # Maybe the client is pointed to a different server than the client - # that created this instance? - # Overwrite the sha just in case there was a discrepancy. - self.sha = client.script_load(self.script) - return client.evalsha(self.sha, len(keys), *args) - - def get_encoder(self): - """Get the encoder to encode string scripts into bytes.""" - try: - return self.registered_client.get_encoder() - except AttributeError: - # DEPRECATED - # In version <=4.1.2, this was the code we used to get the encoder. - # However, after 4.1.2 we added support for scripting in clustered - # redis. ClusteredRedis doesn't have a `.connection_pool` attribute - # so we changed the Script class to use - # `self.registered_client.get_encoder` (see above). 
- # However, that is technically a breaking change, as consumers who - # use Scripts directly might inject a `registered_client` that - # doesn't have a `.get_encoder` field. This try/except prevents us - # from breaking backward-compatibility. Ideally, it would be - # removed in the next major release. - return self.registered_client.connection_pool.get_encoder() - - class AsyncModuleCommands(ModuleCommands): async def command_info(self) -> None: return super().command_info() @@ -6415,9 +6383,12 @@ def function_list( ) -> Union[Awaitable[List], List]: """ Return information about the functions and libraries. - :param library: pecify a pattern for matching library names - :param withcode: cause the server to include the libraries source - implementation in the reply + + Args: + + library: specify a pattern for matching library names + withcode: cause the server to include the libraries source implementation + in the reply """ args = ["LIBRARYNAME", library] if withcode: diff --git a/redis/commands/timeseries/commands.py b/redis/commands/timeseries/commands.py index f8dfe8b5c0..b0cb864237 100644 --- a/redis/commands/timeseries/commands.py +++ b/redis/commands/timeseries/commands.py @@ -60,17 +60,17 @@ def create( duplicate_policy: Policy for handling multiple samples with identical timestamps. Can be one of: - - 'block': An error will occur and the new value will be ignored. - - 'first': Ignore the new value. - - 'last': Override with the latest value. - - 'min': Only override if the value is lower than the existing - value. - - 'max': Only override if the value is higher than the existing - value. - - 'sum': If a previous sample exists, add the new sample to it so - that the updated value is equal to (previous + new). If no - previous sample exists, set the updated value equal to the new - value. + + - 'block': An error will occur and the new value will be ignored. + - 'first': Ignore the new value. + - 'last': Override with the latest value. 
+ - 'min': Only override if the value is lower than the existing value. + - 'max': Only override if the value is higher than the existing value. + - 'sum': If a previous sample exists, add the new sample to it so + that the updated value is equal to (previous + new). If no + previous sample exists, set the updated value equal to the new + value. + ignore_max_time_diff: A non-negative integer value, in milliseconds, that sets an ignore threshold for added timestamps. If the difference between the last @@ -130,17 +130,17 @@ def alter( duplicate_policy: Policy for handling multiple samples with identical timestamps. Can be one of: - - 'block': An error will occur and the new value will be ignored. - - 'first': Ignore the new value. - - 'last': Override with the latest value. - - 'min': Only override if the value is lower than the existing - value. - - 'max': Only override if the value is higher than the existing - value. - - 'sum': If a previous sample exists, add the new sample to it so - that the updated value is equal to (previous + new). If no - previous sample exists, set the updated value equal to the new - value. + + - 'block': An error will occur and the new value will be ignored. + - 'first': Ignore the new value. + - 'last': Override with the latest value. + - 'min': Only override if the value is lower than the existing value. + - 'max': Only override if the value is higher than the existing value. + - 'sum': If a previous sample exists, add the new sample to it so + that the updated value is equal to (previous + new). If no + previous sample exists, set the updated value equal to the new + value. + ignore_max_time_diff: A non-negative integer value, in milliseconds, that sets an ignore threshold for added timestamps. If the difference between the last @@ -210,17 +210,17 @@ def add( duplicate_policy: Policy for handling multiple samples with identical timestamps. Can be one of: - - 'block': An error will occur and the new value will be ignored. 
- - 'first': Ignore the new value. - - 'last': Override with the latest value. - - 'min': Only override if the value is lower than the existing - value. - - 'max': Only override if the value is higher than the existing - value. - - 'sum': If a previous sample exists, add the new sample to it so - that the updated value is equal to (previous + new). If no - previous sample exists, set the updated value equal to the new - value. + + - 'block': An error will occur and the new value will be ignored. + - 'first': Ignore the new value. + - 'last': Override with the latest value. + - 'min': Only override if the value is lower than the existing value. + - 'max': Only override if the value is higher than the existing value. + - 'sum': If a previous sample exists, add the new sample to it so + that the updated value is equal to (previous + new). If no + previous sample exists, set the updated value equal to the new + value. + ignore_max_time_diff: A non-negative integer value, in milliseconds, that sets an ignore threshold for added timestamps. If the difference between the last @@ -331,17 +331,17 @@ def incrby( duplicate_policy: Policy for handling multiple samples with identical timestamps. Can be one of: - - 'block': An error will occur and the new value will be ignored. - - 'first': Ignore the new value. - - 'last': Override with the latest value. - - 'min': Only override if the value is lower than the existing - value. - - 'max': Only override if the value is higher than the existing - value. - - 'sum': If a previous sample exists, add the new sample to it so - that the updated value is equal to (previous + new). If no - previous sample exists, set the updated value equal to the new - value. + + - 'block': An error will occur and the new value will be ignored. + - 'first': Ignore the new value. + - 'last': Override with the latest value. + - 'min': Only override if the value is lower than the existing value. 
+ - 'max': Only override if the value is higher than the existing value. + - 'sum': If a previous sample exists, add the new sample to it so + that the updated value is equal to (previous + new). If no + previous sample exists, set the updated value equal to the new + value. + ignore_max_time_diff: A non-negative integer value, in milliseconds, that sets an ignore threshold for added timestamps. If the difference between the last @@ -423,17 +423,17 @@ def decrby( duplicate_policy: Policy for handling multiple samples with identical timestamps. Can be one of: - - 'block': An error will occur and the new value will be ignored. - - 'first': Ignore the new value. - - 'last': Override with the latest value. - - 'min': Only override if the value is lower than the existing - value. - - 'max': Only override if the value is higher than the existing - value. - - 'sum': If a previous sample exists, add the new sample to it so - that the updated value is equal to (previous + new). If no - previous sample exists, set the updated value equal to the new - value. + + - 'block': An error will occur and the new value will be ignored. + - 'first': Ignore the new value. + - 'last': Override with the latest value. + - 'min': Only override if the value is lower than the existing value. + - 'max': Only override if the value is higher than the existing value. + - 'sum': If a previous sample exists, add the new sample to it so + that the updated value is equal to (previous + new). If no + previous sample exists, set the updated value equal to the new + value. + ignore_max_time_diff: A non-negative integer value, in milliseconds, that sets an ignore threshold for added timestamps. 
If the difference between the last diff --git a/tests/test_asyncio/test_cluster.py b/tests/test_asyncio/test_cluster.py index c16272bb5b..fefa4ef8f9 100644 --- a/tests/test_asyncio/test_cluster.py +++ b/tests/test_asyncio/test_cluster.py @@ -313,7 +313,8 @@ async def mock_aclose(): called += 1 with mock.patch.object(cluster, "aclose", mock_aclose): - await cluster.close() + with pytest.warns(DeprecationWarning, match=r"Use aclose\(\) instead"): + await cluster.close() assert called == 1 await cluster.aclose() diff --git a/tests/test_asyncio/test_sentinel.py b/tests/test_asyncio/test_sentinel.py index 51e59d69d0..340d358b1d 100644 --- a/tests/test_asyncio/test_sentinel.py +++ b/tests/test_asyncio/test_sentinel.py @@ -8,6 +8,7 @@ from redis.asyncio.sentinel import ( MasterNotFoundError, Sentinel, + SentinelBlockingConnectionPool, SentinelConnectionPool, SlaveNotFoundError, ) @@ -182,40 +183,77 @@ async def test_discover_slaves(cluster, sentinel): @pytest.mark.onlynoncluster -async def test_master_for(cluster, sentinel, master_ip): - async with sentinel.master_for("mymaster", db=9) as master: +@pytest.mark.parametrize( + "connection_pool_class", + [SentinelConnectionPool, SentinelBlockingConnectionPool], +) +async def test_master_for(cluster, sentinel, master_ip, connection_pool_class): + async with sentinel.master_for( + "mymaster", + db=9, + connection_pool_class=connection_pool_class, + ) as master: assert await master.ping() assert master.connection_pool.master_address == (master_ip, 6379) # Use internal connection check - async with sentinel.master_for("mymaster", db=9, check_connection=True) as master: + async with sentinel.master_for( + "mymaster", + db=9, + check_connection=True, + connection_pool_class=connection_pool_class, + ) as master: assert await master.ping() @pytest.mark.onlynoncluster -async def test_slave_for(cluster, sentinel): +@pytest.mark.parametrize( + "connection_pool_class", + [SentinelConnectionPool, SentinelBlockingConnectionPool], +) 
+async def test_slave_for(cluster, sentinel, connection_pool_class): cluster.slaves = [ {"ip": "127.0.0.1", "port": 6379, "is_odown": False, "is_sdown": False} ] - async with sentinel.slave_for("mymaster", db=9) as slave: + async with sentinel.slave_for( + "mymaster", + db=9, + connection_pool_class=connection_pool_class, + ) as slave: assert await slave.ping() @pytest.mark.onlynoncluster -async def test_slave_for_slave_not_found_error(cluster, sentinel): +@pytest.mark.parametrize( + "connection_pool_class", + [SentinelConnectionPool, SentinelBlockingConnectionPool], +) +async def test_slave_for_slave_not_found_error( + cluster, + sentinel, + connection_pool_class, +): cluster.master["is_odown"] = True - async with sentinel.slave_for("mymaster", db=9) as slave: + async with sentinel.slave_for( + "mymaster", + db=9, + connection_pool_class=connection_pool_class, + ) as slave: with pytest.raises(SlaveNotFoundError): await slave.ping() @pytest.mark.onlynoncluster -async def test_slave_round_robin(cluster, sentinel, master_ip): +@pytest.mark.parametrize( + "connection_pool_class", + [SentinelConnectionPool, SentinelBlockingConnectionPool], +) +async def test_slave_round_robin(cluster, sentinel, master_ip, connection_pool_class): cluster.slaves = [ {"ip": "slave0", "port": 6379, "is_odown": False, "is_sdown": False}, {"ip": "slave1", "port": 6379, "is_odown": False, "is_sdown": False}, ] - pool = SentinelConnectionPool("mymaster", sentinel) + pool = connection_pool_class("mymaster", sentinel) rotator = pool.rotate_slaves() assert await rotator.__anext__() in (("slave0", 6379), ("slave1", 6379)) assert await rotator.__anext__() in (("slave0", 6379), ("slave1", 6379)) @@ -242,15 +280,39 @@ async def test_reset(cluster, sentinel): @pytest.mark.onlynoncluster -@pytest.mark.parametrize("method_name", ["master_for", "slave_for"]) -async def test_auto_close_pool(cluster, sentinel, method_name): +@pytest.mark.parametrize( + "method_name,connection_pool_class", + [ + 
pytest.param( + "master_for", + SentinelConnectionPool, + id="master_for__SentinelConnectionPool", + ), + pytest.param( + "slave_for", + SentinelConnectionPool, + id="slave_for__SentinelConnectionPool", + ), + pytest.param( + "master_for", + SentinelBlockingConnectionPool, + id="master_for__SentinelBlockingConnectionPool", + ), + pytest.param( + "slave_for", + SentinelBlockingConnectionPool, + id="slave_for__SentinelBlockingConnectionPool", + ), + ], +) +async def test_auto_close_pool(cluster, sentinel, method_name, connection_pool_class): """ Check that the connection pool created by the sentinel client is automatically closed """ method = getattr(sentinel, method_name) - client = method("mymaster", db=9) + client = method("mymaster", db=9, connection_pool_class=connection_pool_class) pool = client.connection_pool assert client.auto_close_connection_pool is True calls = 0