diff --git a/src/core/load_balancing/pick_first/pick_first.cc b/src/core/load_balancing/pick_first/pick_first.cc index 016e198000def..6fab6bd942651 100644 --- a/src/core/load_balancing/pick_first/pick_first.cc +++ b/src/core/load_balancing/pick_first/pick_first.cc @@ -423,7 +423,7 @@ PickFirst::PickFirst(Args args) PickFirst::~PickFirst() { GRPC_TRACE_LOG(pick_first, INFO) << "Destroying Pick First " << this; - CHECK(subchannel_list_ == nullptr); + CHECK_EQ(subchannel_list_.get(), nullptr); } void PickFirst::ShutdownLocked() { @@ -744,6 +744,8 @@ void PickFirst::SubchannelList::SubchannelData::SubchannelState:: // If we're still part of a subchannel list trying to connect, check // if we're connected. if (subchannel_data_ != nullptr) { + CHECK_EQ(pick_first_->subchannel_list_.get(), + subchannel_data_->subchannel_list_); // If the subchannel is READY, use it. // Otherwise, tell the subchannel list to keep trying. if (new_state == GRPC_CHANNEL_READY) { @@ -754,7 +756,7 @@ void PickFirst::SubchannelList::SubchannelData::SubchannelState:: return; } // We aren't trying to connect, so we must be the selected subchannel. - CHECK(pick_first_->selected_.get() == this); + CHECK_EQ(pick_first_->selected_.get(), this); GRPC_TRACE_LOG(pick_first, INFO) << "Pick First " << pick_first_.get() << " selected subchannel connectivity changed to " @@ -803,15 +805,14 @@ void PickFirst::SubchannelList::SubchannelData::OnConnectivityStateChange( << ", p->subchannel_list_=" << p->subchannel_list_.get() << ", p->subchannel_list_->shutting_down_=" << p->subchannel_list_->shutting_down_; - if (subchannel_list_->shutting_down_) return; // The notification must be for a subchannel in the current list. - CHECK(subchannel_list_ == p->subchannel_list_.get()); + CHECK_EQ(subchannel_list_, p->subchannel_list_.get()); // SHUTDOWN should never happen. 
- CHECK(new_state != GRPC_CHANNEL_SHUTDOWN); + CHECK_NE(new_state, GRPC_CHANNEL_SHUTDOWN); // READY should be caught by SubchannelState, in which case it will // not call us in the first place. - CHECK(new_state != GRPC_CHANNEL_READY); + CHECK_NE(new_state, GRPC_CHANNEL_READY); // Update state. absl::optional old_state = connectivity_state_; connectivity_state_ = new_state; @@ -935,7 +936,7 @@ void PickFirst::SubchannelList::SubchannelData::RequestConnectionWithTimer() { if (connectivity_state_ == GRPC_CHANNEL_IDLE) { subchannel_state_->RequestConnection(); } else { - CHECK(connectivity_state_ == GRPC_CHANNEL_CONNECTING); + CHECK_EQ(connectivity_state_.value(), GRPC_CHANNEL_CONNECTING); } // If this is not the last subchannel in the list, start the timer. if (index_ != subchannel_list_->size() - 1) { diff --git a/test/core/load_balancing/lb_policy_test_lib.h b/test/core/load_balancing/lb_policy_test_lib.h index 2280ae6833ff9..626bb7bde9ce4 100644 --- a/test/core/load_balancing/lb_policy_test_lib.h +++ b/test/core/load_balancing/lb_policy_test_lib.h @@ -60,6 +60,7 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/config/core_configuration.h" #include "src/core/lib/event_engine/default_event_engine.h" +#include "src/core/lib/experiments/experiments.h" #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/resolved_address.h" #include "src/core/lib/security/credentials/credentials.h" @@ -92,6 +93,9 @@ namespace testing { class LoadBalancingPolicyTest : public ::testing::Test { protected: + using FuzzingEventEngine = + grpc_event_engine::experimental::FuzzingEventEngine; + using CallAttributes = std::vector; @@ -573,7 +577,9 @@ class LoadBalancingPolicyTest : public ::testing::Test { MutexLock lock(&mu_); StateUpdate update{ state, status, - MakeRefCounted(test_, std::move(picker))}; + IsWorkSerializerDispatchEnabled() + ? 
std::move(picker) + : MakeRefCounted(test_, std::move(picker))}; LOG(INFO) << "enqueuing state update from LB policy: " << update.ToString(); queue_.push_back(std::move(update)); @@ -698,10 +704,7 @@ class LoadBalancingPolicyTest : public ::testing::Test { // Order is important here: Fuzzing EE needs to be created before // grpc_init(), and the POSIX EE (which is used by the WorkSerializer) // needs to be created after grpc_init(). - fuzzing_ee_ = - std::make_shared( - grpc_event_engine::experimental::FuzzingEventEngine::Options(), - fuzzing_event_engine::Actions()); + fuzzing_ee_ = MakeFuzzingEventEngine(); grpc_init(); event_engine_ = grpc_event_engine::experimental::GetDefaultEventEngine(); work_serializer_ = std::make_shared(event_engine_); @@ -723,14 +726,16 @@ class LoadBalancingPolicyTest : public ::testing::Test { WaitForWorkSerializerToFlush(); work_serializer_.reset(); exec_ctx.Flush(); - // Note: Can't safely trigger this from inside the FakeHelper dtor, - // because if there is a picker in the queue that is holding a ref - // to the LB policy, that will prevent the LB policy from being - // destroyed, and therefore the FakeHelper will not be destroyed. - // (This will cause an ASAN failure, but it will not display the - // queued events, so the failure will be harder to diagnose.) - helper_->ExpectQueueEmpty(); - lb_policy_.reset(); + if (lb_policy_ != nullptr) { + // Note: Can't safely trigger this from inside the FakeHelper dtor, + // because if there is a picker in the queue that is holding a ref + // to the LB policy, that will prevent the LB policy from being + // destroyed, and therefore the FakeHelper will not be destroyed. + // (This will cause an ASAN failure, but it will not display the + // queued events, so the failure will be harder to diagnose.) 
+ helper_->ExpectQueueEmpty(); + lb_policy_.reset(); + } fuzzing_ee_->TickUntilIdle(); grpc_event_engine::experimental::WaitForSingleOwner( std::move(event_engine_)); @@ -739,6 +744,12 @@ class LoadBalancingPolicyTest : public ::testing::Test { fuzzing_ee_.reset(); } + virtual std::shared_ptr MakeFuzzingEventEngine() { + return std::make_shared( + grpc_event_engine::experimental::FuzzingEventEngine::Options(), + fuzzing_event_engine::Actions()); + } + LoadBalancingPolicy* lb_policy() const { CHECK(lb_policy_ != nullptr); return lb_policy_.get(); @@ -1465,8 +1476,7 @@ class LoadBalancingPolicyTest : public ::testing::Test { } } - std::shared_ptr - fuzzing_ee_; + std::shared_ptr fuzzing_ee_; // TODO(ctiller): this is a normal event engine, yet it gets its time measure // from fuzzing_ee_ -- results are likely to be a little funky, but seem to do // well enough for the tests we have today. diff --git a/test/core/load_balancing/pick_first_test.cc b/test/core/load_balancing/pick_first_test.cc index 75bf64c83283e..7ab700b4b47a0 100644 --- a/test/core/load_balancing/pick_first_test.cc +++ b/test/core/load_balancing/pick_first_test.cc @@ -75,7 +75,7 @@ class PickFirstTest : public LoadBalancingPolicyTest { } // Gets order the addresses are being picked. Return type is void so - // assertions can be used + // assertions can be used. void GetOrderAddressesArePicked( absl::Span addresses, std::vector* out_address_order) { @@ -1172,6 +1172,72 @@ TEST_F(PickFirstTest, AddressUpdateRetainsSelectedAddress) { EXPECT_FALSE(subchannel2->ConnectionRequested()); } +// DO NOT USE! +// +// A test class that overrides the FuzzingEventEngine to make timer +// cancellation always fail. This is used to simulate cases where, at +// the moment that the timer is cancelled, the timer has already fired +// but the timer callback has not yet run in the WorkSerializer. +// +// TODO(roth): This is a really ugly hack. 
As part of changing these +// tests to use the FuzzingEventEngine exclusively, we should instead +// find a way to tick the FuzzingEventEngine to the right point so that +// we don't need this ugliness. +class PickFirstNoCancelTimerTest : public PickFirstTest { + protected: + class FuzzingEventEngineWithoutTimerCancellation : public FuzzingEventEngine { + public: + using FuzzingEventEngine::FuzzingEventEngine; + + bool Cancel(TaskHandle) override { return false; } + }; + + std::shared_ptr MakeFuzzingEventEngine() override { + return std::make_shared( + grpc_event_engine::experimental::FuzzingEventEngine::Options(), + fuzzing_event_engine::Actions()); + } +}; + +// This exercises a bug seen in the wild that caused a crash. For +// details, see https://github.com/grpc/grpc/pull/38144. +TEST_F(PickFirstNoCancelTimerTest, SubchannelNotificationAfterShutdown) { + // Send an update containing two addresses. + constexpr std::array kAddresses = { + "ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444"}; + absl::Status status = ApplyUpdate( + BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy()); + EXPECT_TRUE(status.ok()) << status; + // LB policy should have created a subchannel for each address. + auto* subchannel = FindSubchannel(kAddresses[0]); + ASSERT_NE(subchannel, nullptr); + auto* subchannel2 = FindSubchannel(kAddresses[1]); + ASSERT_NE(subchannel2, nullptr); + // When the LB policy receives the first subchannel's initial connectivity + // state notification (IDLE), it will request a connection. + EXPECT_TRUE(subchannel->ConnectionRequested()); + // This causes the subchannel to start to connect, so it reports CONNECTING. + subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING); + // LB policy should have reported CONNECTING state. + ExpectConnectingUpdate(); + // Now shut down the LB policy. 
+ // This will cancel the Happy Eyeballs timer, but since we're using a + // FuzzingEventEngine that fails timer cancellations, it simulates the + // case where the timer has already fired but the timer callback has + // not yet run inside the WorkSerializer. + lb_policy_.reset(); + // Now the subchannel reports READY. Before the bug fix, this caused + // us to select the subchannel instead of ignoring the notification. + // With the bug fix, this update should never actually be delivered to + // the LB policy, since it will have already shut down the subchannel. + subchannel->SetConnectivityState(GRPC_CHANNEL_READY); + // Now trigger the Happy Eyeballs timer to fire. + IncrementTimeBy(Duration::Milliseconds(250)); + // Now the subchannel reports IDLE. Before the bug fix, this + // triggered a crash. + subchannel->SetConnectivityState(GRPC_CHANNEL_IDLE); +} + TEST_F(PickFirstTest, WithShuffle) { constexpr std::array kAddresses = { "ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444", "ipv4:127.0.0.1:445",