Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into reorg_tests
Browse files Browse the repository at this point in the history
  • Loading branch information
markdroth committed Nov 14, 2024
2 parents ebc9f9a + 35e1bfa commit ad77d1d
Show file tree
Hide file tree
Showing 20 changed files with 125 additions and 125 deletions.
3 changes: 3 additions & 0 deletions src/core/lib/promise/party.cc
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,7 @@ void Party::RunPartyAndUnref(uint64_t prev_state) {
}

void Party::AddParticipants(Participant** participants, size_t count) {
GRPC_LATENT_SEE_INNER_SCOPE("Party::AddParticipants");
uint64_t state = state_.load(std::memory_order_acquire);
uint64_t allocated;

Expand Down Expand Up @@ -400,6 +401,7 @@ void Party::AddParticipants(Participant** participants, size_t count) {
}

void Party::AddParticipant(Participant* participant) {
GRPC_LATENT_SEE_INNER_SCOPE("Party::AddParticipant");
uint64_t state = state_.load(std::memory_order_acquire);
uint64_t allocated;
size_t slot;
Expand Down Expand Up @@ -468,6 +470,7 @@ void Party::WakeupAsync(WakeupMask wakeup_mask) {
wakeup_mask_ |= wakeup_mask;
arena_->GetContext<grpc_event_engine::experimental::EventEngine>()->Run(
[this, prev_state]() {
GRPC_LATENT_SEE_PARENT_SCOPE("Party::WakeupAsync");
ApplicationCallbackExecCtx app_exec_ctx;
ExecCtx exec_ctx;
RunLockedAndUnref(this, prev_state);
Expand Down
2 changes: 2 additions & 0 deletions src/core/lib/promise/party.h
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,7 @@ class Party : public Activity, private Wakeable {

// Wakeable implementation
void Wakeup(WakeupMask wakeup_mask) final {
GRPC_LATENT_SEE_INNER_SCOPE("Party::Wakeup");
if (Activity::current() == this) {
wakeup_mask_ |= wakeup_mask;
Unref();
Expand All @@ -352,6 +353,7 @@ class Party : public Activity, private Wakeable {

GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION void WakeupFromState(
uint64_t cur_state, WakeupMask wakeup_mask) {
GRPC_LATENT_SEE_INNER_SCOPE("Party::WakeupFromState");
DCHECK_NE(wakeup_mask & kWakeupMask, 0u)
<< "Wakeup mask must be non-zero: " << wakeup_mask;
while (true) {
Expand Down
3 changes: 3 additions & 0 deletions src/core/lib/surface/client_call.cc
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ void ClientCall::CancelWithError(grpc_error_handle error) {

template <typename Batch>
void ClientCall::ScheduleCommittedBatch(Batch batch) {
GRPC_LATENT_SEE_INNER_SCOPE("ClientCall::ScheduleCommittedBatch");
auto cur_state = call_state_.load(std::memory_order_acquire);
while (true) {
switch (cur_state) {
Expand Down Expand Up @@ -225,6 +226,7 @@ void ClientCall::ScheduleCommittedBatch(Batch batch) {
}

void ClientCall::StartCall(const grpc_op& send_initial_metadata_op) {
GRPC_LATENT_SEE_INNER_SCOPE("ClientCall::StartCall");
auto cur_state = call_state_.load(std::memory_order_acquire);
CToMetadata(send_initial_metadata_op.data.send_initial_metadata.metadata,
send_initial_metadata_op.data.send_initial_metadata.count,
Expand Down Expand Up @@ -271,6 +273,7 @@ void ClientCall::StartCall(const grpc_op& send_initial_metadata_op) {

void ClientCall::CommitBatch(const grpc_op* ops, size_t nops, void* notify_tag,
bool is_notify_tag_closure) {
GRPC_LATENT_SEE_INNER_SCOPE("ClientCall::CommitBatch");
if (nops == 1 && ops[0].op == GRPC_OP_SEND_INITIAL_METADATA) {
StartCall(ops[0]);
EndOpImmediately(cq_, notify_tag, is_notify_tag_closure);
Expand Down
1 change: 1 addition & 0 deletions src/core/lib/transport/metadata_batch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ bool DebugStringBuilder::IsAllowListed(const absl::string_view key) const {
allow_list.insert(std::string(LbTokenMetadata::key()));
allow_list.insert(std::string(TeMetadata::key()));
allow_list.insert(std::string(UserAgentMetadata::key()));
allow_list.insert(std::string(W3CTraceParentMetadata::key()));
allow_list.insert(std::string(XEnvoyPeerMetadata::key()));
// go/keep-sorted end
// go/keep-sorted start
Expand Down
10 changes: 9 additions & 1 deletion src/core/lib/transport/metadata_batch.h
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,14 @@ struct LbCostBinMetadata {
MetadataParseErrorFn on_error);
};

// traceparent metadata
struct W3CTraceParentMetadata : public SimpleSliceBasedMetadata {
static constexpr bool kRepeatable = false;
static constexpr bool kTransferOnTrailersOnly = false;
using CompressionTraits = FrequentKeyWithNoValueCompressionCompressor;
static absl::string_view key() { return "traceparent"; }
};

// Annotation added by a transport to note whether a failed request was never
// placed on the wire, or never seen by a server.
struct GrpcStreamNetworkState {
Expand Down Expand Up @@ -1582,7 +1590,7 @@ using grpc_metadata_batch_base = grpc_core::MetadataMap<
grpc_core::GrpcServerStatsBinMetadata, grpc_core::GrpcTraceBinMetadata,
grpc_core::GrpcTagsBinMetadata, grpc_core::GrpcLbClientStatsMetadata,
grpc_core::LbCostBinMetadata, grpc_core::LbTokenMetadata,
grpc_core::XEnvoyPeerMetadata,
grpc_core::XEnvoyPeerMetadata, grpc_core::W3CTraceParentMetadata,
// Non-encodable things
grpc_core::GrpcStreamNetworkState, grpc_core::PeerString,
grpc_core::GrpcStatusContext, grpc_core::GrpcStatusFromWire,
Expand Down
27 changes: 15 additions & 12 deletions src/core/telemetry/metrics.cc
Original file line number Diff line number Diff line change
Expand Up @@ -120,42 +120,45 @@ void GlobalStatsPluginRegistry::StatsPluginGroup::AddServerCallTracers(
}
}

NoDestruct<Mutex> GlobalStatsPluginRegistry::mutex_;
NoDestruct<std::vector<std::shared_ptr<StatsPlugin>>>
std::atomic<GlobalStatsPluginRegistry::GlobalStatsPluginNode*>
GlobalStatsPluginRegistry::plugins_;

void GlobalStatsPluginRegistry::RegisterStatsPlugin(
std::shared_ptr<StatsPlugin> plugin) {
MutexLock lock(&*mutex_);
plugins_->push_back(std::move(plugin));
GlobalStatsPluginNode* node = new GlobalStatsPluginNode();
node->plugin = std::move(plugin);
node->next = plugins_.load(std::memory_order_relaxed);
while (!plugins_.compare_exchange_weak(
node->next, node, std::memory_order_acq_rel, std::memory_order_relaxed)) {
}
}

GlobalStatsPluginRegistry::StatsPluginGroup
GlobalStatsPluginRegistry::GetStatsPluginsForChannel(
const experimental::StatsPluginChannelScope& scope) {
MutexLock lock(&*mutex_);
StatsPluginGroup group;
for (const auto& plugin : *plugins_) {
for (GlobalStatsPluginNode* node = plugins_.load(std::memory_order_acquire);
node != nullptr; node = node->next) {
bool is_enabled = false;
std::shared_ptr<StatsPlugin::ScopeConfig> config;
std::tie(is_enabled, config) = plugin->IsEnabledForChannel(scope);
std::tie(is_enabled, config) = node->plugin->IsEnabledForChannel(scope);
if (is_enabled) {
group.AddStatsPlugin(plugin, std::move(config));
group.AddStatsPlugin(node->plugin, std::move(config));
}
}
return group;
}

GlobalStatsPluginRegistry::StatsPluginGroup
GlobalStatsPluginRegistry::GetStatsPluginsForServer(const ChannelArgs& args) {
MutexLock lock(&*mutex_);
StatsPluginGroup group;
for (const auto& plugin : *plugins_) {
for (GlobalStatsPluginNode* node = plugins_.load(std::memory_order_acquire);
node != nullptr; node = node->next) {
bool is_enabled = false;
std::shared_ptr<StatsPlugin::ScopeConfig> config;
std::tie(is_enabled, config) = plugin->IsEnabledForServer(args);
std::tie(is_enabled, config) = node->plugin->IsEnabledForServer(args);
if (is_enabled) {
group.AddStatsPlugin(plugin, std::move(config));
group.AddStatsPlugin(node->plugin, std::move(config));
}
}
return group;
Expand Down
10 changes: 7 additions & 3 deletions src/core/telemetry/metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,8 @@ class GlobalStatsPluginRegistry {
return false;
}

size_t size() const { return plugins_state_.size(); }

// Registers a callback to be used to populate callback metrics.
// The callback will update the specified metrics. The callback
// will be invoked no more often than min_interval. Multiple callbacks may
Expand Down Expand Up @@ -508,13 +510,15 @@ class GlobalStatsPluginRegistry {
static StatsPluginGroup GetStatsPluginsForServer(const ChannelArgs& args);

private:
struct GlobalStatsPluginNode {
std::shared_ptr<StatsPlugin> plugin;
GlobalStatsPluginNode* next = nullptr;
};
friend class GlobalStatsPluginRegistryTestPeer;

GlobalStatsPluginRegistry() = default;

static NoDestruct<Mutex> mutex_;
static NoDestruct<std::vector<std::shared_ptr<StatsPlugin>>> plugins_
ABSL_GUARDED_BY(mutex_);
static std::atomic<GlobalStatsPluginNode*> plugins_;
};

// A metric callback that is registered with a stats plugin group.
Expand Down
1 change: 1 addition & 0 deletions src/core/util/latent_see.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include "absl/functional/function_ref.h"
#include "absl/log/log.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include "src/core/util/per_cpu.h"
#include "src/core/util/sync.h"

Expand Down
1 change: 1 addition & 0 deletions src/core/xds/grpc/xds_route_config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ std::string XdsRouteConfigResource::Route::RouteAction::ToString() const {
if (max_stream_duration.has_value()) {
contents.push_back(max_stream_duration->ToString());
}
if (auto_host_rewrite) contents.push_back("auto_host_rewrite=true");
return absl::StrCat("{", absl::StrJoin(contents, ", "), "}");
}

Expand Down
2 changes: 0 additions & 2 deletions test/core/end2end/tests/disappearing_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
#include "src/core/util/time.h"
#include "test/core/end2end/end2end_tests.h"

#ifndef GPR_WINDOWS // b/148110727 for more details
namespace grpc_core {

static void OneRequestAndShutdownServer(CoreEnd2endTest& test) {
Expand Down Expand Up @@ -77,4 +76,3 @@ CORE_END2END_TEST(CoreClientChannelTest, DisappearingServer) {
}

} // namespace grpc_core
#endif // GPR_WINDOWS
33 changes: 33 additions & 0 deletions test/core/telemetry/metrics_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "src/core/telemetry/metrics.h"

#include <memory>
#include <thread>

#include "absl/log/log.h"
#include "gmock/gmock.h"
Expand Down Expand Up @@ -648,6 +649,38 @@ TEST_F(MetricsTest, FindInstrumentByName) {
::testing::Eq(uint64_counter_handle.index))));
}

TEST_F(MetricsTest, ParallelStatsPluginRegistrationAndLookup) {
std::vector<std::thread> register_threads;
std::vector<std::thread> lookup_threads;
register_threads.reserve(100);
lookup_threads.reserve(100);
// 100 threads that register 100 stats plugins each
for (int i = 0; i < 100; ++i) {
register_threads.emplace_back([] {
for (int j = 0; j < 100; ++j) {
FakeStatsPluginBuilder().BuildAndRegister();
}
});
}
// 100 threads that keep looking up stats plugins till they see 10000 stats
// plugins
for (int i = 0; i < 100; ++i) {
lookup_threads.emplace_back([this] {
while (GlobalStatsPluginRegistry::GetStatsPluginsForChannel(
StatsPluginChannelScope("", "", endpoint_config_))
.size() < 10000) {
};
});
}
for (int i = 0; i < 100; ++i) {
register_threads[i].join();
lookup_threads[i].join();
}
EXPECT_THAT(GlobalStatsPluginRegistry::GetStatsPluginsForChannel(
StatsPluginChannelScope("", "", endpoint_config_)),
::testing::SizeIs(10000));
}

using MetricsDeathTest = MetricsTest;

TEST_F(MetricsDeathTest, RegisterTheSameMetricNameWouldCrash) {
Expand Down
5 changes: 0 additions & 5 deletions test/core/test_util/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -263,11 +263,6 @@ grpc_cc_test(
],
)

sh_library(
name = "fuzzer_one_entry_runner",
srcs = ["fuzzer_one_entry_runner.sh"],
)

grpc_cc_library(
name = "stack_tracer",
srcs = [
Expand Down
10 changes: 8 additions & 2 deletions test/core/test_util/fake_stats_plugin.h
Original file line number Diff line number Diff line change
Expand Up @@ -696,8 +696,14 @@ class GlobalInstrumentsRegistryTestPeer {
class GlobalStatsPluginRegistryTestPeer {
public:
static void ResetGlobalStatsPluginRegistry() {
MutexLock lock(&*GlobalStatsPluginRegistry::mutex_);
GlobalStatsPluginRegistry::plugins_->clear();
GlobalStatsPluginRegistry::GlobalStatsPluginNode* node =
GlobalStatsPluginRegistry::plugins_.exchange(nullptr,
std::memory_order_acq_rel);
while (node != nullptr) {
GlobalStatsPluginRegistry::GlobalStatsPluginNode* next = node->next;
delete node;
node = next;
}
}
};

Expand Down
18 changes: 0 additions & 18 deletions test/core/test_util/fuzzer_one_entry_runner.sh

This file was deleted.

39 changes: 0 additions & 39 deletions test/core/test_util/one_corpus_entry_fuzzer.cc

This file was deleted.

Loading

0 comments on commit ad77d1d

Please sign in to comment.