Hitless upgrade support implementation for synchronous Redis client. #3713
base: master
0304da5 to ee27bd2
… tests for maintenance_events.py file
…tion pool - this should be a separate PR
… Refactored the maintenance events tests not to be multithreaded - we don't need it for those tests.
…ot processed in Moving state. Tests are updated
…ply them during connect
ee27bd2 to 0f9734c
0f9734c to 6d496f0
redis/connection.py
Outdated
@@ -1646,6 +1815,154 @@ def re_auth_callback(self, token: TokenInterface):
        for conn in self._in_use_connections:
            conn.set_re_auth_token(token)

    def update_connection_kwargs_with_tmp_settings(
Could you make it more generic? Something like update_connection_kwargs(**kwargs)
Done.
redis/connection.py
Outdated
    self.connection_kwargs.update({"tmp_host_address": tmp_host_address})
    self.connection_kwargs.update({"tmp_relax_timeout": tmp_relax_timeout})

def update_connections_tmp_settings(
The same here, could it be more generic?
Now I see why I didn't understand the next comment :) This is an old function that has been removed and split into two separate functions.
I'll see whether it is appropriate to combine some of them into more generic ones, or which of the newly added ones can be made more generic (like the one mentioned above: update_connection_kwargs).
I refactored the code, and now there are two more generic methods, update_connection_kwargs and update_connections_settings, that can be further extended if needed.
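For readers following the thread, here is a minimal sketch of what two generic methods of this kind could look like. Only the method names come from the comment above; `SketchPool`, `SketchConnection`, the signatures, and the locking are assumptions made for illustration and are not the code merged in this PR.

```python
import threading


class SketchConnection:
    """Stand-in connection holding only the attributes this sketch touches."""

    def __init__(self, host, socket_timeout=None):
        self.host = host
        self.socket_timeout = socket_timeout


class SketchPool:
    """Hypothetical pool, not the redis-py ConnectionPool."""

    def __init__(self, **connection_kwargs):
        self.connection_kwargs = connection_kwargs
        self._lock = threading.RLock()
        self._connections = []

    def make_connection(self):
        # New connections are always built from the current kwargs.
        conn = SketchConnection(**self.connection_kwargs)
        with self._lock:
            self._connections.append(conn)
        return conn

    def update_connection_kwargs(self, **kwargs):
        # Affects only connections created after this call.
        with self._lock:
            self.connection_kwargs.update(kwargs)

    def update_connections_settings(self, **settings):
        # Affects connection objects that already exist in the pool.
        with self._lock:
            for conn in self._connections:
                for name, value in settings.items():
                    setattr(conn, name, value)
```

With this split, calling `pool.update_connection_kwargs(host="10.0.0.2")` leaves existing connections untouched, while `pool.update_connections_settings(socket_timeout=5.0)` changes them without altering the defaults used for future connections.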
redis/connection.py
Outdated
        conn, tmp_host_address, tmp_relax_timeout
    )

def update_connections_current_timeout(
This won't be needed if update_connections_tmp_settings is made more generic.
@vladvildanov I didn't quite understand this one. Do you mean having a single generic method that handles all possible updates to object properties, including the socket timeout update for already connected connections?
I intentionally separated them to make it easier to follow what exactly gets updated during notification handling: specifically, whether we're only updating properties used during connect, or actually changing the behavior of already established socket connections.
Done.
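The distinction drawn in this thread, between a setting that is only read at connect time and a change applied to a socket that is already open, can be illustrated with the standard library alone. This is a sketch under assumptions: `SketchConnection`, `attach`, and `update_current_socket_timeout` are hypothetical names, and the real connection class in this PR may handle this differently.

```python
import socket


class SketchConnection:
    """Hypothetical connection wrapper used only for this illustration."""

    def __init__(self, socket_timeout=None):
        # Read the next time a socket is attached (i.e. at connect time).
        self.socket_timeout = socket_timeout
        self._sock = None

    def attach(self, sock):
        # Simulates connect(): the configured timeout is applied once here.
        sock.settimeout(self.socket_timeout)
        self._sock = sock

    def update_current_socket_timeout(self, timeout):
        # Changes the behavior of the already established socket.
        if self._sock is not None:
            self._sock.settimeout(timeout)


a, b = socket.socketpair()
conn = SketchConnection(socket_timeout=1.0)
conn.attach(a)

conn.socket_timeout = 10.0            # affects only a future connect
timeout_after_attr_change = a.gettimeout()

conn.update_current_socket_timeout(10.0)  # affects the live socket
timeout_after_live_update = a.gettimeout()

a.close()
b.close()
```

Assigning `conn.socket_timeout` alone leaves the open socket on its old timeout, which is the reason the author gives for keeping the two kinds of update easy to tell apart.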
redis/connection.py
Outdated
connections_in_queue = {conn for conn in self.pool.queue if conn}
for conn in self._connections:
    if conn not in connections_in_queue:
        if address_type_to_match == "connected":
There's a lot of duplication of this part:
if address_type_to_match == "connected":
    if matching_address and conn.getpeername() != matching_address:
        continue
Maybe it makes sense to create a method that filters connections by these criteria and returns the filtered ones?
I wanted to iterate over the different lists once and not create additional lists holding references to the connection objects that match the filter, so I think it will be better to create a new helper method that just checks if the connection should be updated or not. You are right - it is a lot of code duplication :)
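A sketch of the helper described here: a predicate that is checked inside the existing loops, so each list is still walked once and no filtered copy is built. The name `should_update_connection`, the `StubConnection` class, and the fallback branch that compares against `conn.host` are assumptions; only the "connected" check with `conn.getpeername()` appears in the quoted diff.

```python
class StubConnection:
    """Minimal stand-in exposing what the predicate needs."""

    def __init__(self, host, peer):
        self.host = host        # configured address
        self._peer = peer       # address the socket is connected to
        self.socket_timeout = None

    def getpeername(self):
        return self._peer


def should_update_connection(conn, address_type_to_match, matching_address):
    """Return True when conn passes the address filter (or no filter is set)."""
    if not matching_address:
        return True
    if address_type_to_match == "connected":
        return conn.getpeername() == matching_address
    # Assumed second mode: match on the configured host instead.
    return conn.host == matching_address


def update_matching(connections, timeout, address_type_to_match="connected",
                    matching_address=None):
    """Single pass over connections; returns how many were updated."""
    updated = 0
    for conn in connections:
        if not should_update_connection(
            conn, address_type_to_match, matching_address
        ):
            continue
        conn.socket_timeout = timeout
        updated += 1
    return updated
```

Each call site then reduces to one `if not should_update_connection(...): continue` line, which removes the duplicated nested checks without allocating extra lists.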
Pull Request Overview
This PR implements hitless upgrade support for the synchronous Redis client by adding maintenance events handling. The implementation allows the Redis client to gracefully handle cluster rebalancing and node migration operations without losing connections or data.
Key changes:
- Added comprehensive maintenance events handling infrastructure for MOVING, MIGRATING, and MIGRATED events
- Implemented connection pool management for hitless upgrades during cluster maintenance
- Enhanced Redis client with maintenance events configuration and handlers
Reviewed Changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 3 comments.
Show a summary per file
File | Description |
---|---|
redis/maintenance_events.py | Core maintenance events classes and handlers for processing cluster rebalancing notifications |
redis/connection.py | Enhanced connection and pool classes with maintenance state management and proactive reconnection support |
redis/client.py | Updated Redis client to support maintenance events configuration and handle connection reconnection during maintenance |
redis/_parsers/base.py | Extended parsers to handle maintenance-related push notifications (MOVING, MIGRATING, MIGRATED) |
redis/_parsers/resp3.py | Updated RESP3 parser to properly handle push notifications during maintenance events |
redis/_parsers/hiredis.py | Updated Hiredis parser to support maintenance event push notifications |
tests/test_maintenance_events_handling.py | Comprehensive integration tests for maintenance events handling with mocked Redis protocol |
tests/test_maintenance_events.py | Unit tests for maintenance event classes and handlers |
tests/test_connection_pool.py | Minor test updates to support new connection pool functionality |
Comments suppressed due to low confidence (2)
redis/connection.py:385
- The parser is being set before protocol-specific configurations are applied. Line 407 shows that RESP3Parser is set when protocol is 3, but the initial parser setup happens before this check. This could lead to incorrect parser being used for maintenance events.
self.health_check_interval = health_check_interval
redis/connection.py:381
- This line appears to be indented incorrectly. The if statement at line 380 should control this block, but the indentation suggests it's not properly nested within the conditional block.
# Update the retry's supported errors with the specified errors
# Format: >3\r\n$6\r\nMOVING\r\n:15\r\n+localhost:6379\r\n (3 elements: MOVING, ttl, host:port)
# Note: Using + instead of $ to send as simple string instead of bulk string
moving_push = f">3\r\n$6\r\nMOVING\r\n:{MOVING_TIMEOUT}\r\n+{AFTER_MOVING_ADDRESS}\r\n"
response = moving_push.encode() + response
[nitpick] The RESP3 protocol format is hardcoded with escape sequences. Consider using a constant or helper function to construct these protocol messages to improve readability and maintainability.
    response = moving_push.encode() + response
    migrating_push = make_resp3_push_message([
        ("bulk", "MIGRATING"),
        ("int", "10"),
    ])
    response = migrating_push + response
elif b"key_receive_migrated_" in data or b"key_receive_migrated" in data:
    # MIGRATED push message before SET key_receive_migrated_X response
    # Format: >1\r\n$8\r\nMIGRATED\r\n (1 element: MIGRATED)
    migrated_push = make_resp3_push_message([
        ("bulk", "MIGRATED"),
    ])
    response = migrated_push + response
elif b"key_receive_moving_" in data:
    # MOVING push message before SET key_receive_moving_X response
    # Format: >3\r\n$6\r\nMOVING\r\n:15\r\n+localhost:6379\r\n (3 elements: MOVING, ttl, host:port)
    # Note: Using + instead of $ to send as simple string instead of bulk string
    moving_push = make_resp3_push_message([
        ("bulk", "MOVING"),
        ("int", str(MOVING_TIMEOUT)),
        ("simple", AFTER_MOVING_ADDRESS),
    ])
    response = moving_push + response
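The suggestion above calls `make_resp3_push_message` but does not define it. One possible shape for such a helper is sketched below, assuming it takes `(kind, value)` pairs with the kinds "bulk", "int", and "simple" seen in the suggestion and returns bytes ready to be prepended to a response. This is an illustrative guess at the helper, not code from the PR.

```python
def make_resp3_push_message(elements):
    """Encode (kind, value) pairs as a RESP3 push frame and return bytes.

    Hypothetical helper: only the kinds used in the suggestion are handled.
    """
    # A push frame starts with '>' followed by the element count.
    parts = [b">%d\r\n" % len(elements)]
    for kind, value in elements:
        data = str(value).encode()
        if kind == "bulk":
            # Bulk string: '$' + byte length, then the payload.
            parts.append(b"$%d\r\n%s\r\n" % (len(data), data))
        elif kind == "int":
            parts.append(b":%s\r\n" % data)
        elif kind == "simple":
            parts.append(b"+%s\r\n" % data)
        else:
            raise ValueError(f"unsupported element kind: {kind!r}")
    return b"".join(parts)


moving_push = make_resp3_push_message([
    ("bulk", "MOVING"),
    ("int", "15"),
    ("simple", "localhost:6379"),
])
```

Computing the bulk-string length from the encoded payload avoids the hand-counted `$6` and `$8` prefixes in the hardcoded strings, which are easy to get wrong when an event name changes.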
When this method is called the pool will already be locked, so getting the pool lock inside is not needed.

:param orig_host_address: The temporary host address to use for the connection.
:param orig_relax_timeout: The relax timeout to use for the connection.
The parameter documentation is incorrect. The parameter name is 'tmp_host_address' but the docstring refers to 'orig_host_address'. This inconsistency could confuse developers.
:param orig_relax_timeout: The relax timeout to use for the connection.
:param tmp_host_address: The temporary host address to use for the connection.
:param tmp_relax_timeout: The relax timeout to use for the connection.
When this method is called the pool will already be locked, so getting the pool lock inside is not needed.

:param orig_host_address: The temporary host address to use for the connection.
:param orig_relax_timeout: The relax timeout to use for the connection.
The parameter documentation is incorrect. The parameter name is 'tmp_relax_timeout' but the docstring refers to 'orig_relax_timeout'. This inconsistency could confuse developers.
:param orig_relax_timeout: The relax timeout to use for the connection.
:param tmp_host_address: The temporary host address to use for the connection.
:param tmp_relax_timeout: The relax timeout to use for the connection.
Description of change
Hitless upgrade support implementation for synchronous Redis client.