Added MultiDbClient support with OSS Cluster API #3734

Open
wants to merge 58 commits into
base: feat/active-active
58 commits
ac86280
Added Database, Healthcheck, CircuitBreaker, FailureDetector
vladvildanov Jun 13, 2025
4f4a53c
Added DatabaseSelector, exceptions, refactored existing entities
vladvildanov Jun 17, 2025
acc68ef
Added MultiDbConfig
vladvildanov Jun 17, 2025
255bb0e
Added DatabaseConfig
vladvildanov Jun 17, 2025
79db257
Added DatabaseConfig test coverage
vladvildanov Jun 17, 2025
8790db1
Renamed DatabaseSelector into FailoverStrategy
vladvildanov Jun 18, 2025
b3ad8da
Added CommandExecutor
vladvildanov Jun 18, 2025
3a1dc9c
Updated healthcheck to close circuit on success
vladvildanov Jun 18, 2025
9bb9235
Added thread-safeness
vladvildanov Jun 19, 2025
3218e36
Added missing thread-safeness
vladvildanov Jun 19, 2025
4cdb6f4
Added missing thread-safenes for dispatcher
vladvildanov Jun 19, 2025
6914467
Refactored client to keep databases in WeightedList
vladvildanov Jun 19, 2025
5b94757
Added database CRUD operations
vladvildanov Jun 26, 2025
daba501
Added on-fly configuration
vladvildanov Jun 26, 2025
061e518
Added background health checks
vladvildanov Jun 27, 2025
a562774
Added background healthcheck + half-open event
vladvildanov Jul 2, 2025
3ab1367
Refactored background scheduling
vladvildanov Jul 3, 2025
3a55dcd
Merge branch 'feat/active-active' of github.com:redis/redis-py into v…
vladvildanov Jul 4, 2025
46afaea
Added support for Active-Active pipeline
vladvildanov Jul 4, 2025
badef0e
Refactored healthchecks
vladvildanov Jul 7, 2025
0cdeebf
Merge branch 'vv-multi-db-client' of github.com:redis/redis-py into v…
vladvildanov Jul 7, 2025
f16b646
Added Pipeline testing
vladvildanov Jul 7, 2025
7e43b40
Added support for transactions
vladvildanov Jul 14, 2025
fcc6035
Removed code repetitions, fixed weight assignment, added loops enhanc…
vladvildanov Jul 15, 2025
7e815ad
Merge branch 'vv-multi-db-client' of github.com:redis/redis-py into v…
vladvildanov Jul 15, 2025
0563024
Added missing doc blocks
vladvildanov Jul 15, 2025
f64e10d
Added support for Pub/Sub in MultiDBClient
vladvildanov Jul 17, 2025
d5dc65c
Refactored configuration
vladvildanov Jul 17, 2025
7086822
Refactored failure detector
vladvildanov Jul 18, 2025
2561d6f
Refactored retry logic
vladvildanov Jul 18, 2025
6b0689a
Merge branch 'vv-multi-db-client' of github.com:redis/redis-py into v…
vladvildanov Jul 18, 2025
e15a38b
Merge branch 'vv-active-active-pipeline' of github.com:redis/redis-py…
vladvildanov Jul 18, 2025
a0af5b3
Added scenario tests
vladvildanov Jul 24, 2025
aaed8d7
Added pybreaker optional dependency
vladvildanov Jul 24, 2025
0551618
Added pybreaker to dev dependencies
vladvildanov Jul 24, 2025
1d288e6
Rename tests directory
vladvildanov Jul 24, 2025
0c644f2
Merge branch 'vv-multi-db-client' of github.com:redis/redis-py into v…
vladvildanov Jul 24, 2025
8922aa8
Added scenario tests for Pipeline and Transaction
vladvildanov Jul 24, 2025
94eff21
Added handling of ConnectionRefusedError, added timeouts so cluster c…
vladvildanov Jul 24, 2025
7fa7c07
Increased timeouts
vladvildanov Jul 24, 2025
2cb8cac
Refactored integration tests
vladvildanov Jul 25, 2025
e76aea3
Merge branch 'vv-active-active-pipeline' of github.com:redis/redis-py…
vladvildanov Jul 29, 2025
2de5d09
Added scenario tests for Pub/Sub
vladvildanov Jul 30, 2025
a7f03c0
Updated healthcheck retry
vladvildanov Jul 30, 2025
0505e0a
Increased timeout to avoid unprepared state before tests
vladvildanov Jul 30, 2025
a7a7b6d
Added backoff retry and changed timeouts
vladvildanov Jul 31, 2025
faa18ae
Added retry for healthchecks to avoid fluctuations
vladvildanov Jul 31, 2025
7d5e957
Changed retry configuration for healthchecks
vladvildanov Jul 31, 2025
ed93cfc
Fixed property name
vladvildanov Jul 31, 2025
bee15d9
Merge branch 'vv-active-active-pipeline' of github.com:redis/redis-py…
vladvildanov Jul 31, 2025
b9d727e
Merge branch 'feat/active-active' of github.com:redis/redis-py into v…
vladvildanov Aug 11, 2025
897dadb
Added check for thread results
vladvildanov Aug 11, 2025
c9dbec5
Added MultiDbClient support with OSS Cluster API
vladvildanov Aug 12, 2025
f92243c
Removed database statuses
vladvildanov Aug 12, 2025
dda4005
Increased test timeouts
vladvildanov Aug 12, 2025
829cc43
Increased retry timeout
vladvildanov Aug 12, 2025
8a4ba4f
Increased timeout retries
vladvildanov Aug 12, 2025
6ba09ca
Updated base threshold for retries
vladvildanov Aug 12, 2025
12 changes: 10 additions & 2 deletions redis/client.py
@@ -1217,6 +1217,8 @@ def run_in_thread(
         sleep_time: float = 0.0,
         daemon: bool = False,
         exception_handler: Optional[Callable] = None,
+        pubsub = None,
+        sharded_pubsub: bool = False,
     ) -> "PubSubWorkerThread":
         for channel, handler in self.channels.items():
             if handler is None:
@@ -1230,8 +1232,9 @@ def run_in_thread(
                     f"Shard Channel: '{s_channel}' has no handler registered"
                 )

+        pubsub = self if pubsub is None else pubsub
         thread = PubSubWorkerThread(
-            self, sleep_time, daemon=daemon, exception_handler=exception_handler
+            pubsub, sleep_time, daemon=daemon, exception_handler=exception_handler, sharded_pubsub=sharded_pubsub
         )
         thread.start()
         return thread
@@ -1246,12 +1249,14 @@ def __init__(
         exception_handler: Union[
             Callable[[Exception, "PubSub", "PubSubWorkerThread"], None], None
         ] = None,
+        sharded_pubsub: bool = False,
     ):
         super().__init__()
         self.daemon = daemon
         self.pubsub = pubsub
         self.sleep_time = sleep_time
         self.exception_handler = exception_handler
+        self.sharded_pubsub = sharded_pubsub
         self._running = threading.Event()

     def run(self) -> None:
@@ -1262,7 +1267,10 @@ def run(self) -> None:
         sleep_time = self.sleep_time
         while self._running.is_set():
             try:
-                pubsub.get_message(ignore_subscribe_messages=True, timeout=sleep_time)
+                if not self.sharded_pubsub:
+                    pubsub.get_message(ignore_subscribe_messages=True, timeout=sleep_time)
+                else:
+                    pubsub.get_sharded_message(ignore_subscribe_messages=True, timeout=sleep_time)
             except BaseException as e:
                 if self.exception_handler is None:
                     raise
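
The dispatch added to `PubSubWorkerThread.run()` can be illustrated with a minimal, self-contained sketch. `FakePubSub` and `poll_once` are hypothetical stand-ins and not part of redis-py; only the method names `get_message` and `get_sharded_message` and the `sharded_pubsub` flag are taken from the diff above.

```python
class FakePubSub:
    """Hypothetical stand-in that records which polling method was called."""

    def __init__(self):
        self.calls = []

    def get_message(self, ignore_subscribe_messages=False, timeout=0.0):
        self.calls.append("get_message")
        return None

    def get_sharded_message(self, ignore_subscribe_messages=False, timeout=0.0):
        self.calls.append("get_sharded_message")
        return None


def poll_once(pubsub, sharded_pubsub=False, sleep_time=0.0):
    """One iteration of the worker loop: choose the polling method by flag."""
    if not sharded_pubsub:
        return pubsub.get_message(ignore_subscribe_messages=True, timeout=sleep_time)
    return pubsub.get_sharded_message(ignore_subscribe_messages=True, timeout=sleep_time)


regular, sharded = FakePubSub(), FakePubSub()
poll_once(regular)                       # default: regular channels
poll_once(sharded, sharded_pubsub=True)  # opt-in: shard channels
print(regular.calls, sharded.calls)
```

Because the flag defaults to `False`, callers that do not pass it keep polling with `get_message`, as before this change.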
3 changes: 2 additions & 1 deletion redis/cluster.py
@@ -3154,7 +3154,8 @@ def _reinitialize_on_error(self, error):
                 self._nodes_manager.initialize()
                 self.reinitialize_counter = 0
             else:
-                self._nodes_manager.update_moved_exception(error)
+                if type(error) == MovedError:
+                    self._nodes_manager.update_moved_exception(error)

         self._executing = False
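
A small sketch of what the new guard does, assuming that this branch can be reached with errors other than a MOVED redirect. All classes below are hypothetical stand-ins defined only for this example; they are not the redis-py implementations.

```python
class MovedError(Exception):
    """Hypothetical stand-in for a MOVED redirect error."""


class ConnectionFailure(Exception):
    """Hypothetical stand-in for a connection-level error."""


class NodesManager:
    """Hypothetical stand-in that only remembers the last moved exception."""

    def __init__(self):
        self.moved_exception = None

    def update_moved_exception(self, error):
        self.moved_exception = error


def handle_error(nodes_manager, error):
    # Exact type comparison, as in the diff. isinstance() would also
    # accept subclasses of MovedError.
    if type(error) == MovedError:
        nodes_manager.update_moved_exception(error)


manager = NodesManager()
handle_error(manager, ConnectionFailure("refused"))
after_connection_error = manager.moved_exception  # still None

moved = MovedError("1234 127.0.0.1:7001")
handle_error(manager, moved)
print(after_connection_error, manager.moved_exception)
```

With the guard in place, only a MOVED error is recorded on the nodes manager; any other error type leaves the stored value untouched.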

158 changes: 145 additions & 13 deletions redis/multidb/client.py
@@ -1,14 +1,15 @@
 import threading
 import socket
-from typing import List, Any, Callable
+from typing import List, Any, Callable, Optional

 from redis.background import BackgroundScheduler
+from redis.client import PubSubWorkerThread
 from redis.exceptions import ConnectionError, TimeoutError
 from redis.commands import RedisModuleCommands, CoreCommands
 from redis.multidb.command_executor import DefaultCommandExecutor
 from redis.multidb.config import MultiDbConfig, DEFAULT_GRACE_PERIOD
 from redis.multidb.circuit import State as CBState, CircuitBreaker
-from redis.multidb.database import State as DBState, Database, AbstractDatabase, Databases
+from redis.multidb.database import Database, AbstractDatabase, Databases
 from redis.multidb.exception import NoValidDatabaseException
 from redis.multidb.failure_detector import FailureDetector
 from redis.multidb.healthcheck import HealthCheck
@@ -70,13 +71,8 @@ def raise_exception_on_failed_hc(error):

             # Set states according to a weights and circuit state
             if database.circuit.state == CBState.CLOSED and not is_active_db_found:
-                database.state = DBState.ACTIVE
                 self.command_executor.active_database = database
                 is_active_db_found = True
-            elif database.circuit.state == CBState.CLOSED and is_active_db_found:
-                database.state = DBState.PASSIVE
-            else:
-                database.state = DBState.DISCONNECTED

         if not is_active_db_found:
             raise NoValidDatabaseException('Initial connection failed - no active database found')
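
After this change, the active database is determined only by the circuit state and the iteration order; no per-database status is stored. The selection can be sketched as follows, assuming databases are visited in descending weight order (the diff does not show the iteration itself). `pick_active` and the tuple layout are hypothetical, and the enum and exception are local stand-ins for the names used in the diff.

```python
from enum import Enum


class CBState(Enum):
    CLOSED = "closed"
    OPEN = "open"


class NoValidDatabaseException(Exception):
    pass


def pick_active(databases):
    """Return the name of the highest-weighted database with a closed circuit.

    `databases` is a list of (name, weight, circuit_state) tuples.
    """
    # Assumption: candidates are considered from highest to lowest weight.
    for name, _weight, state in sorted(databases, key=lambda d: d[1], reverse=True):
        if state == CBState.CLOSED:
            return name
    raise NoValidDatabaseException("Initial connection failed - no active database found")


candidates = [
    ("primary", 1.0, CBState.OPEN),     # heaviest, but its circuit is open
    ("replica", 0.5, CBState.CLOSED),   # first healthy candidate
    ("backup", 0.2, CBState.CLOSED),
]
print(pick_active(candidates))
```

If every circuit is open, the sketch raises `NoValidDatabaseException`, matching the final check in the hunk above.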
@@ -107,8 +103,6 @@ def set_active_database(self, database: AbstractDatabase) -> None:

         if database.circuit.state == CBState.CLOSED:
             highest_weighted_db, _ = self._databases.get_top_n(1)[0]
-            highest_weighted_db.state = DBState.PASSIVE
-            database.state = DBState.ACTIVE
             self.command_executor.active_database = database
             return

@@ -130,9 +124,7 @@ def add_database(self, database: AbstractDatabase):

     def _change_active_database(self, new_database: AbstractDatabase, highest_weight_database: AbstractDatabase):
         if new_database.weight > highest_weight_database.weight and new_database.circuit.state == CBState.CLOSED:
-            new_database.state = DBState.ACTIVE
             self.command_executor.active_database = new_database
-            highest_weight_database.state = DBState.PASSIVE

     def remove_database(self, database: Database):
         """
@@ -142,7 +134,6 @@ def remove_database(self, database: Database):
         highest_weighted_db, highest_weight = self._databases.get_top_n(1)[0]

         if highest_weight <= weight and highest_weighted_db.circuit.state == CBState.CLOSED:
-            highest_weighted_db.state = DBState.ACTIVE
             self.command_executor.active_database = highest_weighted_db

     def update_database_weight(self, database: AbstractDatabase, weight: float):
@@ -201,6 +192,17 @@ def transaction(self, func: Callable[["Pipeline"], None], *watches, **options):

         return self.command_executor.execute_transaction(func, *watches, *options)

+    def pubsub(self, **kwargs):
+        """
+        Return a Publish/Subscribe object. With this object, you can
+        subscribe to channels and listen for messages that get published to
+        them.
+        """
+        if not self.initialized:
+            self.initialize()
+
+        return PubSub(self, **kwargs)
+
     def _check_db_health(self, database: AbstractDatabase, on_error: Callable[[Exception], None] = None) -> None:
         """
         Runs health checks on the given database until first failure.
@@ -221,7 +223,7 @@ def _check_db_health(self, database: AbstractDatabase, on_error: Callable[[Excep
                         database.circuit.state = CBState.OPEN
                     elif is_healthy and database.circuit.state != CBState.CLOSED:
                         database.circuit.state = CBState.CLOSED
-                except (ConnectionError, TimeoutError, socket.timeout, ConnectionRefusedError) as e:
+                except (ConnectionError, TimeoutError, socket.timeout, ConnectionRefusedError, ValueError) as e:
                     if database.circuit.state != CBState.OPEN:
                         database.circuit.state = CBState.OPEN
                     is_healthy = False
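
The way a health check result or exception maps onto the circuit state can be sketched in isolation. This is a simplified model under stated assumptions: `Circuit`, `check_db_health`, and the return value are hypothetical, health checks are plain callables, and `ConnectionError` and `TimeoutError` here are the Python built-ins rather than the `redis.exceptions` classes imported by the real module.

```python
import socket
from enum import Enum


class CBState(Enum):
    CLOSED = "closed"
    OPEN = "open"


class Circuit:
    def __init__(self, state=CBState.CLOSED):
        self.state = state


def check_db_health(circuit, health_checks, on_error=None):
    """Run checks until the first failure and update the circuit accordingly."""
    is_healthy = True
    for check in health_checks:
        if not is_healthy:
            break  # one failed check marks the database unhealthy
        try:
            is_healthy = check()
            if not is_healthy and circuit.state != CBState.OPEN:
                circuit.state = CBState.OPEN
            elif is_healthy and circuit.state != CBState.CLOSED:
                circuit.state = CBState.CLOSED
        except (ConnectionError, TimeoutError, socket.timeout,
                ConnectionRefusedError, ValueError) as e:
            # Any of these exceptions opens the circuit, including ValueError,
            # which this change adds to the list.
            if circuit.state != CBState.OPEN:
                circuit.state = CBState.OPEN
            is_healthy = False
            if on_error:
                on_error(e)
    return is_healthy


def failing_check():
    raise ValueError("malformed health check response")


errors = []
circuit = Circuit()
first = check_db_health(circuit, [failing_check, lambda: True], on_error=errors.append)
state_after_failure = circuit.state
second = check_db_health(circuit, [lambda: True])
print(first, state_after_failure, second, circuit.state)
```

In the sketch, a `ValueError` raised by a check opens the circuit, and a later successful check closes it again.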
@@ -311,3 +313,133 @@ def execute(self) -> List[Any]:
             return self._client.command_executor.execute_pipeline(tuple(self._command_stack))
         finally:
             self.reset()
+
+class PubSub:
+    """
+    PubSub object for multi database client.
+    """
+    def __init__(self, client: MultiDBClient, **kwargs):
+        self._client = client
+        self._client.command_executor.pubsub(**kwargs)
+
+    def __enter__(self) -> "PubSub":
+        return self
+
+    def __exit__(self, exc_type, exc_value, traceback) -> None:
+        self.reset()
+
+    def __del__(self) -> None:
+        try:
+            # if this object went out of scope prior to shutting down
+            # subscriptions, close the connection manually before
+            # returning it to the connection pool
+            self.reset()
+        except Exception:
+            pass
+
+    def reset(self) -> None:
+        pass
+
+    def close(self) -> None:
+        self.reset()
+
+    @property
+    def subscribed(self) -> bool:
+        return self._client.command_executor.active_pubsub.subscribed
+
+    def psubscribe(self, *args, **kwargs):
+        """
+        Subscribe to channel patterns. Patterns supplied as keyword arguments
+        expect a pattern name as the key and a callable as the value. A
+        pattern's callable will be invoked automatically when a message is
+        received on that pattern rather than producing a message via
+        ``listen()``.
+        """
+        return self._client.command_executor.execute_pubsub_method('psubscribe', *args, **kwargs)
+
+    def punsubscribe(self, *args):
+        """
+        Unsubscribe from the supplied patterns. If empty, unsubscribe from
+        all patterns.
+        """
+        return self._client.command_executor.execute_pubsub_method('punsubscribe', *args)
+
+    def subscribe(self, *args, **kwargs):
+        """
+        Subscribe to channels. Channels supplied as keyword arguments expect
+        a channel name as the key and a callable as the value. A channel's
+        callable will be invoked automatically when a message is received on
+        that channel rather than producing a message via ``listen()`` or
+        ``get_message()``.
+        """
+        return self._client.command_executor.execute_pubsub_method('subscribe', *args, **kwargs)
+
+    def unsubscribe(self, *args):
+        """
+        Unsubscribe from the supplied channels. If empty, unsubscribe from
+        all channels
+        """
+        return self._client.command_executor.execute_pubsub_method('unsubscribe', *args)
+
+    def ssubscribe(self, *args, **kwargs):
+        """
+        Subscribes the client to the specified shard channels.
+        Channels supplied as keyword arguments expect a channel name as the key
+        and a callable as the value. A channel's callable will be invoked automatically
+        when a message is received on that channel rather than producing a message via
+        ``listen()`` or ``get_sharded_message()``.
+        """
+        return self._client.command_executor.execute_pubsub_method('ssubscribe', *args, **kwargs)
+
+    def sunsubscribe(self, *args):
+        """
+        Unsubscribe from the supplied shard_channels. If empty, unsubscribe from
+        all shard_channels
+        """
+        return self._client.command_executor.execute_pubsub_method('sunsubscribe', *args)
+
+    def get_message(
+        self, ignore_subscribe_messages: bool = False, timeout: float = 0.0
+    ):
+        """
+        Get the next message if one is available, otherwise None.
+
+        If timeout is specified, the system will wait for `timeout` seconds
+        before returning. Timeout should be specified as a floating point
+        number, or None, to wait indefinitely.
+        """
+        return self._client.command_executor.execute_pubsub_method(
+            'get_message',
+            ignore_subscribe_messages=ignore_subscribe_messages, timeout=timeout
+        )
+
+    def get_sharded_message(
+        self, ignore_subscribe_messages: bool = False, timeout: float = 0.0
+    ):
+        """
+        Get the next message if one is available in a sharded channel, otherwise None.
+
+        If timeout is specified, the system will wait for `timeout` seconds
+        before returning. Timeout should be specified as a floating point
+        number, or None, to wait indefinitely.
+        """
+        return self._client.command_executor.execute_pubsub_method(
+            'get_sharded_message',
+            ignore_subscribe_messages=ignore_subscribe_messages, timeout=timeout
+        )
+
+    def run_in_thread(
+        self,
+        sleep_time: float = 0.0,
+        daemon: bool = False,
+        exception_handler: Optional[Callable] = None,
+        sharded_pubsub: bool = False,
+    ) -> "PubSubWorkerThread":
+        return self._client.command_executor.execute_pubsub_run_in_thread(
+            sleep_time=sleep_time,
+            daemon=daemon,
+            exception_handler=exception_handler,
+            pubsub=self,
+            sharded_pubsub=sharded_pubsub,
+        )
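
Every method of the new `PubSub` class forwards to the command executor by method name. The executor's implementation is not part of this diff, so the following is only a sketch of how such name-based delegation could work. `SketchExecutor`, `RecordingPubSub`, and `PubSubFacade` are hypothetical; only the call shape `execute_pubsub_method(name, *args, **kwargs)` is taken from the code above.

```python
class RecordingPubSub:
    """Hypothetical backend that tracks channel subscriptions in a dict."""

    def __init__(self):
        self.channels = {}

    def subscribe(self, *args, **kwargs):
        for name in args:
            self.channels[name] = None   # no handler registered
        self.channels.update(kwargs)     # name -> handler callable

    def unsubscribe(self, *args):
        # With no arguments, unsubscribe from everything.
        for name in args or list(self.channels):
            self.channels.pop(name, None)


class SketchExecutor:
    """Hypothetical executor: resolves the method on the active pubsub by name."""

    def __init__(self, active_pubsub):
        self.active_pubsub = active_pubsub

    def execute_pubsub_method(self, method_name, *args, **kwargs):
        method = getattr(self.active_pubsub, method_name)
        return method(*args, **kwargs)


class PubSubFacade:
    """Mirrors the delegation style of the PubSub class in the diff."""

    def __init__(self, executor):
        self._executor = executor

    def subscribe(self, *args, **kwargs):
        return self._executor.execute_pubsub_method("subscribe", *args, **kwargs)

    def unsubscribe(self, *args):
        return self._executor.execute_pubsub_method("unsubscribe", *args)


backend = RecordingPubSub()
facade = PubSubFacade(SketchExecutor(backend))
facade.subscribe("news", alerts=print)
subscribed = dict(backend.channels)
facade.unsubscribe("news")
print(sorted(subscribed), sorted(backend.channels))
```

Routing calls through a single executor method means the facade holds no reference to a particular connection, so the executor is free to point `active_pubsub` at a different database without the caller changing objects.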
