Feature/sentinel blocking connection pool #2

Open · wants to merge 17 commits into master

1 change: 1 addition & 0 deletions .github/wordlist.txt
@@ -44,6 +44,7 @@ RedisJSON
RedisTimeSeries
SHA
SearchCommands
SentinelBlockingConnectionPool
SentinelCommands
SentinelConnectionPool
Sharded
3 changes: 3 additions & 0 deletions CHANGES
@@ -1,3 +1,5 @@
* Add redis.asyncio.SentinelBlockingConnectionPool
* Use SentinelConnectionPoolProxy in asyncio.sentinel module
* Move doctests (doc code examples) to main branch
* Update `ResponseT` type hint
* Allow to control the minimum SSL version
@@ -66,6 +68,7 @@
* Prevent async ClusterPipeline instances from becoming "false-y" in case of empty command stack (#3061)
* Close Unix sockets if the connection attempt fails. This prevents `ResourceWarning`s. (#3314)
* Close SSL sockets if the connection attempt fails, or if validations fail. (#3317)
* Eliminate mutable default arguments in the `redis.commands.core.Script` class. (#3332)

* 4.1.3 (Feb 8, 2022)
* Fix flushdb and flushall (#1926)
2 changes: 1 addition & 1 deletion dev_requirements.txt
@@ -8,7 +8,7 @@ invoke==2.2.0
mock
packaging>=20.4
pytest
pytest-asyncio
pytest-asyncio>=0.23.0,<0.24.0
pytest-cov
pytest-profiling
pytest-timeout
18 changes: 18 additions & 0 deletions docs/connections.rst
@@ -63,6 +63,24 @@ This client is used for communicating with Redis, asynchronously.
:members:


Async Sentinel Client
*********************

Sentinel (Async)
================
.. autoclass:: redis.asyncio.sentinel.Sentinel
:members:

SentinelConnectionPool (Async)
==============================
.. autoclass:: redis.asyncio.sentinel.SentinelConnectionPool
:members:

SentinelBlockingConnectionPool (Async)
======================================
.. autoclass:: redis.asyncio.sentinel.SentinelBlockingConnectionPool
:members:

Async Cluster Client
********************

23 changes: 23 additions & 0 deletions docs/examples/asyncio_examples.ipynb
@@ -328,6 +328,29 @@
"assert val == b\"value\""
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import asyncio\n",
"\n",
"from redis.asyncio.sentinel import Sentinel, SentinelBlockingConnectionPool\n",
"\n",
"\n",
"sentinel = Sentinel([(\"localhost\", 26379), (\"sentinel2\", 26379)])\n",
"r = sentinel.master_for(\n",
" \"mymaster\",\n",
" connection_pool_class=SentinelBlockingConnectionPool,\n",
" max_connections=5, # Max connections in pool\n",
" timeout=3, # Connection acquiring timeout in seconds. To turn off the timeout, set None\n",
")\n",
"\n",
"# This code doesn't raise a MaxConectionsError exception\n",
"await asyncio.gather(*[r.get(\"key\") for _ in range(10)])"
]
},
{
"cell_type": "markdown",
"metadata": {},
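The notebook stores the new example as escaped JSON; the equivalent plain script below may be easier to follow. The Sentinel addresses, service name, pool sizes, and key are the same placeholders used in the cell above.

import asyncio

from redis.asyncio.sentinel import Sentinel, SentinelBlockingConnectionPool


async def main() -> None:
    # Placeholder Sentinel addresses and service name from the notebook cell.
    sentinel = Sentinel([("localhost", 26379), ("sentinel2", 26379)])
    r = sentinel.master_for(
        "mymaster",
        connection_pool_class=SentinelBlockingConnectionPool,
        max_connections=5,  # at most five connections in the pool
        timeout=3,  # wait up to three seconds for a free connection; None waits forever
    )
    # The ten concurrent GETs queue for the five connections instead of
    # failing with MaxConnectionsError as a non-blocking pool can.
    await asyncio.gather(*[r.get("key") for _ in range(10)])


asyncio.run(main())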
2 changes: 2 additions & 0 deletions pytest.ini
@@ -14,3 +14,5 @@ timeout = 30
filterwarnings =
always
ignore:RedisGraph support is deprecated as of Redis Stack 7.2:DeprecationWarning
# Ignore a coverage warning when COVERAGE_CORE=sysmon for Pythons < 3.12.
ignore:sys.monitoring isn't available:coverage.exceptions.CoverageWarning
6 changes: 4 additions & 2 deletions redis/_parsers/helpers.py
@@ -445,9 +445,11 @@ def parse_cluster_info(response, **options):
def _parse_node_line(line):
line_items = line.split(" ")
node_id, addr, flags, master_id, ping, pong, epoch, connected = line.split(" ")[:8]
addr = addr.split("@")[0]
ip = addr.split("@")[0]
hostname = addr.split("@")[1].split(",")[1] if "@" in addr and "," in addr else ""
node_dict = {
"node_id": node_id,
"hostname": hostname,
"flags": flags,
"master_id": master_id,
"last_ping_sent": ping,
@@ -460,7 +462,7 @@ def _parse_node_line(line):
if len(line_items) >= 9:
slots, migrations = _parse_slots(line_items[8:])
node_dict["slots"], node_dict["migrations"] = slots, migrations
return addr, node_dict
return ip, node_dict


def _parse_slots(slot_ranges):
2 changes: 2 additions & 0 deletions redis/asyncio/__init__.py
@@ -9,6 +9,7 @@
)
from redis.asyncio.sentinel import (
Sentinel,
SentinelBlockingConnectionPool,
SentinelConnectionPool,
SentinelManagedConnection,
SentinelManagedSSLConnection,
@@ -53,6 +54,7 @@
"RedisError",
"ResponseError",
"Sentinel",
"SentinelBlockingConnectionPool",
"SentinelConnectionPool",
"SentinelManagedConnection",
"SentinelManagedSSLConnection",
10 changes: 6 additions & 4 deletions redis/asyncio/client.py
@@ -579,10 +579,12 @@ async def aclose(self, close_connection_pool: Optional[bool] = None) -> None:
"""
Closes Redis client connection

:param close_connection_pool: decides whether to close the connection pool used
by this Redis client, overriding Redis.auto_close_connection_pool. By default,
let Redis.auto_close_connection_pool decide whether to close the connection
pool.
Args:
close_connection_pool:
decides whether to close the connection pool used by this Redis client,
overriding Redis.auto_close_connection_pool.
By default, let Redis.auto_close_connection_pool decide
whether to close the connection pool.
"""
conn = self.connection
if conn:
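A minimal usage sketch for the parameter documented above; the localhost address is an assumption, not part of this change.

import asyncio

from redis.asyncio import Redis


async def main() -> None:
    r = Redis(host="localhost", port=6379)  # assumed local Redis server
    await r.set("key", "value")
    # Close the client and force its connection pool to close as well,
    # overriding Redis.auto_close_connection_pool.
    await r.aclose(close_connection_pool=True)


asyncio.run(main())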
174 changes: 143 additions & 31 deletions redis/asyncio/sentinel.py
@@ -1,10 +1,20 @@
import asyncio
import random
import weakref
from typing import AsyncIterator, Iterable, Mapping, Optional, Sequence, Tuple, Type
from typing import (
AsyncIterator,
Iterable,
Mapping,
Optional,
Sequence,
Tuple,
Type,
Union,
)

from redis.asyncio.client import Redis
from redis.asyncio.connection import (
BlockingConnectionPool,
Connection,
ConnectionPool,
EncodableT,
@@ -97,6 +107,55 @@ class SentinelManagedSSLConnection(SentinelManagedConnection, SSLConnection):
pass


class SentinelConnectionPoolProxy:
def __init__(
self,
connection_pool,
is_master,
check_connection,
service_name,
sentinel_manager,
):
self.connection_pool_ref = weakref.ref(connection_pool)
self.is_master = is_master
self.check_connection = check_connection
self.service_name = service_name
self.sentinel_manager = sentinel_manager
self.reset()

def reset(self):
self.master_address = None
self.slave_rr_counter = None

async def get_master_address(self):
master_address = await self.sentinel_manager.discover_master(self.service_name)
if self.is_master and self.master_address != master_address:
self.master_address = master_address
# disconnect any idle connections so that they reconnect
# to the new master the next time that they are used.
connection_pool = self.connection_pool_ref()
if connection_pool is not None:
await connection_pool.disconnect(inuse_connections=False)
return master_address

async def rotate_slaves(self) -> AsyncIterator:
"""Round-robin slave balancer"""
slaves = await self.sentinel_manager.discover_slaves(self.service_name)
if slaves:
if self.slave_rr_counter is None:
self.slave_rr_counter = random.randint(0, len(slaves) - 1)
for _ in range(len(slaves)):
self.slave_rr_counter = (self.slave_rr_counter + 1) % len(slaves)
slave = slaves[self.slave_rr_counter]
yield slave
# Fallback to the master connection
try:
yield await self.get_master_address()
except MasterNotFoundError:
pass
raise SlaveNotFoundError(f"No slave found for {self.service_name!r}")


class SentinelConnectionPool(ConnectionPool):
"""
Sentinel backed connection pool.
@@ -116,12 +175,17 @@ def __init__(self, service_name, sentinel_manager, **kwargs):
)
self.is_master = kwargs.pop("is_master", True)
self.check_connection = kwargs.pop("check_connection", False)
self.proxy = SentinelConnectionPoolProxy(
connection_pool=self,
is_master=self.is_master,
check_connection=self.check_connection,
service_name=service_name,
sentinel_manager=sentinel_manager,
)
super().__init__(**kwargs)
self.connection_kwargs["connection_pool"] = weakref.proxy(self)
self.connection_kwargs["connection_pool"] = self.proxy
self.service_name = service_name
self.sentinel_manager = sentinel_manager
self.master_address = None
self.slave_rr_counter = None

def __repr__(self):
return (
@@ -131,8 +195,11 @@ def __repr__(self):

def reset(self):
super().reset()
self.master_address = None
self.slave_rr_counter = None
self.proxy.reset()

@property
def master_address(self):
return self.proxy.master_address

def owns_connection(self, connection: Connection):
check = not self.is_master or (
@@ -141,31 +208,70 @@ def owns_connection(self, connection: Connection):
return check and super().owns_connection(connection)

async def get_master_address(self):
master_address = await self.sentinel_manager.discover_master(self.service_name)
if self.is_master:
if self.master_address != master_address:
self.master_address = master_address
# disconnect any idle connections so that they reconnect
# to the new master the next time that they are used.
await self.disconnect(inuse_connections=False)
return master_address
return await self.proxy.get_master_address()

async def rotate_slaves(self) -> AsyncIterator:
def rotate_slaves(self) -> AsyncIterator:
"""Round-robin slave balancer"""
slaves = await self.sentinel_manager.discover_slaves(self.service_name)
if slaves:
if self.slave_rr_counter is None:
self.slave_rr_counter = random.randint(0, len(slaves) - 1)
for _ in range(len(slaves)):
self.slave_rr_counter = (self.slave_rr_counter + 1) % len(slaves)
slave = slaves[self.slave_rr_counter]
yield slave
# Fallback to the master connection
try:
yield await self.get_master_address()
except MasterNotFoundError:
pass
raise SlaveNotFoundError(f"No slave found for {self.service_name!r}")
return self.proxy.rotate_slaves()


class SentinelBlockingConnectionPool(BlockingConnectionPool):
"""
Sentinel blocking connection pool.

If ``check_connection`` flag is set to True, SentinelManagedConnection
sends a PING command right after establishing the connection.
"""

def __init__(self, service_name, sentinel_manager, **kwargs):
kwargs["connection_class"] = kwargs.get(
"connection_class",
(
SentinelManagedSSLConnection
if kwargs.pop("ssl", False)
else SentinelManagedConnection
),
)
self.is_master = kwargs.pop("is_master", True)
self.check_connection = kwargs.pop("check_connection", False)
self.proxy = SentinelConnectionPoolProxy(
connection_pool=self,
is_master=self.is_master,
check_connection=self.check_connection,
service_name=service_name,
sentinel_manager=sentinel_manager,
)
super().__init__(**kwargs)
self.connection_kwargs["connection_pool"] = self.proxy
self.service_name = service_name
self.sentinel_manager = sentinel_manager

def __repr__(self):
return (
f"<{self.__class__.__module__}.{self.__class__.__name__}"
f"(service={self.service_name}({self.is_master and 'master' or 'slave'}))>"
)

def reset(self):
super().reset()
self.proxy.reset()

@property
def master_address(self):
return self.proxy.master_address

def owns_connection(self, connection: Connection):
check = not self.is_master or (
self.is_master and self.master_address == (connection.host, connection.port)
)
return check and super().owns_connection(connection)

async def get_master_address(self):
return await self.proxy.get_master_address()

def rotate_slaves(self) -> AsyncIterator:
"""Round-robin slave balancer"""
return self.proxy.rotate_slaves()


class Sentinel(AsyncSentinelCommands):
@@ -318,7 +424,10 @@ def master_for(
self,
service_name: str,
redis_class: Type[Redis] = Redis,
connection_pool_class: Type[SentinelConnectionPool] = SentinelConnectionPool,
connection_pool_class: Union[
Type[SentinelConnectionPool],
Type[SentinelBlockingConnectionPool],
] = SentinelConnectionPool,
**kwargs,
):
"""
@@ -355,7 +464,10 @@ def slave_for(
self,
service_name: str,
redis_class: Type[Redis] = Redis,
connection_pool_class: Type[SentinelConnectionPool] = SentinelConnectionPool,
connection_pool_class: Union[
Type[SentinelConnectionPool],
Type[SentinelBlockingConnectionPool],
] = SentinelConnectionPool,
**kwargs,
):
"""
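Since both master_for and slave_for now accept either pool class, here is a sketch of the replica side with the blocking pool; the Sentinel address, service name, pool size, and key are placeholders.

import asyncio

from redis.asyncio.sentinel import Sentinel, SentinelBlockingConnectionPool


async def main() -> None:
    sentinel = Sentinel([("localhost", 26379)])  # placeholder Sentinel address
    replica = sentinel.slave_for(
        "mymaster",
        connection_pool_class=SentinelBlockingConnectionPool,
        max_connections=2,  # small pool: extra requests wait for a free connection
        timeout=5,  # give up after five seconds without a free connection
    )
    print(await replica.get("key"))


asyncio.run(main())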