Skip to content

Commit

Permalink
[TokenFetcherCredentials] fix backoff behavior
Browse files Browse the repository at this point in the history
  • Loading branch information
markdroth committed Oct 25, 2024
1 parent 2213447 commit 8de2fec
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ void TokenFetcherCredentials::Token::AddTokenToClientInitialMetadata(
//

TokenFetcherCredentials::FetchState::BackoffTimer::BackoffTimer(
RefCountedPtr<FetchState> fetch_state)
: fetch_state_(std::move(fetch_state)) {
RefCountedPtr<FetchState> fetch_state, absl::Status status)
: fetch_state_(std::move(fetch_state)), status_(status) {
const Duration delay = fetch_state_->backoff_.NextAttemptDelay();
GRPC_TRACE_LOG(token_fetcher_credentials, INFO)
<< "[TokenFetcherCredentials " << fetch_state_->creds_.get()
Expand Down Expand Up @@ -100,24 +100,13 @@ void TokenFetcherCredentials::FetchState::BackoffTimer::OnTimer() {
<< "[TokenFetcherCredentials " << fetch_state_->creds_.get()
<< "]: fetch_state=" << fetch_state_.get() << " backoff_timer=" << this
<< ": backoff timer fired";
if (fetch_state_->queued_calls_.empty()) {
// If there are no pending calls when the timer fires, then orphan
// the FetchState object. Note that this drops the backoff state,
// but that's probably okay, because if we didn't have any pending
// calls during the backoff period, we probably won't see any
// immediately now either.
GRPC_TRACE_LOG(token_fetcher_credentials, INFO)
<< "[TokenFetcherCredentials " << fetch_state_->creds_.get()
<< "]: fetch_state=" << fetch_state_.get() << " backoff_timer=" << this
<< ": no pending calls, clearing state";
fetch_state_->creds_->fetch_state_.reset();
} else {
// If there are pending calls, then start a new fetch attempt.
GRPC_TRACE_LOG(token_fetcher_credentials, INFO)
<< "[TokenFetcherCredentials " << fetch_state_->creds_.get()
<< "]: fetch_state=" << fetch_state_.get() << " backoff_timer=" << this
<< ": starting new fetch attempt";
fetch_state_->StartFetchAttempt();
auto* self_ptr =
absl::get_if<OrphanablePtr<BackoffTimer>>(&fetch_state_->state_);
// This condition should always be true, but check to be defensive.
if (self_ptr != nullptr && self_ptr->get() == this) {
// Reset pointer in fetch_state_, so that subsequent RPCs know that
// we're no longer in backoff and they can trigger a new fetch.
self_ptr->reset();
}
}

Expand Down Expand Up @@ -145,6 +134,14 @@ void TokenFetcherCredentials::FetchState::Orphan() {
Unref();
}

// Reports whether this fetch state is currently in backoff.
//
// Returns OK unless state_ currently holds a non-null BackoffTimer, in
// which case the status stored on that timer is returned.
absl::Status TokenFetcherCredentials::FetchState::status() const {
  const auto* timer_slot = absl::get_if<OrphanablePtr<BackoffTimer>>(&state_);
  // A live (non-null) timer in the variant means a backoff is in progress.
  if (timer_slot != nullptr && *timer_slot != nullptr) {
    return (*timer_slot)->status();
  }
  return absl::OkStatus();
}

void TokenFetcherCredentials::FetchState::StartFetchAttempt() {
GRPC_TRACE_LOG(token_fetcher_credentials, INFO)
<< "[TokenFetcherCredentials " << creds_.get()
Expand Down Expand Up @@ -182,7 +179,8 @@ void TokenFetcherCredentials::FetchState::TokenFetchComplete(
<< "]: fetch_state=" << this
<< ": token fetch failed: " << token.status();
// If failed, start backoff timer.
state_ = OrphanablePtr<BackoffTimer>(new BackoffTimer(Ref()));
state_ =
OrphanablePtr<BackoffTimer>(new BackoffTimer(Ref(), token.status()));
}
ResumeQueuedCalls(std::move(token));
}
Expand All @@ -204,14 +202,18 @@ void TokenFetcherCredentials::FetchState::ResumeQueuedCalls(
// Registers a call that must wait for a token and returns its handle.
//
// The new QueuedCall records a non-owning waker for the current activity,
// the call's polling entity (which is also added to the creds' pollset_set),
// and the call's initial metadata, and is then inserted into queued_calls_.
// If state_ holds a BackoffTimer slot whose pointer has been reset to null,
// a new fetch attempt is started before returning.
RefCountedPtr<TokenFetcherCredentials::QueuedCall>
TokenFetcherCredentials::FetchState::QueueCall(
    ClientMetadataHandle initial_metadata) {
  // Build the pending-call record and add it to the pending set.
  auto pending = MakeRefCounted<QueuedCall>();
  pending->waker = GetContext<Activity>()->MakeNonOwningWaker();
  pending->pollent = GetContext<grpc_polling_entity>();
  grpc_polling_entity_add_to_pollset_set(
      pending->pollent, grpc_polling_entity_pollset_set(&creds_->pollent_));
  pending->md = std::move(initial_metadata);
  queued_calls_.insert(pending);
  // A BackoffTimer slot that is present but null means the backoff period
  // has already expired, so this call is the one that kicks off a new fetch.
  auto* timer_slot = absl::get_if<OrphanablePtr<BackoffTimer>>(&state_);
  const bool backoff_expired = timer_slot != nullptr && *timer_slot == nullptr;
  if (backoff_expired) {
    StartFetchAttempt();
  }
  return pending;
}

Expand Down Expand Up @@ -267,6 +269,11 @@ TokenFetcherCredentials::GetRequestMetadata(
token_->AddTokenToClientInitialMetadata(*initial_metadata);
return Immediate(std::move(initial_metadata));
}
// If we're in backoff, fail the call.
if (fetch_state_ != nullptr) {
absl::Status status = fetch_state_->status();
if (!status.ok()) return Immediate(std::move(status));
}
// If we don't have a cached token, this call will need to be queued.
GRPC_TRACE_LOG(token_fetcher_credentials, INFO)
<< "[TokenFetcherCredentials " << this
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,24 +111,30 @@ class TokenFetcherCredentials : public grpc_call_credentials {
// annotations.
void Orphan() override ABSL_NO_THREAD_SAFETY_ANALYSIS;

// Returns non-OK when we're in backoff.
absl::Status status() const;

RefCountedPtr<QueuedCall> QueueCall(ClientMetadataHandle initial_metadata)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&TokenFetcherCredentials::mu_);

private:
class BackoffTimer : public InternallyRefCounted<BackoffTimer> {
public:
explicit BackoffTimer(RefCountedPtr<FetchState> fetch_state)
BackoffTimer(RefCountedPtr<FetchState> fetch_state, absl::Status status)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&TokenFetcherCredentials::mu_);

// Disabling thread safety annotations, since Orphan() is called
// by OrphanablePtr<>, which does not have the right lock
// annotations.
void Orphan() override ABSL_NO_THREAD_SAFETY_ANALYSIS;

absl::Status status() const { return status_; }

private:
void OnTimer();

RefCountedPtr<FetchState> fetch_state_;
const absl::Status status_;
absl::optional<grpc_event_engine::experimental::EventEngine::TaskHandle>
timer_handle_ ABSL_GUARDED_BY(&TokenFetcherCredentials::mu_);
};
Expand Down
102 changes: 48 additions & 54 deletions test/core/security/credentials_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2569,31 +2569,35 @@ TEST_F(TokenFetcherCredentialsTest, FetchFails) {
run_after_duration = duration;
});
ExecCtx exec_ctx;
creds_->AddResult(kExpectedError);
// First request will trigger a fetch, which will fail.
LOG(INFO) << "Sending first RPC.";
creds_->AddResult(kExpectedError);
auto state = RequestMetadataState::NewInstance(kExpectedError, "",
/*expect_delay=*/true);
state->RunRequestMetadataTest(creds_.get(), kTestUrlScheme, kTestAuthority,
kTestPath);
EXPECT_EQ(creds_->num_fetches(), 1);
while (!run_after_duration.has_value()) event_engine_->Tick();
// Make sure backoff was set for the right period.
// This is 1 second (initial backoff) minus 1ms for the tick needed above.
EXPECT_EQ(run_after_duration, std::chrono::seconds(1));
run_after_duration.reset();
// Start a new call now, which will be queued and then eventually
// resumed when the next fetch happens.
// Start a new call now, which will fail because we're in backoff.
LOG(INFO) << "Sending second RPC.";
state = RequestMetadataState::NewInstance(
absl::OkStatus(), "authorization: foo", /*expect_delay=*/true);
kExpectedError, "authorization: foo", /*expect_delay=*/false);
state->RunRequestMetadataTest(creds_.get(), kTestUrlScheme, kTestAuthority,
kTestPath);
// Tick until the next fetch starts.
creds_->AddResult(MakeToken("foo"));
EXPECT_EQ(creds_->num_fetches(), 1);
// Tick until backoff expires.
LOG(INFO) << "Waiting for backoff.";
event_engine_->TickUntilIdle();
EXPECT_EQ(creds_->num_fetches(), 2);
// A call started now should use the new cached data.
EXPECT_EQ(creds_->num_fetches(), 1);
// Starting another call should trigger a new fetch, which will
// succeed this time.
LOG(INFO) << "Sending third RPC.";
creds_->AddResult(MakeToken("foo"));
state = RequestMetadataState::NewInstance(
absl::OkStatus(), "authorization: foo", /*expect_delay=*/false);
absl::OkStatus(), "authorization: foo", /*expect_delay=*/true);
state->RunRequestMetadataTest(creds_.get(), kTestUrlScheme, kTestAuthority,
kTestPath);
EXPECT_EQ(creds_->num_fetches(), 2);
Expand All @@ -2607,8 +2611,9 @@ TEST_F(TokenFetcherCredentialsTest, Backoff) {
run_after_duration = duration;
});
ExecCtx exec_ctx;
creds_->AddResult(kExpectedError);
// First request will trigger a fetch, which will fail.
LOG(INFO) << "Sending first RPC.";
creds_->AddResult(kExpectedError);
auto state = RequestMetadataState::NewInstance(kExpectedError, "",
/*expect_delay=*/true);
state->RunRequestMetadataTest(creds_.get(), kTestUrlScheme, kTestAuthority,
Expand All @@ -2618,64 +2623,53 @@ TEST_F(TokenFetcherCredentialsTest, Backoff) {
// Make sure backoff was set for the right period.
EXPECT_EQ(run_after_duration, std::chrono::seconds(1));
run_after_duration.reset();
// Start a new call now, which will be queued and then eventually
// resumed when the next fetch happens.
// Start a new call now, which will fail because we're in backoff.
LOG(INFO) << "Sending second RPC.";
state = RequestMetadataState::NewInstance(kExpectedError, "",
/*expect_delay=*/true);
/*expect_delay=*/false);
state->RunRequestMetadataTest(creds_.get(), kTestUrlScheme, kTestAuthority,
kTestPath);
// Tick until the next fetch fails and the backoff timer starts again.
EXPECT_EQ(creds_->num_fetches(), 1);
// Tick until backoff expires.
LOG(INFO) << "Waiting for backoff.";
event_engine_->TickUntilIdle();
EXPECT_EQ(creds_->num_fetches(), 1);
// Starting another call should trigger a new fetch, which will again fail.
LOG(INFO) << "Sending third RPC.";
creds_->AddResult(kExpectedError);
while (!run_after_duration.has_value()) event_engine_->Tick();
EXPECT_EQ(creds_->num_fetches(), 2);
// The backoff time should be longer now. We account for jitter here.
EXPECT_EQ(run_after_duration, std::chrono::milliseconds(1600))
<< "actual: " << run_after_duration->count();
run_after_duration.reset();
// Start another new call to trigger another new fetch once the
// backoff expires.
state = RequestMetadataState::NewInstance(kExpectedError, "",
/*expect_delay=*/true);
state->RunRequestMetadataTest(creds_.get(), kTestUrlScheme, kTestAuthority,
kTestPath);
// Tick until the next fetch starts.
creds_->AddResult(kExpectedError);
EXPECT_EQ(creds_->num_fetches(), 2);
while (!run_after_duration.has_value()) event_engine_->Tick();
EXPECT_EQ(creds_->num_fetches(), 3);
// Check backoff time again.
EXPECT_EQ(run_after_duration, std::chrono::milliseconds(2560))
// The backoff time should be longer now.
EXPECT_EQ(run_after_duration, std::chrono::milliseconds(1600))
<< "actual: " << run_after_duration->count();
}

TEST_F(TokenFetcherCredentialsTest, FetchNotStartedAfterBackoffWithoutRpc) {
const absl::Status kExpectedError = absl::UnavailableError("bummer, dude");
absl::optional<FuzzingEventEngine::Duration> run_after_duration;
event_engine_->SetRunAfterDurationCallback(
[&](FuzzingEventEngine::Duration duration) {
run_after_duration = duration;
});
ExecCtx exec_ctx;
creds_->AddResult(kExpectedError);
// First request will trigger a fetch, which will fail.
auto state = RequestMetadataState::NewInstance(kExpectedError, "",
/*expect_delay=*/true);
run_after_duration.reset();
// Start a new call now, which will fail because we're in backoff.
LOG(INFO) << "Sending fourth RPC.";
state = RequestMetadataState::NewInstance(kExpectedError, "",
/*expect_delay=*/false);
state->RunRequestMetadataTest(creds_.get(), kTestUrlScheme, kTestAuthority,
kTestPath);
EXPECT_EQ(creds_->num_fetches(), 1);
while (!run_after_duration.has_value()) event_engine_->Tick();
// Make sure backoff was set for the right period.
EXPECT_EQ(run_after_duration, std::chrono::seconds(1));
run_after_duration.reset();
// Tick until the backoff expires. No new fetch should be started.
EXPECT_EQ(creds_->num_fetches(), 2);
// Tick until backoff expires.
LOG(INFO) << "Waiting for backoff.";
event_engine_->TickUntilIdle();
EXPECT_EQ(creds_->num_fetches(), 1);
// Now start a new request, which will trigger a new fetch.
creds_->AddResult(MakeToken("foo"));
state = RequestMetadataState::NewInstance(
absl::OkStatus(), "authorization: foo", /*expect_delay=*/true);
EXPECT_EQ(creds_->num_fetches(), 2);
// Starting another call should trigger a new fetch, which will again fail.
LOG(INFO) << "Sending fifth RPC.";
creds_->AddResult(kExpectedError);
state = RequestMetadataState::NewInstance(kExpectedError, "",
/*expect_delay=*/true);
state->RunRequestMetadataTest(creds_.get(), kTestUrlScheme, kTestAuthority,
kTestPath);
EXPECT_EQ(creds_->num_fetches(), 2);
EXPECT_EQ(creds_->num_fetches(), 3);
while (!run_after_duration.has_value()) event_engine_->Tick();
// The backoff time should be longer now.
EXPECT_EQ(run_after_duration, std::chrono::milliseconds(2560))
<< "actual: " << run_after_duration->count();
}

TEST_F(TokenFetcherCredentialsTest, ShutdownWhileBackoffTimerPending) {
Expand Down

0 comments on commit 8de2fec

Please sign in to comment.