From c5016cad2e3d2bb7ffd855bbfdd9fb432cffc001 Mon Sep 17 00:00:00 2001 From: Dmitry Kropachev Date: Fri, 28 Mar 2025 07:38:58 -0400 Subject: [PATCH] Make schema agreement waiting code renew connection on each iteration When schema agreement is started it could happen that control connection is getting disconnected/reconnected, when it happens schema agreement code used to use disconnected connection to run all the queries. As result, it could lead to schema agreement timeout, even if all nodes got schema updated long time ago. This commit updates connection on every iteration and makes it iterate when underlying connection is closed --- cassandra/cluster.py | 30 +++++++++++++++++++++--------- 1 file changed, 21 insertions(+), 9 deletions(-) diff --git a/cassandra/cluster.py b/cassandra/cluster.py index fd73803eb8..94caa4166d 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -4220,15 +4220,14 @@ def wait_for_schema_agreement(self, connection=None, preloaded_results=None, wai if self._is_shutdown: return - if not connection: - connection = self._connection + current_connection = connection or self._connection if preloaded_results: log.debug("[control connection] Attempting to use preloaded results for schema agreement") peers_result = preloaded_results[0] local_result = preloaded_results[1] - schema_mismatches = self._get_schema_mismatches(peers_result, local_result, connection.endpoint) + schema_mismatches = self._get_schema_mismatches(peers_result, local_result, current_connection.endpoint) if schema_mismatches is None: return True @@ -4237,16 +4236,27 @@ def wait_for_schema_agreement(self, connection=None, preloaded_results=None, wai elapsed = 0 cl = ConsistencyLevel.ONE schema_mismatches = None - select_peers_query = self._get_peers_query(self.PeersQueryType.PEERS_SCHEMA, connection) + select_peers_query = self._get_peers_query(self.PeersQueryType.PEERS_SCHEMA, current_connection) + error_signaled = False while elapsed < total_timeout: + if current_connection != connection or self._connection: + current_connection = connection or self._connection + error_signaled = False + + if current_connection.is_defunct or current_connection.is_closed: + log.debug("[control connection] connection is closed, wait and trying again") + self._time.sleep(0.2) + elapsed = self._time.time() - start + continue + peers_query = QueryMessage(query=maybe_add_timeout_to_query(select_peers_query, self._metadata_request_timeout), consistency_level=cl) local_query = QueryMessage(query=maybe_add_timeout_to_query(self._SELECT_SCHEMA_LOCAL, self._metadata_request_timeout), consistency_level=cl) try: timeout = min(self._timeout, total_timeout - elapsed) - peers_result, local_result = connection.wait_for_responses( + peers_result, local_result = current_connection.wait_for_responses( peers_query, local_query, timeout=timeout) except OperationTimedOut as timeout: log.debug("[control connection] Timed out waiting for " @@ -4257,10 +4267,12 @@ def wait_for_schema_agreement(self, connection=None, preloaded_results=None, wai if self._is_shutdown: log.debug("[control connection] Aborting wait for schema match due to shutdown") return None - else: - raise + elif not error_signaled: + self._signal_error() + error_signaled = True + continue - schema_mismatches = self._get_schema_mismatches(peers_result, local_result, connection.endpoint) + schema_mismatches = self._get_schema_mismatches(peers_result, local_result, current_connection.endpoint) if schema_mismatches is None: return True @@ -4269,7 +4281,7 @@ def wait_for_schema_agreement(self, connection=None, preloaded_results=None, wai elapsed = self._time.time() - start log.warning("Node %s is reporting a schema disagreement: %s", - connection.endpoint, schema_mismatches) + current_connection.endpoint, schema_mismatches) return False def _get_schema_mismatches(self, peers_result, local_result, local_address):