Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DISCO-3405] feat: Support replicas for the redis adapter #865

Merged
merged 3 commits into from
Mar 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 21 additions & 5 deletions dev/docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -1,14 +1,30 @@
version: "3"
services:
redis:
redis-primary:
image: redis:7.2
container_name: redis-primary
restart: always
volumes:
- redis_primary:/data
ports:
- 6379:6379

redis-replica:
image: redis:7.2
container_name: redis-replica
restart: always
volumes:
- redis_replica:/data
ports:
- "6379:6379"
- 6380:6379
command: redis-server --replicaof redis-primary 6379

redis-commander:
image: ghcr.io/joeferner/redis-commander:latest
restart: always
environment:
- REDIS_HOSTS=local:redis:6379
- REDIS_HOSTS=local:redis-primary:6379
ports:
- "8081:8081"
- 8081:8081
volumes:
redis_primary:
redis_replica:
2 changes: 1 addition & 1 deletion merino/cache/none.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,5 @@ async def close(self) -> None: # noqa: D102
def register_script(self, sid: str, script: str) -> None: # noqa: D102
pass

async def run_script(self, sid: str, keys: list, args: list) -> Any: # noqa: D102
async def run_script(self, sid: str, keys: list, args: list, readonly: bool = False) -> Any: # noqa: D102
pass
2 changes: 2 additions & 0 deletions merino/cache/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,15 @@ async def run_script(
sid: str,
keys: list,
args: list,
readonly: bool = False,
) -> Any: # pragma: no cover
"""Run a given script with keys and arguments.

Params:
- `sid` {str}, a script identifier
- `keys` list[str], a list of keys used as the global `KEYS` in Redis scripting
- `args` list[str], a list of arguments used as the global `ARGV` in Redis scripting
- `readonly` bool, whether or not the script is readonly. Readonly scripts can be run on replica servers.
Returns:
A Redis value based on the return value of the specified script
Raises:
Expand Down
68 changes: 56 additions & 12 deletions merino/cache/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,50 @@
from merino.exceptions import CacheAdapterError


class RedisAdapter:
"""A cache adapter that stores key-value pairs in Redis."""
def create_redis_clients(
primary: str, replica: str, max_connections: int, db: int = 0
) -> tuple[Redis, Redis]:
"""Create redis clients for the primary and replica severs.
When `replica` is the same as `primary`, the returned clients are the same.

Args:
- `primary`: the URL to the Redis primary endpoint.
- `replica`: the URL to the Redis replica endpoint.
- `max_connections`: the maximum connections allowed in the connection pool.
- `db`: the ID (`SELECT db`) of the DB to which the clients connect.
Returns:
- A tuple of two clients, the first for the primary server and the second for the replica endpoint.
When `primary` is the same as `replica`, the two share the same underlying client.
"""
client_primary: Redis = Redis.from_url(primary, db=db, max_connections=max_connections)
client_replica: Redis
if primary == replica:
client_replica = client_primary
else:
client_replica = Redis.from_url(replica, db=db, max_connections=max_connections)

return client_primary, client_replica


redis: Redis
class RedisAdapter:
"""A cache adapter that stores key-value pairs in Redis.
Merino's Redis server employes replication for high availability. Hence
each adapter maintains two clients connected to the primary endpoint and
the replica endpoint, respectively. To ease the development and testing,
the primary and replica clients can be set to the same underlying client,
all the commands should work as expected in both primary-replica and
standalone modes.

Note that only readonly commands can be executed on the replica nodes.
"""

primary: Redis
replica: Redis
scripts: dict[str, AsyncScript] = {}

def __init__(self, redis: Redis):
self.redis = redis
def __init__(self, primary: Redis, replica: Redis | None = None):
self.primary = primary
self.replica = replica or primary

async def get(self, key: str) -> bytes | None:
"""Get the value associated with the key from Redis. Returns `None` if the key isn't in
Expand All @@ -26,7 +62,7 @@ async def get(self, key: str) -> bytes | None:
- `CacheAdapterError` if Redis returns an error.
"""
try:
return await self.redis.get(key)
return await self.replica.get(key)
except RedisError as exc:
raise CacheAdapterError(f"Failed to get `{repr(key)}` with error: `{exc}`") from exc

Expand All @@ -43,14 +79,18 @@ async def set(
- `CacheAdapterError` if Redis returns an error.
"""
try:
await self.redis.set(key, value, ex=ttl.days * 86400 + ttl.seconds if ttl else None)
await self.primary.set(key, value, ex=ttl.days * 86400 + ttl.seconds if ttl else None)
except RedisError as exc:
raise CacheAdapterError(f"Failed to set `{repr(key)}` with error: `{exc}`") from exc

async def close(self) -> None:
"""Close the Redis connection."""
# "type: ignore" was added to suppress a false alarm.
await self.redis.aclose() # type: ignore
if self.primary is self.replica:
# "type: ignore" was added to suppress a false alarm.
await self.primary.aclose() # type: ignore
else:
await self.primary.aclose() # type: ignore
await self.replica.aclose() # type: ignore

def register_script(self, sid: str, script: str) -> None:
"""Register a Lua script in Redis. Regist multiple scripts using the same `sid`
Expand All @@ -62,23 +102,27 @@ def register_script(self, sid: str, script: str) -> None:
- `sid` {str}, a script identifier
- `script` {str}, a Redis supported Lua script
"""
self.scripts[sid] = self.redis.register_script(script)
self.scripts[sid] = self.primary.register_script(script)

async def run_script(self, sid: str, keys: list[str], args: list[str]) -> Any:
async def run_script(
self, sid: str, keys: list[str], args: list[str], readonly: bool = False
) -> Any:
"""Run a given script with keys and arguments.

Params:
- `sid` {str}, a script identifier
- `keys` list[str], a list of keys used as the global `KEYS` in Redis scripting
- `args` list[str], a list of arguments used as the global `ARGV` in Redis scripting
- `readonly` bool, whether or not the script is readonly. Readonly scripts will be run on replica servers.
Returns:
A Redis value based on the return value of the specified script
Raises:
- `CacheAdapterError` if Redis returns an error
- `KeyError` if `sid` does not have a script associated
"""
try:
res = await self.scripts[sid](keys, args)
# Run the script in the replica nodes if it's readonly.
res = await self.scripts[sid](keys, args, self.replica if readonly else self.primary)
except RedisError as exc:
raise CacheAdapterError(f"Failed to run script {id} with error: `{exc}`") from exc

Expand Down
19 changes: 14 additions & 5 deletions merino/configs/default.toml
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,20 @@ dry_run = false
# Default score to set in suggestions uploaded to remote settings.
score = 0.25

# Global Redis settings. The weather provider optionally uses Redis to cache weather suggestions.
[default.redis]
# MERINO_REDIS__SERVER - URI to the Redis primary endpoint.
# In the form of `redis://localhost:6379`.
server = "redis://localhost:6379"

# MERINO_REDIS__REPLICA - URI to the Redis replica endpoint.
# Set to the primary endpoint for development and testing environemnts.
replica = "redis://localhost:6379"

# MERINO_REDIS__MAX_CONNECTIONS - The maximum connections allowed for the connection pool.
# To override the default max_conns of Redis-py: 2 ** 31.
max_connections = 500


[default.sentry]
# MERINO_SENTRY__MODE
Expand Down Expand Up @@ -254,11 +268,6 @@ query_timeout_sec = 5.0
# accuweather backend http client to establish a connection to the host.
connect_timeout_sec = 3.0


# MERINO_REDIS__SERVER - redis.server (Currently not configured here)
# Global Redis settings. The weather provider optionally uses Redis to cache weather suggestions.
# In the form of `redis://localhost:6379`.

# MERINO_PROVIDERS__ACCUWEATHER__CRON_INTERVAL_SEC
cron_interval_sec = 21600 # 6 hours

Expand Down
11 changes: 8 additions & 3 deletions merino/providers/suggest/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,9 @@
from enum import Enum, unique

from dynaconf.base import Settings
from redis.asyncio import Redis

from merino.cache.none import NoCacheAdapter
from merino.cache.redis import RedisAdapter
from merino.cache.redis import RedisAdapter, create_redis_clients
from merino.configs import settings
from merino.exceptions import InvalidProviderError
from merino.utils.metrics import get_metrics_client
Expand Down Expand Up @@ -53,7 +52,13 @@ def _create_provider(provider_id: str, setting: Settings) -> BaseProvider:
match setting.type:
case ProviderType.ACCUWEATHER:
cache = (
RedisAdapter(Redis.from_url(settings.redis.server))
RedisAdapter(
*create_redis_clients(
settings.redis.server,
settings.redis.replica,
settings.redis.max_connections,
)
)
if setting.cache == "redis"
else NoCacheAdapter()
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,7 @@ async def get_weather_report_with_location_key(
location_key=location_key
),
],
readonly=True,
)
if cached_data:
cached_data = [LOCATION_SENTINEL, *cached_data]
Expand Down Expand Up @@ -575,6 +576,7 @@ async def _fetch_from_cache(
self.cache_key_template(WeatherDataType.FORECAST, language),
self.url_location_key_placeholder,
],
readonly=True,
)
return cached_data if cached_data else None

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,10 @@ async def mock_get(key) -> Any:
async def mock_set(key, value, **kwargs) -> Any:
return None

async def script_callable(keys, args) -> list:
async def script_callable(keys, args, readonly) -> list:
return []

def mock_register_script(script) -> Callable[[list, list], Awaitable[list]]:
def mock_register_script(script) -> Callable[[list, list, bool], Awaitable[list]]:
return script_callable

mock = mocker.AsyncMock(spec=Redis)
Expand Down Expand Up @@ -1298,10 +1298,10 @@ async def test_get_weather_report_with_cache_fetch_error(

redis_mock = mocker.AsyncMock(spec=Redis)

async def script_callable(keys, args) -> list:
async def script_callable(keys, args, readonly) -> list:
raise RedisError("Failed to fetch")

def mock_register_script(script) -> Callable[[list, list], Awaitable[list]]:
def mock_register_script(script) -> Callable[[list, list, bool], Awaitable[list]]:
return script_callable

redis_mock.register_script.side_effect = mock_register_script
Expand Down
32 changes: 32 additions & 0 deletions tests/unit/utils/test_cache_redis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

"""Unit tests for the cron.py module."""

import pytest
from pytest_mock import MockerFixture

from redis.asyncio import Redis

from merino.cache.redis import create_redis_clients, RedisAdapter


@pytest.mark.asyncio
async def test_adapter_in_standalone_mode(mocker: MockerFixture) -> None:
"""Test `RedisAdapter` for the standalone mode."""
spy = mocker.spy(Redis, "aclose")
adapter: RedisAdapter = RedisAdapter(
*create_redis_clients(
primary="redis://localhost:6379",
replica="redis://localhost:6379",
max_connections=1,
db=0,
)
)

assert adapter.primary is adapter.replica

await adapter.close()

spy.assert_called_once()