Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit debfc85

Browse files
committedJan 18, 2025··
Fix missing metrics and verify metrics are not lost
Signed-off-by: Davanum Srinivas <[email protected]>
1 parent 3a2afb3 commit debfc85

File tree

6 files changed

+253
-16
lines changed

6 files changed

+253
-16
lines changed
 

‎server/config/config.go

+3
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,9 @@ type ServerConfig struct {
211211

212212
// ServerFeatureGate is a server level feature gate
213213
ServerFeatureGate featuregate.FeatureGate
214+
215+
// Metrics types of metrics - should be either 'basic' or 'extensive'
216+
Metrics string
214217
}
215218

216219
// VerifyBootstrap sanity-checks the initial config for bootstrap case

‎server/embed/etcd.go

+1-13
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ import (
3131
"sync"
3232
"time"
3333

34-
"github.com/prometheus/client_golang/prometheus"
3534
"github.com/soheilhy/cmux"
3635
"go.uber.org/zap"
3736
"google.golang.org/grpc"
@@ -227,6 +226,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
227226
V2Deprecation: cfg.V2DeprecationEffective(),
228227
ExperimentalLocalAddress: cfg.InferLocalAddr(),
229228
ServerFeatureGate: cfg.ServerFeatureGate,
229+
Metrics: cfg.Metrics,
230230
}
231231

232232
if srvcfg.ExperimentalEnableDistributedTracing {
@@ -844,18 +844,6 @@ func (e *Etcd) createMetricsListener(murl url.URL) (net.Listener, error) {
844844
}
845845

846846
func (e *Etcd) serveMetrics() (err error) {
847-
if e.cfg.Metrics == "extensive" {
848-
var opts prometheus.HistogramOpts
849-
serverHandledHistogram := prometheus.NewHistogramVec(
850-
opts,
851-
[]string{"grpc_type", "grpc_service", "grpc_method"},
852-
)
853-
err := prometheus.Register(serverHandledHistogram)
854-
if err != nil {
855-
e.GetLogger().Error("setting up prometheus metrics failed.", zap.Error(err))
856-
}
857-
}
858-
859847
if len(e.cfg.ListenMetricsUrls) > 0 {
860848
metricsMux := http.NewServeMux()
861849
etcdhttp.HandleMetrics(metricsMux)

‎server/etcdserver/api/v3rpc/grpc.go

+17-2
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,14 @@ package v3rpc
1616

1717
import (
1818
"crypto/tls"
19-
"math"
20-
2119
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
2220
"github.com/prometheus/client_golang/prometheus"
2321
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
2422
"google.golang.org/grpc"
2523
"google.golang.org/grpc/health"
2624
healthpb "google.golang.org/grpc/health/grpc_health_v1"
25+
"math"
26+
"strings"
2727

2828
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
2929
"go.etcd.io/etcd/client/v3/credentials"
@@ -34,6 +34,14 @@ const (
3434
maxSendBytes = math.MaxInt32
3535
)
3636

37+
func splitMethodName(fullMethodName string) (string, string) {
38+
fullMethodName = strings.TrimPrefix(fullMethodName, "/") // remove leading slash
39+
if i := strings.Index(fullMethodName, "/"); i >= 0 {
40+
return fullMethodName[:i], fullMethodName[i+1:]
41+
}
42+
return "unknown", "unknown"
43+
}
44+
3745
func Server(s *etcdserver.EtcdServer, tls *tls.Config, interceptor grpc.UnaryServerInterceptor, gopts ...grpc.ServerOption) *grpc.Server {
3846
var opts []grpc.ServerOption
3947
opts = append(opts, grpc.CustomCodec(&codec{}))
@@ -58,6 +66,13 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config, interceptor grpc.UnarySer
5866
serverMetrics.StreamServerInterceptor(),
5967
}
6068

69+
// If extensive metrics are enabled, register a histogram to track the reponse latency of gRPC requests
70+
if s.Cfg.Metrics == "extensive" {
71+
unaryInterceptor, streamInterceptor := constructExtensiveMetricsInterceptors()
72+
chainUnaryInterceptors = append(chainUnaryInterceptors, unaryInterceptor)
73+
chainStreamInterceptors = append(chainStreamInterceptors, streamInterceptor)
74+
}
75+
6176
if s.Cfg.ExperimentalEnableDistributedTracing {
6277
chainUnaryInterceptors = append(chainUnaryInterceptors, otelgrpc.UnaryServerInterceptor(s.Cfg.ExperimentalTracerOptions...))
6378
chainStreamInterceptors = append(chainStreamInterceptors, otelgrpc.StreamServerInterceptor(s.Cfg.ExperimentalTracerOptions...))

‎server/etcdserver/api/v3rpc/metrics.go

+42-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,13 @@
1414

1515
package v3rpc
1616

17-
import "github.com/prometheus/client_golang/prometheus"
17+
import (
18+
"context"
19+
"time"
20+
21+
"github.com/prometheus/client_golang/prometheus"
22+
"google.golang.org/grpc"
23+
)
1824

1925
var (
2026
sentBytes = prometheus.NewCounter(prometheus.CounterOpts{
@@ -56,3 +62,38 @@ func init() {
5662
prometheus.MustRegister(streamFailures)
5763
prometheus.MustRegister(clientRequests)
5864
}
65+
66+
// constructExtensiveMetricsInterceptors constructs unary and stream interceptors to record histogram metrics for gRPC requests
67+
func constructExtensiveMetricsInterceptors() (grpc.UnaryServerInterceptor, grpc.StreamServerInterceptor) {
68+
// Define a new histogram metric using default buckets
69+
serverHandledHistogram := prometheus.NewHistogramVec(
70+
prometheus.HistogramOpts{
71+
Name: "grpc_server_handling_seconds",
72+
Help: "Histogram of response latency (seconds) of gRPC that had been application-level handled by the server.",
73+
Buckets: prometheus.DefBuckets,
74+
},
75+
[]string{"grpc_type", "grpc_service", "grpc_method"},
76+
)
77+
prometheus.Register(serverHandledHistogram)
78+
79+
// method to record histogram metrics for both unary and stream requests
80+
recordHistogramMetrics := func(serverHandledHistogram *prometheus.HistogramVec, grpcType, fullMethodName string, startTime time.Time) {
81+
grpcService, grpcMethod := splitMethodName(fullMethodName)
82+
serverHandledHistogram.WithLabelValues(grpcType, grpcService, grpcMethod).Observe(time.Since(startTime).Seconds())
83+
}
84+
85+
// Add a new interceptor to spit out histogram metrics for unary requests
86+
unaryInterceptor := func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) {
87+
startTime := time.Now()
88+
resp, err = handler(ctx, req)
89+
recordHistogramMetrics(serverHandledHistogram, "unary", info.FullMethod, startTime)
90+
return resp, err
91+
}
92+
streamInterceptor := func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
93+
startTime := time.Now()
94+
err := handler(srv, ss)
95+
recordHistogramMetrics(serverHandledHistogram, "stream", info.FullMethod, startTime)
96+
return err
97+
}
98+
return unaryInterceptor, streamInterceptor
99+
}

‎tests/framework/integration/cluster.go

+4
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,7 @@ type ClusterConfig struct {
175175
ExperimentalMaxLearners int
176176
DisableStrictReconfigCheck bool
177177
CorruptCheckTime time.Duration
178+
Metrics string
178179
}
179180

180181
type Cluster struct {
@@ -292,6 +293,7 @@ func (c *Cluster) MustNewMember(t testutil.TB) *Member {
292293
ExperimentalMaxLearners: c.Cfg.ExperimentalMaxLearners,
293294
DisableStrictReconfigCheck: c.Cfg.DisableStrictReconfigCheck,
294295
CorruptCheckTime: c.Cfg.CorruptCheckTime,
296+
Metrics: c.Cfg.Metrics,
295297
})
296298
m.DiscoveryURL = c.Cfg.DiscoveryURL
297299
return m
@@ -617,6 +619,7 @@ type MemberConfig struct {
617619
ExperimentalMaxLearners int
618620
DisableStrictReconfigCheck bool
619621
CorruptCheckTime time.Duration
622+
Metrics string
620623
}
621624

622625
// MustNewMember return an inited member with the given name. If peerTLS is
@@ -731,6 +734,7 @@ func MustNewMember(t testutil.TB, mcfg MemberConfig) *Member {
731734
if mcfg.ExperimentalMaxLearners != 0 {
732735
m.ExperimentalMaxLearners = mcfg.ExperimentalMaxLearners
733736
}
737+
m.Metrics = mcfg.Metrics
734738
m.V2Deprecation = config.V2_DEPR_DEFAULT
735739
m.GRPCServerRecorder = &grpctesting.GRPCRecorder{}
736740

‎tests/integration/clientv3/metrics_test.go

+186
Original file line numberDiff line numberDiff line change
@@ -181,3 +181,189 @@ func getHTTPBodyAsLines(t *testing.T, url string) []string {
181181
resp.Body.Close()
182182
return lines
183183
}
184+
185+
func TestAllMetricsGenerated(t *testing.T) {
186+
integration2.BeforeTest(t)
187+
188+
var (
189+
addr = "localhost:27989"
190+
ln net.Listener
191+
)
192+
193+
srv := &http.Server{Handler: promhttp.Handler()}
194+
srv.SetKeepAlivesEnabled(false)
195+
196+
ln, err := transport.NewUnixListener(addr)
197+
if err != nil {
198+
t.Errorf("Error: %v occurred while listening on addr: %v", err, addr)
199+
}
200+
201+
donec := make(chan struct{})
202+
defer func() {
203+
ln.Close()
204+
<-donec
205+
}()
206+
207+
// listen for all Prometheus metrics
208+
go func() {
209+
defer close(donec)
210+
211+
serr := srv.Serve(ln)
212+
if serr != nil && !transport.IsClosedConnError(serr) {
213+
t.Errorf("Err serving http requests: %v", serr)
214+
}
215+
}()
216+
217+
url := "unix://" + addr + "/metrics"
218+
219+
clus := integration2.NewCluster(t, &integration2.ClusterConfig{Size: 1, Metrics: "extensive"})
220+
defer clus.Terminate(t)
221+
222+
clientMetrics := grpcprom.NewClientMetrics()
223+
prometheus.Register(clientMetrics)
224+
225+
cfg := clientv3.Config{
226+
Endpoints: []string{clus.Members[0].GRPCURL},
227+
DialOptions: []grpc.DialOption{
228+
grpc.WithUnaryInterceptor(clientMetrics.UnaryClientInterceptor()),
229+
grpc.WithStreamInterceptor(clientMetrics.StreamClientInterceptor()),
230+
},
231+
}
232+
cli, cerr := integration2.NewClient(t, cfg)
233+
if cerr != nil {
234+
t.Fatal(cerr)
235+
}
236+
defer cli.Close()
237+
238+
// Perform some operations to generate metrics
239+
wc := cli.Watch(context.Background(), "foo")
240+
_, err = cli.Put(context.Background(), "foo", "bar")
241+
if err != nil {
242+
t.Errorf("Error putting value in key store")
243+
}
244+
245+
// consume watch response
246+
select {
247+
case <-wc:
248+
case <-time.After(10 * time.Second):
249+
t.Error("Timeout occurred for getting watch response")
250+
}
251+
252+
// Define the expected list of metrics
253+
expectedMetrics := []string{
254+
"etcd_cluster_version",
255+
"etcd_disk_backend_commit_duration_seconds_bucket",
256+
"etcd_disk_backend_commit_duration_seconds_count",
257+
"etcd_disk_backend_commit_duration_seconds_sum",
258+
"etcd_disk_backend_defrag_duration_seconds_bucket",
259+
"etcd_disk_backend_defrag_duration_seconds_count",
260+
"etcd_disk_backend_defrag_duration_seconds_sum",
261+
"etcd_disk_backend_snapshot_duration_seconds_bucket",
262+
"etcd_disk_backend_snapshot_duration_seconds_count",
263+
"etcd_disk_backend_snapshot_duration_seconds_sum",
264+
"etcd_disk_defrag_inflight",
265+
"etcd_disk_wal_fsync_duration_seconds_bucket",
266+
"etcd_disk_wal_fsync_duration_seconds_count",
267+
"etcd_disk_wal_fsync_duration_seconds_sum",
268+
"etcd_disk_wal_write_bytes_total",
269+
"etcd_disk_wal_write_duration_seconds_bucket",
270+
"etcd_disk_wal_write_duration_seconds_count",
271+
"etcd_disk_wal_write_duration_seconds_sum",
272+
"etcd_mvcc_db_open_read_transactions",
273+
"etcd_mvcc_db_total_size_in_bytes",
274+
"etcd_mvcc_db_total_size_in_use_in_bytes",
275+
"etcd_mvcc_delete_total",
276+
"etcd_mvcc_hash_duration_seconds_bucket",
277+
"etcd_mvcc_hash_duration_seconds_count",
278+
"etcd_mvcc_hash_duration_seconds_sum",
279+
"etcd_mvcc_hash_rev_duration_seconds_bucket",
280+
"etcd_mvcc_hash_rev_duration_seconds_count",
281+
"etcd_mvcc_hash_rev_duration_seconds_sum",
282+
"etcd_mvcc_put_total",
283+
"etcd_mvcc_range_total",
284+
"etcd_mvcc_txn_total",
285+
"etcd_network_client_grpc_received_bytes_total",
286+
"etcd_network_client_grpc_sent_bytes_total",
287+
"etcd_network_known_peers",
288+
"etcd_server_apply_duration_seconds_bucket",
289+
"etcd_server_apply_duration_seconds_count",
290+
"etcd_server_apply_duration_seconds_sum",
291+
"etcd_server_client_requests_total",
292+
"etcd_server_go_version",
293+
"etcd_server_has_leader",
294+
"etcd_server_health_failures",
295+
"etcd_server_health_success",
296+
"etcd_server_heartbeat_send_failures_total",
297+
"etcd_server_id",
298+
"etcd_server_is_leader",
299+
"etcd_server_is_learner",
300+
"etcd_server_leader_changes_seen_total",
301+
"etcd_server_learner_promote_successes",
302+
"etcd_server_proposals_applied_total",
303+
"etcd_server_proposals_committed_total",
304+
"etcd_server_proposals_failed_total",
305+
"etcd_server_proposals_pending",
306+
"etcd_server_quota_backend_bytes",
307+
"etcd_server_read_indexes_failed_total",
308+
"etcd_server_slow_apply_total",
309+
"etcd_server_slow_read_indexes_total",
310+
"etcd_server_snapshot_apply_in_progress_total",
311+
"etcd_server_version",
312+
"etcd_snap_db_fsync_duration_seconds_bucket",
313+
"etcd_snap_db_fsync_duration_seconds_count",
314+
"etcd_snap_db_fsync_duration_seconds_sum",
315+
"etcd_snap_db_save_total_duration_seconds_bucket",
316+
"etcd_snap_db_save_total_duration_seconds_count",
317+
"etcd_snap_db_save_total_duration_seconds_sum",
318+
"etcd_snap_fsync_duration_seconds_bucket",
319+
"etcd_snap_fsync_duration_seconds_count",
320+
"etcd_snap_fsync_duration_seconds_sum",
321+
"grpc_client_handled_total",
322+
"grpc_client_msg_received_total",
323+
"grpc_client_msg_sent_total",
324+
"grpc_client_started_total",
325+
"grpc_server_handled_total",
326+
"grpc_server_handling_seconds_bucket",
327+
"grpc_server_handling_seconds_count",
328+
"grpc_server_handling_seconds_sum",
329+
"grpc_server_msg_received_total",
330+
"grpc_server_msg_sent_total",
331+
"grpc_server_started_total",
332+
}
333+
334+
// Get the list of generated metrics
335+
generatedMetrics := getMetricsList(t, url)
336+
for _, metric := range expectedMetrics {
337+
if !contains(generatedMetrics, metric) {
338+
t.Errorf("Expected metric %s not found in generated metrics", metric)
339+
}
340+
}
341+
}
342+
343+
func getMetricsList(t *testing.T, url string) []string {
344+
lines := getHTTPBodyAsLines(t, url)
345+
metrics := make(map[string]struct{})
346+
for _, line := range lines {
347+
if strings.Contains(line, "{") {
348+
metric := line[:strings.Index(line, "{")]
349+
metrics[metric] = struct{}{}
350+
} else {
351+
metric := line[:strings.Index(line, " ")]
352+
metrics[metric] = struct{}{}
353+
}
354+
}
355+
var metricList []string
356+
for metric := range metrics {
357+
metricList = append(metricList, metric)
358+
}
359+
return metricList
360+
}
361+
362+
func contains(slice []string, item string) bool {
363+
for _, s := range slice {
364+
if s == item {
365+
return true
366+
}
367+
}
368+
return false
369+
}

0 commit comments

Comments
 (0)
Please sign in to comment.