Skip to content

Commit

Permalink
[ring_hash] update proactive connection attempt logic
Browse files Browse the repository at this point in the history
  • Loading branch information
markdroth committed Feb 14, 2025
1 parent 822f9b1 commit 6c4c773
Show file tree
Hide file tree
Showing 3 changed files with 168 additions and 156 deletions.
108 changes: 36 additions & 72 deletions src/core/load_balancing/ring_hash/ring_hash.cc
Original file line number Diff line number Diff line change
Expand Up @@ -302,13 +302,10 @@ class RingHash final : public LoadBalancingPolicy {
void ShutdownLocked() override;

// Updates the aggregate policy's connectivity state based on the
// endpoint list's state counters, creating a new picker.
// entered_transient_failure is true if the endpoint has just
// entered TRANSIENT_FAILURE state.
// number of endpoints in each state, creating a new picker.
// If the call to this method is triggered by an endpoint entering
// TRANSIENT_FAILURE, then status is the status reported by the endpoint.
void UpdateAggregatedConnectivityStateLocked(bool entered_transient_failure,
absl::Status status);
void UpdateAggregatedConnectivityStateLocked(absl::Status status);

// Current endpoint list, channel args, and ring.
EndpointAddressesList endpoints_;
Expand Down Expand Up @@ -661,15 +658,11 @@ void RingHash::RingHashEndpoint::OnStateUpdate(
<< ")";
if (child_policy_ == nullptr) return; // Already orphaned.
// Update state.
const bool entered_transient_failure =
connectivity_state_ != GRPC_CHANNEL_TRANSIENT_FAILURE &&
new_state == GRPC_CHANNEL_TRANSIENT_FAILURE;
connectivity_state_ = new_state;
status_ = status;
picker_ = std::move(picker);
// Update the aggregated connectivity state.
ring_hash_->UpdateAggregatedConnectivityStateLocked(entered_transient_failure,
status);
ring_hash_->UpdateAggregatedConnectivityStateLocked(status);
}

//
Expand Down Expand Up @@ -776,29 +769,29 @@ absl::Status RingHash::UpdateLocked(UpdateArgs args) {
return status;
}
// Return a new picker.
UpdateAggregatedConnectivityStateLocked(/*entered_transient_failure=*/false,
absl::OkStatus());
UpdateAggregatedConnectivityStateLocked(absl::OkStatus());
if (!errors.empty()) {
return absl::UnavailableError(absl::StrCat(
"errors from children: [", absl::StrJoin(errors, "; "), "]"));
}
return absl::OkStatus();
}

void RingHash::UpdateAggregatedConnectivityStateLocked(
bool entered_transient_failure, absl::Status status) {
void RingHash::UpdateAggregatedConnectivityStateLocked(absl::Status status) {
// Count the number of endpoints in each state.
size_t num_idle = 0;
size_t num_connecting = 0;
size_t num_ready = 0;
size_t num_transient_failure = 0;
RingHashEndpoint* idle_endpoint = nullptr;
for (const auto& [_, endpoint] : endpoint_map_) {
switch (endpoint->connectivity_state()) {
case GRPC_CHANNEL_READY:
++num_ready;
break;
case GRPC_CHANNEL_IDLE:
++num_idle;
if (idle_endpoint == nullptr) idle_endpoint = endpoint.get();
break;
case GRPC_CHANNEL_CONNECTING:
++num_connecting;
Expand All @@ -820,33 +813,26 @@ void RingHash::UpdateAggregatedConnectivityStateLocked(
// more than one endpoint, report CONNECTING.
// 5. If there is at least one endpoint in IDLE state, report IDLE.
// 6. Otherwise, report TRANSIENT_FAILURE.
//
// We set start_connection_attempt to true if we match rules 2, 4, or 6.
grpc_connectivity_state state;
bool start_connection_attempt = false;
if (num_ready > 0) {
state = GRPC_CHANNEL_READY;
} else if (num_transient_failure >= 2) {
state = GRPC_CHANNEL_TRANSIENT_FAILURE;
start_connection_attempt = true;
} else if (num_connecting > 0) {
state = GRPC_CHANNEL_CONNECTING;
} else if (num_transient_failure == 1 && endpoints_.size() > 1) {
state = GRPC_CHANNEL_CONNECTING;
start_connection_attempt = true;
} else if (num_idle > 0) {
state = GRPC_CHANNEL_IDLE;
} else {
state = GRPC_CHANNEL_TRANSIENT_FAILURE;
start_connection_attempt = true;
}
GRPC_TRACE_LOG(ring_hash_lb, INFO)
<< "[RH " << this << "] setting connectivity state to "
<< ConnectivityStateName(state) << " (num_idle=" << num_idle
<< ", num_connecting=" << num_connecting << ", num_ready=" << num_ready
<< ", num_transient_failure=" << num_transient_failure
<< ", size=" << endpoints_.size()
<< ") -- start_connection_attempt=" << start_connection_attempt;
<< ", size=" << endpoints_.size() << ")";
// In TRANSIENT_FAILURE, report the last reported failure.
// Otherwise, report OK.
if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
Expand All @@ -864,24 +850,25 @@ void RingHash::UpdateAggregatedConnectivityStateLocked(
state, status,
MakeRefCounted<Picker>(
RefAsSubclass<RingHash>(DEBUG_LOCATION, "RingHashPicker")));
// While the ring_hash policy is reporting TRANSIENT_FAILURE, it will
// not be getting any pick requests from the priority policy.
// However, because the ring_hash policy does not attempt to
// reconnect to endpoints unless it is getting pick requests,
// it will need special handling to ensure that it will eventually
// recover from TRANSIENT_FAILURE state once the problem is resolved.
// Specifically, it will make sure that it is attempting to connect to
// at least one endpoint at any given time. But we don't want to just
// try to connect to only one endpoint, because if that particular
// endpoint happens to be down but the rest are reachable, we would
// incorrectly fail to recover.
// The ring_hash policy normally triggers endpoint connection attempts
// from the picker. However, if it is being used as a child of the
// priority policy, it will not be getting any picks once it reports
// TRANSIENT_FAILURE, and in some cases even when it reports CONNECTING,
// due to the failover timer in the priority policy. Because it reports
// TRANSIENT_FAILURE when only two endpoints are failing (aggregation
// rule 2 above) and CONNECTING when only one endpoint is reporting
// TRANSIENT_FAILURE (aggregation rule 4 above), this means that the
// priority policy could fail over to the next priority when the
// ring_hash policy is only attempting a small number of endpoints.
// This would effectively cause us to assume that all of the ring_hash
// endpoints are unreachable when in fact only a small number of them
// are, and we would never try any of the others, thus never
// recovering from that incorrect assumption.
//
// So, to handle this, whenever an endpoint initially enters
// TRANSIENT_FAILURE state (i.e., its initial connection attempt has
// failed), if there are no endpoints currently in CONNECTING state
// (i.e., they are still trying their initial connection attempt),
// then we will trigger a connection attempt for the first endpoint
// that is currently in state IDLE, if any.
// To work around this, when the aggregated connectivity state is
// either TRANSIENT_FAILURE or CONNECTING, if we do not have at least
// one CONNECTING endpoint but we have at least one IDLE endpoint,
// then we trigger a connection attempt on one of the IDLE endpoints.
//
// Note that once an endpoint enters TRANSIENT_FAILURE state, it will
// stay in that state and automatically retry after appropriate backoff,
Expand All @@ -892,39 +879,16 @@ void RingHash::UpdateAggregatedConnectivityStateLocked(
// LB policy and we keep getting picks, so it's not really a new
// problem. If/when it becomes an issue, we can figure out how to
// address it.
//
// Note that we do the same thing when the policy is in state
// CONNECTING, just to ensure that we don't remain in CONNECTING state
// indefinitely if there are no new picks coming in.
if (start_connection_attempt && entered_transient_failure) {
size_t first_idle_index = endpoints_.size();
for (size_t i = 0; i < endpoints_.size(); ++i) {
auto it =
endpoint_map_.find(EndpointAddressSet(endpoints_[i].addresses()));
CHECK(it != endpoint_map_.end());
auto& endpoint = it->second;
if (endpoint->connectivity_state() == GRPC_CHANNEL_CONNECTING) {
first_idle_index = endpoints_.size();
break;
}
if (first_idle_index == endpoints_.size() &&
endpoint->connectivity_state() == GRPC_CHANNEL_IDLE) {
first_idle_index = i;
}
}
if (first_idle_index != endpoints_.size()) {
auto it = endpoint_map_.find(
EndpointAddressSet(endpoints_[first_idle_index].addresses()));
CHECK(it != endpoint_map_.end());
auto& endpoint = it->second;
GRPC_TRACE_LOG(ring_hash_lb, INFO)
<< "[RH " << this
<< "] triggering internal connection attempt for endpoint "
<< endpoint.get() << " (" << endpoints_[first_idle_index].ToString()
<< ") (index " << first_idle_index << " of " << endpoints_.size()
<< ")";
endpoint->RequestConnectionLocked();
}
if ((state == GRPC_CHANNEL_CONNECTING ||
state == GRPC_CHANNEL_TRANSIENT_FAILURE) &&
num_connecting == 0 && idle_endpoint != nullptr) {
GRPC_TRACE_LOG(ring_hash_lb, INFO)
<< "[RH " << this
<< "] triggering internal connection attempt for endpoint "
<< idle_endpoint << " ("
<< endpoints_[idle_endpoint->index()].ToString() << ") (index "
<< idle_endpoint->index() << " of " << endpoints_.size() << ")";
idle_endpoint->RequestConnectionLocked();
}
}

Expand Down
132 changes: 132 additions & 0 deletions test/core/load_balancing/ring_hash_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,138 @@ TEST_F(RingHashTest, MultipleAddressesPerEndpoint) {
EXPECT_EQ(address, kEndpoint1Addresses[1]);
}

// Verifies that the ring_hash policy proactively starts connection attempts
// on IDLE subchannels, without any further picks, while its aggregated state
// is CONNECTING or TRANSIENT_FAILURE and no subchannel is currently
// CONNECTING.
//
// Flow: four endpoints are created; a single pick starts the first connection
// attempt.  Each time the currently-connecting subchannel fails, the test
// expects exactly one additional subchannel to have a connection requested,
// until all four have failed.  Finally, one subchannel goes IDLE, reconnects
// successfully, and the policy reports READY.
//
// NOTE(review): the assertions below rely on ConnectionRequested() not
// reporting the same request twice (e.g. it is expected to be false for
// subchannel 0 right after having been observed true) -- presumably the
// accessor clears the flag on read; confirm against the test fixture.
TEST_F(RingHashTest,
       TriggersConnectionAttemptsInConnectingAndTransientFailureWithoutPicks) {
  const std::array<absl::string_view, 4> kAddresses = {
      "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443",
      "ipv4:127.0.0.1:444"};
  std::array<SubchannelState*, 4> subchannels;
  for (size_t i = 0; i < subchannels.size(); ++i) {
    subchannels[i] = CreateSubchannel(kAddresses[i]);
  }
  // Tracks every subchannel that has already failed, so that we can
  // verify that each proactive attempt targets a not-yet-failed one.
  absl::flat_hash_set<SubchannelState*> failed_subchannels;
  EXPECT_EQ(
      ApplyUpdate(BuildUpdate(kAddresses, MakeRingHashConfig()), lb_policy()),
      absl::OkStatus());
  auto picker = ExpectState(GRPC_CHANNEL_IDLE);
  // Do a pick for subchannel 0.  This will trigger a connection attempt,
  // which will fail.
  auto* address0_attribute = MakeHashAttribute(kAddresses[0]);
  ExpectPickQueued(picker.get(), {address0_attribute});
  WaitForWorkSerializerToFlush();
  WaitForWorkSerializerToFlush();
  EXPECT_TRUE(subchannels[0]->ConnectionRequested());
  for (size_t i = 1; i < subchannels.size(); ++i) {
    EXPECT_FALSE(subchannels[i]->ConnectionRequested()) << "index " << i;
  }
  subchannels[0]->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
  picker = ExpectState(GRPC_CHANNEL_CONNECTING);
  // While a subchannel is CONNECTING, no additional attempt is expected.
  for (size_t i = 0; i < subchannels.size(); ++i) {
    EXPECT_FALSE(subchannels[i]->ConnectionRequested()) << "index " << i;
  }
  subchannels[0]->SetConnectivityState(
      GRPC_CHANNEL_TRANSIENT_FAILURE,
      absl::UnavailableError("connection attempt failed"));
  failed_subchannels.insert(subchannels[0]);
  // With one subchannel in state TF and the rest in IDLE, we report
  // CONNECTING.  This should automatically trigger a connection attempt
  // on exactly one other subchannel, even without picks.
  ExpectReresolutionRequest();
  picker = ExpectState(GRPC_CHANNEL_CONNECTING);
  SubchannelState* connecting_subchannel = nullptr;
  for (size_t i = 0; i < subchannels.size(); ++i) {
    if (subchannels[i]->ConnectionRequested()) {
      // A second hit here means more than one attempt was started.
      ASSERT_EQ(connecting_subchannel, nullptr) << "index " << i;
      connecting_subchannel = subchannels[i];
    }
  }
  ASSERT_NE(connecting_subchannel, nullptr);
  // This subchannel will also fail to connect.
  connecting_subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
  picker = ExpectState(GRPC_CHANNEL_CONNECTING);
  for (size_t i = 0; i < subchannels.size(); ++i) {
    EXPECT_FALSE(subchannels[i]->ConnectionRequested()) << "index " << i;
  }
  connecting_subchannel->SetConnectivityState(
      GRPC_CHANNEL_TRANSIENT_FAILURE,
      absl::UnavailableError("connection attempt failed"));
  failed_subchannels.insert(connecting_subchannel);
  // Now that there are two subchannels in TF, the policy will report TF
  // to the channel.  It will also trigger a connection attempt on exactly
  // one more subchannel, still without any picks.
  ExpectReresolutionRequest();
  picker = ExpectState(
      GRPC_CHANNEL_TRANSIENT_FAILURE,
      absl::UnavailableError("no reachable endpoints; last error: "
                             "UNAVAILABLE: connection attempt failed"));
  connecting_subchannel = nullptr;
  for (size_t i = 0; i < subchannels.size(); ++i) {
    if (subchannels[i]->ConnectionRequested()) {
      ASSERT_EQ(connecting_subchannel, nullptr) << "index " << i;
      connecting_subchannel = subchannels[i];
    }
  }
  ASSERT_NE(connecting_subchannel, nullptr);
  ASSERT_FALSE(failed_subchannels.contains(connecting_subchannel));
  // This subchannel will also fail to connect.
  connecting_subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
  picker = ExpectState(
      GRPC_CHANNEL_TRANSIENT_FAILURE,
      absl::UnavailableError("no reachable endpoints; last error: "
                             "UNAVAILABLE: connection attempt failed"));
  for (size_t i = 0; i < subchannels.size(); ++i) {
    EXPECT_FALSE(subchannels[i]->ConnectionRequested()) << "index " << i;
  }
  connecting_subchannel->SetConnectivityState(
      GRPC_CHANNEL_TRANSIENT_FAILURE,
      absl::UnavailableError("connection attempt failed"));
  failed_subchannels.insert(connecting_subchannel);
  // The policy will once again report TF.  It will also trigger a connection
  // attempt on the last subchannel, again without any picks.
  ExpectReresolutionRequest();
  picker = ExpectState(
      GRPC_CHANNEL_TRANSIENT_FAILURE,
      absl::UnavailableError("no reachable endpoints; last error: "
                             "UNAVAILABLE: connection attempt failed"));
  connecting_subchannel = nullptr;
  for (size_t i = 0; i < subchannels.size(); ++i) {
    if (subchannels[i]->ConnectionRequested()) {
      ASSERT_EQ(connecting_subchannel, nullptr) << "index " << i;
      connecting_subchannel = subchannels[i];
    }
  }
  ASSERT_NE(connecting_subchannel, nullptr);
  ASSERT_FALSE(failed_subchannels.contains(connecting_subchannel));
  // This subchannel will also fail to connect.
  connecting_subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
  picker = ExpectState(
      GRPC_CHANNEL_TRANSIENT_FAILURE,
      absl::UnavailableError("no reachable endpoints; last error: "
                             "UNAVAILABLE: connection attempt failed"));
  for (size_t i = 0; i < subchannels.size(); ++i) {
    EXPECT_FALSE(subchannels[i]->ConnectionRequested()) << "index " << i;
  }
  connecting_subchannel->SetConnectivityState(
      GRPC_CHANNEL_TRANSIENT_FAILURE,
      absl::UnavailableError("connection attempt failed"));
  failed_subchannels.insert(connecting_subchannel);
  // All four subchannels have now failed; the policy keeps reporting TF.
  ExpectReresolutionRequest();
  picker = ExpectState(
      GRPC_CHANNEL_TRANSIENT_FAILURE,
      absl::UnavailableError("no reachable endpoints; last error: "
                             "UNAVAILABLE: connection attempt failed"));
  // Now one of the subchannels goes IDLE.  The pick_first child will
  // trigger a new connection attempt, which will succeed this time.
  subchannels[2]->SetConnectivityState(GRPC_CHANNEL_IDLE);
  EXPECT_TRUE(subchannels[2]->ConnectionRequested());
  subchannels[2]->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
  subchannels[2]->SetConnectivityState(GRPC_CHANNEL_READY);
  // Now the policy will report READY.
  picker = ExpectState(GRPC_CHANNEL_READY);
  // The pick uses the hash attribute for address 0, yet it is expected to
  // complete on address 2, the only READY endpoint.
  auto address = ExpectPickComplete(picker.get(), {address0_attribute});
  EXPECT_EQ(address, kAddresses[2]);
}

TEST_F(RingHashTest, EndpointHashKeys) {
const std::array<absl::string_view, 3> kAddresses = {
"ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
Expand Down
Loading

0 comments on commit 6c4c773

Please sign in to comment.