diff --git a/server/etcdserver/api/v3rpc/grpc.go b/server/etcdserver/api/v3rpc/grpc.go index ed55e0357c9..d4efc49a7d7 100644 --- a/server/etcdserver/api/v3rpc/grpc.go +++ b/server/etcdserver/api/v3rpc/grpc.go @@ -53,7 +53,7 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config, interceptor grpc.UnarySer } chainUnaryInterceptors := []grpc.UnaryServerInterceptor{ - newLogUnaryInterceptor(s), + newLogUnaryInterceptor(s.Logger(), s.Cfg.WarningUnaryRequestDuration), newUnaryInterceptor(s), serverMetrics.UnaryServerInterceptor(), } diff --git a/server/etcdserver/api/v3rpc/interceptor.go b/server/etcdserver/api/v3rpc/interceptor.go index 697d0b075ed..87abc562754 100644 --- a/server/etcdserver/api/v3rpc/interceptor.go +++ b/server/etcdserver/api/v3rpc/interceptor.go @@ -16,6 +16,7 @@ package v3rpc import ( "context" + "go.uber.org/zap/zapcore" "sync" "time" "unicode/utf8" @@ -75,13 +76,12 @@ func newUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor { } } -func newLogUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor { +func newLogUnaryInterceptor(lg *zap.Logger, warnLatency time.Duration) grpc.UnaryServerInterceptor { return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) { startTime := time.Now() resp, err := handler(ctx, req) - lg := s.Logger() if lg != nil { // acquire stats if debug level is enabled or RequestInfo is expensive - defer logUnaryRequestStats(ctx, lg, s.Cfg.WarningUnaryRequestDuration, info, startTime, req, resp) + defer logUnaryRequestStats(ctx, lg, warnLatency, info, startTime, req, resp) } return resp, err } @@ -173,43 +173,57 @@ func logUnaryRequestStats(ctx context.Context, lg *zap.Logger, warnLatency time. respSize = -1 } + rs := requestStats{ + startTime: startTime, + timeSpent: duration, + remote: remote, + responseType: responseType, + reqCount: reqCount, + reqSize: reqSize, + respCount: respCount, + respSize: respSize, + reqContent: reqContent, + } + if enabledDebugLevel { - logGenericRequestStats(lg, startTime, duration, remote, responseType, reqCount, reqSize, respCount, respSize, reqContent) + logGenericRequestStats(lg, rs) } else if expensiveRequest { - logExpensiveRequestStats(lg, startTime, duration, remote, responseType, reqCount, reqSize, respCount, respSize, reqContent) + logExpensiveRequestStats(lg, rs) } } -func logGenericRequestStats(lg *zap.Logger, startTime time.Time, duration time.Duration, remote string, responseType string, - reqCount int64, reqSize int, respCount int64, respSize int, reqContent string, -) { - lg.Debug("request stats", - zap.Time("start time", startTime), - zap.Duration("time spent", duration), - zap.String("remote", remote), - zap.String("response type", responseType), - zap.Int64("request count", reqCount), - zap.Int("request size", reqSize), - zap.Int64("response count", respCount), - zap.Int("response size", respSize), - zap.String("request content", reqContent), - ) +type requestStats struct { + startTime time.Time + timeSpent time.Duration + remote string + responseType string + reqCount int64 + reqSize int + respCount int64 + respSize int + reqContent string +} + +func (rs requestStats) MarshalLogObject(enc zapcore.ObjectEncoder) error { + enc.AddTime("start time", rs.startTime) + enc.AddDuration("time spent", rs.timeSpent) + enc.AddString("remote", rs.remote) + enc.AddString("response type", rs.responseType) + enc.AddInt64("request count", rs.reqCount) + enc.AddInt("request size", rs.reqSize) + enc.AddInt64("response count", rs.respCount) + enc.AddInt("response size", rs.respSize) + enc.AddString("request content", rs.reqContent) + + return nil +} + +func logGenericRequestStats(lg *zap.Logger, rs requestStats) { + lg.Debug("request stats", zap.Inline(rs)) } -func logExpensiveRequestStats(lg *zap.Logger, startTime time.Time, duration time.Duration, remote string, responseType string, - reqCount int64, reqSize int, respCount int64, respSize int, reqContent string, -) { - lg.Warn("request stats", - zap.Time("start time", startTime), - zap.Duration("time spent", duration), - zap.String("remote", remote), - zap.String("response type", responseType), - zap.Int64("request count", reqCount), - zap.Int("request size", reqSize), - zap.Int64("response count", respCount), - zap.Int("response size", respSize), - zap.String("request content", reqContent), - ) +func logExpensiveRequestStats(lg *zap.Logger, rs requestStats) { + lg.Warn("request stats", zap.Inline(rs)) } func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor { diff --git a/server/etcdserver/api/v3rpc/interceptor_test.go b/server/etcdserver/api/v3rpc/interceptor_test.go new file mode 100644 index 00000000000..ae5c7c1a9da --- /dev/null +++ b/server/etcdserver/api/v3rpc/interceptor_test.go @@ -0,0 +1,177 @@ +package v3rpc + +import ( + "context" + "github.com/stretchr/testify/assert" + pb "go.etcd.io/etcd/api/v3/etcdserverpb" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + "go.uber.org/zap/zaptest/observer" + "google.golang.org/grpc" + "google.golang.org/grpc/peer" + "net" + "testing" + "time" +) + +func buildMockUnaryHandler(t *testing.T, mockResp interface{}, handlerLatency time.Duration) grpc.UnaryHandler { + t.Helper() + return func(ctx context.Context, req interface{}) (interface{}, error) { + // Add latency to mock handler. + time.Sleep(handlerLatency) + return mockResp, nil + } +} + +func TestLogUnaryInterceptor(t *testing.T) { + // Warn on request latency if > 5ms. + handlerWarnLatencyThreshold := time.Millisecond * 5 + + unaryServerInfo := &grpc.UnaryServerInfo{ + FullMethod: "/foo/bar", + } + + address := "10.0.0.1:37928" + addr, err := net.ResolveTCPAddr("tcp", address) + assert.NoError(t, err) + p := &peer.Peer{ + Addr: addr, + } + + testcases := []struct { + name string + req interface{} + resp interface{} + reqLatency time.Duration + debugLogLevel bool // debugLogLevel indicates whether log level is debug. + expectedRequestStats *requestStats + }{ + {"sample transaction with successful compare", + &pb.TxnRequest{ + Compare: []*pb.Compare{{ + Key: []byte("/users/12345/email"), + Result: pb.Compare_EQUAL, + Target: pb.Compare_VALUE, + TargetUnion: &pb.Compare_Value{Value: []byte("old.address@johndoe.com")}, + }}, + Success: []*pb.RequestOp{{ + Request: &pb.RequestOp_RequestPut{ + RequestPut: &pb.PutRequest{ + Key: []byte("/users/12345/email"), + Value: []byte("new.address@johndoe.com"), + }, + }, + }}, + }, + &pb.TxnResponse{ + Succeeded: true, + Responses: []*pb.ResponseOp{{Response: &pb.ResponseOp_ResponsePut{}}}, + }, + 0, true, + &requestStats{ + reqCount: 1, + reqSize: 47, + respCount: 0, + respSize: 4, + reqContent: "compare: success:> failure:<>", + }, + }, + {"sample transaction with failed compare", + &pb.TxnRequest{ + Compare: []*pb.Compare{{ + Key: []byte("/users/12345/email"), + Result: pb.Compare_EQUAL, + Target: pb.Compare_VALUE, + TargetUnion: &pb.Compare_Value{Value: []byte("old.address@johndoe.com")}, + }}, + Failure: []*pb.RequestOp{{ + Request: &pb.RequestOp_RequestRange{ + RequestRange: &pb.RangeRequest{ + Key: []byte("/users/12345/email"), + }, + }}, + }, + }, + &pb.TxnResponse{ + Succeeded: false, + Responses: []*pb.ResponseOp{{Response: &pb.ResponseOp_ResponsePut{}}}, + }, + 0, true, + &requestStats{ + reqCount: 1, + reqSize: 22, + respCount: 0, + respSize: 2, + reqContent: "compare: success:<> failure: >", + }, + }, + {"expensive request with debug logs disabled", &pb.RangeRequest{Key: []byte("fooKey")}, &pb.RangeResponse{Count: 2}, + time.Millisecond * 10, false, + &requestStats{ + reqCount: 0, + reqSize: 8, + respCount: 2, + respSize: 2, + reqContent: "key:\"fooKey\" ", + }, + }, + // Unrecognized response types result in -1 values. + {"default request stats -1", "fooRequest", "fooResponse", 0, true, + &requestStats{ + reqCount: -1, + reqSize: -1, + respCount: -1, + respSize: -1, + reqContent: "", + }}, + // Low-latency handler without debug-level logging enabled generates no request stat logs. + {"no debug or warn level logs", "fooRequest", "fooResponse", 0, false, nil}, + } + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + expensiveUnaryHandler := buildMockUnaryHandler(t, tc.resp, tc.reqLatency) + logLevel := zapcore.InfoLevel + if tc.debugLogLevel { + logLevel = zapcore.DebugLevel + } + + observedZapCore, observedLogs := observer.New(logLevel) + observedLogger := zap.New(observedZapCore) + + interceptor := newLogUnaryInterceptor(observedLogger, handlerWarnLatencyThreshold) + + ctx := peer.NewContext(context.TODO(), p) + _, err := interceptor(ctx, tc.req, unaryServerInfo, expensiveUnaryHandler) + assert.NoError(t, err) + + // Filter for request stats log messages. + rsLogs := observedLogs.FilterMessage("request stats") + + // No request stats if log-level is not debug or warn latency threshold is not exceeded. + if !(tc.debugLogLevel || tc.reqLatency > handlerWarnLatencyThreshold) { + assert.Equal(t, 0, rsLogs.Len()) + } else { + assert.Equal(t, 1, rsLogs.Len()) + le := rsLogs.All()[0] + assert.Equal(t, 1, len(le.Context)) + fld := le.Context[0] + rs, ok := fld.Interface.(requestStats) + assert.True(t, ok) + assert.Equal(t, tc.expectedRequestStats.reqCount, rs.reqCount) + assert.Equal(t, tc.expectedRequestStats.reqSize, rs.reqSize) + assert.Equal(t, tc.expectedRequestStats.respCount, rs.respCount) + assert.Equal(t, tc.expectedRequestStats.respSize, rs.respSize) + assert.Equal(t, tc.expectedRequestStats.reqContent, rs.reqContent) + assert.Equal(t, unaryServerInfo.FullMethod, rs.responseType) + // Check peer info read from context. + assert.Equal(t, address, rs.remote) + if tc.debugLogLevel { + assert.Equal(t, zapcore.DebugLevel, le.Entry.Level) + } else { + // Expensive request produce warn-level log if debug-level logs are disabled. + assert.Equal(t, zapcore.WarnLevel, le.Entry.Level) + } + } + }) + } +}