Skip to content

Commit a4a1e53

Browse files
authored
[RSDK 9149] Use RDK Logger Across Interceptors (#405)
1 parent d2f65e2 commit a4a1e53

File tree

4 files changed

+96
-47
lines changed

4 files changed

+96
-47
lines changed

logger.go

+36
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,13 @@ package utils
22

33
import (
44
"reflect"
5+
"time"
56

67
"github.com/edaniels/golog"
8+
grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
79
"go.uber.org/zap"
10+
"go.uber.org/zap/zapcore"
11+
"google.golang.org/grpc/codes"
812
)
913

1014
// Logger is used various parts of the package for informational/debugging purposes.
@@ -127,3 +131,35 @@ func AddFieldsToLogger(inp ZapCompatibleLogger, args ...interface{}) (loggerRet
127131

128132
return loggerRet
129133
}
134+
135+
// LogFinalLine is used to log the final status of a gRPC request along with its execution time, an associated error (if any), and the
136+
// gRPC status code. If there is an error, the log level is upgraded (if necessary) to ERROR. Otherwise, it is set to DEBUG. This code is
137+
// taken from
138+
// https://github.com/grpc-ecosystem/go-grpc-middleware/blob/560829fc74fcf9a69b7ab01d484f8b8961dc734b/logging/zap/client_interceptors.go
139+
func LogFinalLine(logger ZapCompatibleLogger, startTime time.Time, err error, msg string, code codes.Code) {
140+
level := grpc_zap.DefaultCodeToLevel(code)
141+
142+
// this calculation is done because duration.Milliseconds() will return an integer, which is not precise enough.
143+
duration := float32(time.Since(startTime).Nanoseconds()/1000) / 1000
144+
fields := []any{}
145+
if err == nil {
146+
level = zap.DebugLevel
147+
} else {
148+
if level < zap.ErrorLevel {
149+
level = zap.ErrorLevel
150+
}
151+
fields = append(fields, "error", err)
152+
}
153+
fields = append(fields, "grpc.code", code.String(), "grpc.time_ms", duration)
154+
// grpc_zap.DefaultCodeToLevel will only return zap.DebugLevel, zap.InfoLevel, zap.ErrorLevel, zap.WarnLevel
155+
switch level {
156+
case zap.DebugLevel:
157+
logger.Debugw(msg, fields...)
158+
case zap.InfoLevel:
159+
logger.Infow(msg, fields...)
160+
case zap.ErrorLevel:
161+
logger.Errorw(msg, fields...)
162+
case zap.WarnLevel, zap.DPanicLevel, zap.PanicLevel, zap.FatalLevel, zapcore.InvalidLevel:
163+
logger.Warnw(msg, fields...)
164+
}
165+
}

rpc/server.go

+5-12
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,11 @@ import (
2020
"github.com/golang-jwt/jwt/v4"
2121
"github.com/google/uuid"
2222
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
23-
grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
2423
grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
2524
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
2625
"github.com/improbable-eng/grpc-web/go/grpcweb"
2726
"github.com/pkg/errors"
2827
"go.uber.org/multierr"
29-
"go.uber.org/zap"
30-
"go.uber.org/zap/zapcore"
3128
"golang.org/x/net/http2/h2c"
3229
"google.golang.org/grpc"
3330
"google.golang.org/grpc/codes"
@@ -327,10 +324,6 @@ func NewServer(logger utils.ZapCompatibleLogger, opts ...ServerOption) (Server,
327324
logger: logger,
328325
}
329326

330-
grpcLogger := logger.Desugar()
331-
if !(sOpts.debug || utils.Debug) {
332-
grpcLogger = grpcLogger.WithOptions(zap.IncreaseLevel(zap.LevelEnablerFunc(zapcore.ErrorLevel.Enabled)))
333-
}
334327
if sOpts.unknownStreamDesc != nil {
335328
serverOpts = append(serverOpts, grpc.UnknownServiceHandler(sOpts.unknownStreamDesc.Handler))
336329
}
@@ -342,10 +335,10 @@ func NewServer(logger utils.ZapCompatibleLogger, opts ...ServerOption) (Server,
342335
logger.Errorw("panicked while calling unary server method", "error", errors.WithStack(err))
343336
return err
344337
}))),
345-
grpc_zap.UnaryServerInterceptor(grpcLogger),
338+
grpcUnaryServerInterceptor(logger),
346339
unaryServerCodeInterceptor(),
347340
)
348-
unaryInterceptors = append(unaryInterceptors, UnaryServerTracingInterceptor(grpcLogger))
341+
unaryInterceptors = append(unaryInterceptors, UnaryServerTracingInterceptor())
349342
unaryAuthIntPos := -1
350343
if !sOpts.unauthenticated {
351344
unaryInterceptors = append(unaryInterceptors, server.authUnaryInterceptor)
@@ -375,10 +368,10 @@ func NewServer(logger utils.ZapCompatibleLogger, opts ...ServerOption) (Server,
375368
logger.Errorw("panicked while calling stream server method", "error", errors.WithStack(err))
376369
return err
377370
}))),
378-
grpc_zap.StreamServerInterceptor(grpcLogger),
371+
grpcStreamServerInterceptor(logger),
379372
streamServerCodeInterceptor(),
380373
)
381-
streamInterceptors = append(streamInterceptors, StreamServerTracingInterceptor(grpcLogger))
374+
streamInterceptors = append(streamInterceptors, StreamServerTracingInterceptor())
382375
streamAuthIntPos := -1
383376
if !sOpts.unauthenticated {
384377
streamInterceptors = append(streamInterceptors, server.authStreamInterceptor)
@@ -861,7 +854,7 @@ func (ss *simpleServer) Stop() error {
861854
err = multierr.Combine(err, ss.signalingCallQueue.Close())
862855
}
863856
ss.logger.Debug("stopping gRPC server")
864-
defer ss.grpcServer.Stop()
857+
defer ss.grpcServer.GracefulStop()
865858
ss.logger.Debug("canceling service servers for gateway")
866859
for _, cancel := range ss.serviceServerCancels {
867860
cancel()

rpc/server_interceptors.go

+51-2
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,23 @@ import (
44
"context"
55
"encoding/hex"
66
"fmt"
7+
"path"
78
"strconv"
9+
"time"
810

11+
grpc_logging "github.com/grpc-ecosystem/go-grpc-middleware/logging"
912
"github.com/pkg/errors"
1013
"go.opencensus.io/trace"
1114
"go.uber.org/zap"
1215
"google.golang.org/grpc"
1316
"google.golang.org/grpc/metadata"
1417
"google.golang.org/grpc/status"
18+
19+
"go.viam.com/utils"
1520
)
1621

1722
// UnaryServerTracingInterceptor starts a new Span if Span metadata exists in the context.
18-
func UnaryServerTracingInterceptor(logger *zap.Logger) grpc.UnaryServerInterceptor {
23+
func UnaryServerTracingInterceptor() grpc.UnaryServerInterceptor {
1924
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
2025
if remoteSpanContext, err := remoteSpanContextFromContext(ctx); err == nil {
2126
var span *trace.Span
@@ -38,7 +43,7 @@ func UnaryServerTracingInterceptor(logger *zap.Logger) grpc.UnaryServerIntercept
3843
}
3944

4045
// StreamServerTracingInterceptor starts a new Span if Span metadata exists in the context.
41-
func StreamServerTracingInterceptor(logger *zap.Logger) grpc.StreamServerInterceptor {
46+
func StreamServerTracingInterceptor() grpc.StreamServerInterceptor {
4247
return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
4348
if remoteSpanContext, err := remoteSpanContextFromContext(stream.Context()); err == nil {
4449
newCtx, span := trace.StartSpanWithRemoteParent(stream.Context(), "server_root", remoteSpanContext)
@@ -119,3 +124,47 @@ func remoteSpanContextFromContext(ctx context.Context) (trace.SpanContext, error
119124

120125
return trace.SpanContext{TraceID: traceID, SpanID: spanID, TraceOptions: traceOptions, Tracestate: nil}, nil
121126
}
127+
128+
func grpcUnaryServerInterceptor(logger utils.ZapCompatibleLogger) grpc.UnaryServerInterceptor {
129+
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
130+
startTime := time.Now()
131+
resp, err := handler(ctx, req)
132+
code := grpc_logging.DefaultErrorToCode(err)
133+
loggerWithFields := utils.AddFieldsToLogger(logger, serverCallFields(ctx, info.FullMethod, startTime)...)
134+
135+
utils.LogFinalLine(loggerWithFields, startTime, err, "finished unary call with code "+code.String(), code)
136+
137+
return resp, err
138+
}
139+
}
140+
141+
func grpcStreamServerInterceptor(logger utils.ZapCompatibleLogger) grpc.StreamServerInterceptor {
142+
return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
143+
startTime := time.Now()
144+
err := handler(srv, stream)
145+
code := grpc_logging.DefaultErrorToCode(err)
146+
loggerWithFields := utils.AddFieldsToLogger(logger, serverCallFields(stream.Context(), info.FullMethod, startTime)...)
147+
148+
utils.LogFinalLine(loggerWithFields, startTime, err, "finished stream call with code "+code.String(), code)
149+
150+
return err
151+
}
152+
}
153+
154+
const iso8601 = "2006-01-02T15:04:05.000Z0700" // keep timestamp formatting constant
155+
156+
func serverCallFields(ctx context.Context, fullMethodString string, start time.Time) []any {
157+
var f []any
158+
f = append(f, "grpc.start_time", start.UTC().Format(iso8601))
159+
if d, ok := ctx.Deadline(); ok {
160+
f = append(f, zap.String("grpc.request.deadline", d.UTC().Format(iso8601)))
161+
}
162+
service := path.Dir(fullMethodString)[1:]
163+
method := path.Base(fullMethodString)
164+
return append(f, []any{
165+
"span.kind", "server",
166+
"system", "grpc",
167+
"grpc.service", service,
168+
"grpc.method", method,
169+
})
170+
}

rpc/wrtc_client_channel.go

+4-33
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,7 @@ import (
99
"time"
1010

1111
grpc_logging "github.com/grpc-ecosystem/go-grpc-middleware/logging"
12-
grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
1312
"github.com/viamrobotics/webrtc/v3"
14-
"go.uber.org/zap"
15-
"go.uber.org/zap/zapcore"
1613
"google.golang.org/grpc"
1714
"google.golang.org/grpc/metadata"
1815
"google.golang.org/protobuf/proto"
@@ -110,8 +107,9 @@ func (ch *webrtcClientChannel) Invoke(
110107
) error {
111108
startTime := time.Now()
112109
err := ch.invokeWithInterceptor(ctx, method, args, reply, opts...)
110+
code := grpc_logging.DefaultErrorToCode(err)
113111
loggerWithFields := utils.AddFieldsToLogger(ch.webrtcBaseChannel.logger, newClientLoggerFields(method)...)
114-
logFinalClientLine(loggerWithFields, startTime, err, "finished client unary call")
112+
utils.LogFinalLine(loggerWithFields, startTime, err, "finished client unary call", code)
115113
return err
116114
}
117115

@@ -197,8 +195,9 @@ func (ch *webrtcClientChannel) NewStream(
197195
) (grpc.ClientStream, error) {
198196
startTime := time.Now()
199197
clientStream, err := ch.streamWithInterceptor(ctx, method)
198+
code := grpc_logging.DefaultErrorToCode(err)
200199
loggerWithFields := utils.AddFieldsToLogger(ch.webrtcBaseChannel.logger, newClientLoggerFields(method)...)
201-
logFinalClientLine(loggerWithFields, startTime, err, "finished client streaming call")
200+
utils.LogFinalLine(loggerWithFields, startTime, err, "finished client streaming call", code)
202201
return clientStream, err
203202
}
204203

@@ -341,34 +340,6 @@ func (ch *webrtcClientChannel) writeReset(stream *webrtcpb.Stream) error {
341340
})
342341
}
343342

344-
// taken from
345-
// https://github.com/grpc-ecosystem/go-grpc-middleware/blob/560829fc74fcf9a69b7ab01d484f8b8961dc734b/logging/zap/client_interceptors.go
346-
func logFinalClientLine(logger utils.ZapCompatibleLogger, startTime time.Time, err error, msg string) {
347-
code := grpc_logging.DefaultErrorToCode(err)
348-
level := grpc_zap.DefaultCodeToLevel(code)
349-
350-
// this calculation is done because duration.Milliseconds() will return an integer, which is not precise enough.
351-
duration := float32(time.Since(startTime).Nanoseconds()/1000) / 1000
352-
fields := []any{}
353-
if err == nil {
354-
level = zap.DebugLevel
355-
} else {
356-
fields = append(fields, "error", err)
357-
}
358-
fields = append(fields, "grpc.code", code.String(), "grpc.time_ms", duration)
359-
// grpc_zap.DefaultCodeToLevel will only return zap.DebugLevel, zap.InfoLevel, zap.ErrorLevel, zap.WarnLevel
360-
switch level {
361-
case zap.DebugLevel:
362-
logger.Debugw(msg, fields...)
363-
case zap.InfoLevel:
364-
logger.Infow(msg, fields...)
365-
case zap.ErrorLevel:
366-
logger.Errorw(msg, fields...)
367-
case zap.WarnLevel, zap.DPanicLevel, zap.PanicLevel, zap.FatalLevel, zapcore.InvalidLevel:
368-
logger.Warnw(msg, fields...)
369-
}
370-
}
371-
372343
func newClientLoggerFields(fullMethodString string) []any {
373344
service := path.Dir(fullMethodString)[1:]
374345
method := path.Base(fullMethodString)

0 commit comments

Comments
 (0)