Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implemented redis.asyncio.SentinelBlockingConnectionPool. #3321

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/wordlist.txt
Original file line number Diff line number Diff line change
@@ -44,6 +44,7 @@ RedisJSON
RedisTimeSeries
SHA
SearchCommands
SentinelBlockingConnectionPool
SentinelCommands
SentinelConnectionPool
Sharded
2 changes: 2 additions & 0 deletions CHANGES
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
* Add redis.asyncio.SentinelBlockingConnectionPool
* Use SentinelConnectionPoolProxy in asyncio.sentinel module
* Move doctests (doc code examples) to main branch
* Update `ResponseT` type hint
* Allow to control the minimum SSL version
18 changes: 18 additions & 0 deletions docs/connections.rst
Original file line number Diff line number Diff line change
@@ -63,6 +63,24 @@ This client is used for communicating with Redis, asynchronously.
:members:


Async Sentinel Client
*********************

Sentinel (Async)
================
.. autoclass:: redis.asyncio.sentinel.Sentinel
:members:

SentinelConnectionPool (Async)
==============================
.. autoclass:: redis.asyncio.sentinel.SentinelConnectionPool
:members:

SentinelBlockingConnectionPool (Async)
======================================
.. autoclass:: redis.asyncio.sentinel.SentinelBlockingConnectionPool
:members:

Async Cluster Client
********************

23 changes: 23 additions & 0 deletions docs/examples/asyncio_examples.ipynb
Original file line number Diff line number Diff line change
@@ -328,6 +328,29 @@
"assert val == b\"value\""
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import asyncio\n",
"\n",
"from redis.asyncio.sentinel import Sentinel, SentinelBlockingConnectionPool\n",
"\n",
"\n",
"sentinel = Sentinel([(\"localhost\", 26379), (\"sentinel2\", 26379)])\n",
"r = sentinel.master_for(\n",
" \"mymaster\",\n",
" connection_pool_class=SentinelBlockingConnectionPool,\n",
" max_connections=5, # Max connections in pool\n",
" timeout=3, # Connection acquiring timeout in seconds. To turn off the timeout, set None\n",
")\n",
"\n",
"# This code doesn't raise a MaxConectionsError exception\n",
"await asyncio.gather(*[r.get(\"key\") for _ in range(10)])"
]
},
{
"cell_type": "markdown",
"metadata": {},
2 changes: 2 additions & 0 deletions redis/asyncio/__init__.py
Original file line number Diff line number Diff line change
@@ -9,6 +9,7 @@
)
from redis.asyncio.sentinel import (
Sentinel,
SentinelBlockingConnectionPool,
SentinelConnectionPool,
SentinelManagedConnection,
SentinelManagedSSLConnection,
@@ -53,6 +54,7 @@
"RedisError",
"ResponseError",
"Sentinel",
"SentinelBlockingConnectionPool",
"SentinelConnectionPool",
"SentinelManagedConnection",
"SentinelManagedSSLConnection",
174 changes: 143 additions & 31 deletions redis/asyncio/sentinel.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,20 @@
import asyncio
import random
import weakref
from typing import AsyncIterator, Iterable, Mapping, Optional, Sequence, Tuple, Type
from typing import (
AsyncIterator,
Iterable,
Mapping,
Optional,
Sequence,
Tuple,
Type,
Union,
)

from redis.asyncio.client import Redis
from redis.asyncio.connection import (
BlockingConnectionPool,
Connection,
ConnectionPool,
EncodableT,
@@ -97,6 +107,55 @@ class SentinelManagedSSLConnection(SentinelManagedConnection, SSLConnection):
pass


class SentinelConnectionPoolProxy:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the purpose of this proxy class? get_master_address and rotate_slaves methods implementations are exactly the same as in SentinelConnectionPool class. The only difference that I see is the missing calls to parent object in constructor and reset.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did it this way because it is already done in the synchronous version: https://github.com/redis/redis-py/blob/master/redis/sentinel.py#L89

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I get it. It's a bad design approach and I don't want to spread it even more. It should be implemented correctly and I will create an issue for the future to do the same with sync version.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay. I'll think about a better solution

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The best solution will be refactor async ConnectionPool the way we don't need to use workarounds to break inheritance limitations

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hello. What if we move BlockingConnectionPool logic to ConnectionPool?
We can do something like this:

from typing import Optional


# If the timeout parameter is zero, then we have the old behavior,
# else we have a blocking connection pool
class ConnectionPool:
    def __init__(
        self,
        timeout: Optional[int] = 0,
        **kwargs,
    ) -> None:
        ...


# For backward compatibility
class BlockingConnectionPool(ConnectionPool):
    def __init__(self, timeout: Optional[int] = 20, **kwargs) -> None:
        super().__init__(timeout=timeout, **kwargs)

def __init__(
self,
connection_pool,
is_master,
check_connection,
service_name,
sentinel_manager,
):
self.connection_pool_ref = weakref.ref(connection_pool)
self.is_master = is_master
self.check_connection = check_connection
self.service_name = service_name
self.sentinel_manager = sentinel_manager
self.reset()

def reset(self):
self.master_address = None
self.slave_rr_counter = None

async def get_master_address(self):
master_address = await self.sentinel_manager.discover_master(self.service_name)
if self.is_master and self.master_address != master_address:
self.master_address = master_address
# disconnect any idle connections so that they reconnect
# to the new master the next time that they are used.
connection_pool = self.connection_pool_ref()
if connection_pool is not None:
await connection_pool.disconnect(inuse_connections=False)
return master_address

async def rotate_slaves(self) -> AsyncIterator:
"""Round-robin slave balancer"""
slaves = await self.sentinel_manager.discover_slaves(self.service_name)
if slaves:
if self.slave_rr_counter is None:
self.slave_rr_counter = random.randint(0, len(slaves) - 1)
for _ in range(len(slaves)):
self.slave_rr_counter = (self.slave_rr_counter + 1) % len(slaves)
slave = slaves[self.slave_rr_counter]
yield slave
# Fallback to the master connection
try:
yield await self.get_master_address()
except MasterNotFoundError:
pass
raise SlaveNotFoundError(f"No slave found for {self.service_name!r}")


class SentinelConnectionPool(ConnectionPool):
"""
Sentinel backed connection pool.
@@ -116,12 +175,17 @@ def __init__(self, service_name, sentinel_manager, **kwargs):
)
self.is_master = kwargs.pop("is_master", True)
self.check_connection = kwargs.pop("check_connection", False)
self.proxy = SentinelConnectionPoolProxy(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, conceptually proxies are used to add an additional functionality before or after we want to access the original object. But here proxy are used just to override super class methods that breaks the functionality of child.

Proxy here is the workaround, to hide a problem with inheritance that we have. We have to either fix it in ConnectionPool so it can be inheritable, or doesn't inherit this at all

connection_pool=self,
is_master=self.is_master,
check_connection=self.check_connection,
service_name=service_name,
sentinel_manager=sentinel_manager,
)
super().__init__(**kwargs)
self.connection_kwargs["connection_pool"] = weakref.proxy(self)
self.connection_kwargs["connection_pool"] = self.proxy
self.service_name = service_name
self.sentinel_manager = sentinel_manager
self.master_address = None
self.slave_rr_counter = None

def __repr__(self):
return (
@@ -131,8 +195,11 @@ def __repr__(self):

def reset(self):
super().reset()
self.master_address = None
self.slave_rr_counter = None
self.proxy.reset()

@property
def master_address(self):
return self.proxy.master_address

def owns_connection(self, connection: Connection):
check = not self.is_master or (
@@ -141,31 +208,70 @@ def owns_connection(self, connection: Connection):
return check and super().owns_connection(connection)

async def get_master_address(self):
master_address = await self.sentinel_manager.discover_master(self.service_name)
if self.is_master:
if self.master_address != master_address:
self.master_address = master_address
# disconnect any idle connections so that they reconnect
# to the new master the next time that they are used.
await self.disconnect(inuse_connections=False)
return master_address
return await self.proxy.get_master_address()

async def rotate_slaves(self) -> AsyncIterator:
def rotate_slaves(self) -> AsyncIterator:
"""Round-robin slave balancer"""
slaves = await self.sentinel_manager.discover_slaves(self.service_name)
if slaves:
if self.slave_rr_counter is None:
self.slave_rr_counter = random.randint(0, len(slaves) - 1)
for _ in range(len(slaves)):
self.slave_rr_counter = (self.slave_rr_counter + 1) % len(slaves)
slave = slaves[self.slave_rr_counter]
yield slave
# Fallback to the master connection
try:
yield await self.get_master_address()
except MasterNotFoundError:
pass
raise SlaveNotFoundError(f"No slave found for {self.service_name!r}")
return self.proxy.rotate_slaves()


class SentinelBlockingConnectionPool(BlockingConnectionPool):
"""
Sentinel blocking connection pool.
If ``check_connection`` flag is set to True, SentinelManagedConnection
sends a PING command right after establishing the connection.
"""

def __init__(self, service_name, sentinel_manager, **kwargs):
kwargs["connection_class"] = kwargs.get(
"connection_class",
(
SentinelManagedSSLConnection
if kwargs.pop("ssl", False)
else SentinelManagedConnection
),
)
self.is_master = kwargs.pop("is_master", True)
self.check_connection = kwargs.pop("check_connection", False)
self.proxy = SentinelConnectionPoolProxy(
connection_pool=self,
is_master=self.is_master,
check_connection=self.check_connection,
service_name=service_name,
sentinel_manager=sentinel_manager,
)
super().__init__(**kwargs)
self.connection_kwargs["connection_pool"] = self.proxy
self.service_name = service_name
self.sentinel_manager = sentinel_manager

def __repr__(self):
return (
f"<{self.__class__.__module__}.{self.__class__.__name__}"
f"(service={self.service_name}({self.is_master and 'master' or 'slave'}))>"
)

def reset(self):
super().reset()
self.proxy.reset()

@property
def master_address(self):
return self.proxy.master_address

def owns_connection(self, connection: Connection):
check = not self.is_master or (
self.is_master and self.master_address == (connection.host, connection.port)
)
return check and super().owns_connection(connection)

async def get_master_address(self):
return await self.proxy.get_master_address()

def rotate_slaves(self) -> AsyncIterator:
"""Round-robin slave balancer"""
return self.proxy.rotate_slaves()


class Sentinel(AsyncSentinelCommands):
@@ -318,7 +424,10 @@ def master_for(
self,
service_name: str,
redis_class: Type[Redis] = Redis,
connection_pool_class: Type[SentinelConnectionPool] = SentinelConnectionPool,
connection_pool_class: Union[
Type[SentinelConnectionPool],
Type[SentinelBlockingConnectionPool],
] = SentinelConnectionPool,
**kwargs,
):
"""
@@ -355,7 +464,10 @@ def slave_for(
self,
service_name: str,
redis_class: Type[Redis] = Redis,
connection_pool_class: Type[SentinelConnectionPool] = SentinelConnectionPool,
connection_pool_class: Union[
Type[SentinelConnectionPool],
Type[SentinelBlockingConnectionPool],
] = SentinelConnectionPool,
**kwargs,
):
"""
86 changes: 74 additions & 12 deletions tests/test_asyncio/test_sentinel.py
Original file line number Diff line number Diff line change
@@ -8,6 +8,7 @@
from redis.asyncio.sentinel import (
MasterNotFoundError,
Sentinel,
SentinelBlockingConnectionPool,
SentinelConnectionPool,
SlaveNotFoundError,
)
@@ -182,40 +183,77 @@ async def test_discover_slaves(cluster, sentinel):


@pytest.mark.onlynoncluster
async def test_master_for(cluster, sentinel, master_ip):
async with sentinel.master_for("mymaster", db=9) as master:
@pytest.mark.parametrize(
"connection_pool_class",
[SentinelConnectionPool, SentinelBlockingConnectionPool],
)
async def test_master_for(cluster, sentinel, master_ip, connection_pool_class):
async with sentinel.master_for(
"mymaster",
db=9,
connection_pool_class=connection_pool_class,
) as master:
assert await master.ping()
assert master.connection_pool.master_address == (master_ip, 6379)

# Use internal connection check
async with sentinel.master_for("mymaster", db=9, check_connection=True) as master:
async with sentinel.master_for(
"mymaster",
db=9,
check_connection=True,
connection_pool_class=connection_pool_class,
) as master:
assert await master.ping()


@pytest.mark.onlynoncluster
async def test_slave_for(cluster, sentinel):
@pytest.mark.parametrize(
"connection_pool_class",
[SentinelConnectionPool, SentinelBlockingConnectionPool],
)
async def test_slave_for(cluster, sentinel, connection_pool_class):
cluster.slaves = [
{"ip": "127.0.0.1", "port": 6379, "is_odown": False, "is_sdown": False}
]
async with sentinel.slave_for("mymaster", db=9) as slave:
async with sentinel.slave_for(
"mymaster",
db=9,
connection_pool_class=connection_pool_class,
) as slave:
assert await slave.ping()


@pytest.mark.onlynoncluster
async def test_slave_for_slave_not_found_error(cluster, sentinel):
@pytest.mark.parametrize(
"connection_pool_class",
[SentinelConnectionPool, SentinelBlockingConnectionPool],
)
async def test_slave_for_slave_not_found_error(
cluster,
sentinel,
connection_pool_class,
):
cluster.master["is_odown"] = True
async with sentinel.slave_for("mymaster", db=9) as slave:
async with sentinel.slave_for(
"mymaster",
db=9,
connection_pool_class=connection_pool_class,
) as slave:
with pytest.raises(SlaveNotFoundError):
await slave.ping()


@pytest.mark.onlynoncluster
async def test_slave_round_robin(cluster, sentinel, master_ip):
@pytest.mark.parametrize(
"connection_pool_class",
[SentinelConnectionPool, SentinelBlockingConnectionPool],
)
async def test_slave_round_robin(cluster, sentinel, master_ip, connection_pool_class):
cluster.slaves = [
{"ip": "slave0", "port": 6379, "is_odown": False, "is_sdown": False},
{"ip": "slave1", "port": 6379, "is_odown": False, "is_sdown": False},
]
pool = SentinelConnectionPool("mymaster", sentinel)
pool = connection_pool_class("mymaster", sentinel)
rotator = pool.rotate_slaves()
assert await rotator.__anext__() in (("slave0", 6379), ("slave1", 6379))
assert await rotator.__anext__() in (("slave0", 6379), ("slave1", 6379))
@@ -242,15 +280,39 @@ async def test_reset(cluster, sentinel):


@pytest.mark.onlynoncluster
@pytest.mark.parametrize("method_name", ["master_for", "slave_for"])
async def test_auto_close_pool(cluster, sentinel, method_name):
@pytest.mark.parametrize(
"method_name,connection_pool_class",
[
pytest.param(
"master_for",
SentinelConnectionPool,
id="master_for__SentinelConnectionPool",
),
pytest.param(
"slave_for",
SentinelConnectionPool,
id="slave_for__SentinelConnectionPool",
),
pytest.param(
"master_for",
SentinelBlockingConnectionPool,
id="master_for__SentinelBlockingConnectionPool",
),
pytest.param(
"slave_for",
SentinelBlockingConnectionPool,
id="slave_for__SentinelBlockingConnectionPool",
),
],
)
async def test_auto_close_pool(cluster, sentinel, method_name, connection_pool_class):
"""
Check that the connection pool created by the sentinel client is
automatically closed
"""

method = getattr(sentinel, method_name)
client = method("mymaster", db=9)
client = method("mymaster", db=9, connection_pool_class=connection_pool_class)
pool = client.connection_pool
assert client.auto_close_connection_pool is True
calls = 0