Skip to content

Commit

Permalink
feat(jet/tracing): add otel protocol based tracing middleware (#339)
Browse files Browse the repository at this point in the history
* feat(jet/tracing): add otel protocol based tracing middleware

* feat(jet/tracing): added an otel protocol based tracing middleware

* feat(jet/tracing): added an otel protocol based tracing middleware
  • Loading branch information
flc1125 authored Oct 25, 2024
1 parent 0ad761b commit 46d577e
Show file tree
Hide file tree
Showing 7 changed files with 189 additions and 9 deletions.
2 changes: 1 addition & 1 deletion hyperf/jet/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (c *Client) Invoke(ctx context.Context, method string, request any, respons

handler = Chain(append(c.middlewares, middlewares...)...)(handler)

response, err = handler(ctx, c.service, method, request)
response, err = handler(ContextWithClient(ctx, c), c.service, method, request)
return
}

Expand Down
16 changes: 16 additions & 0 deletions hyperf/jet/context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package jet

import "context"

type contextClientKey struct{}

// ContextWithClient returns a new Context that carries value.
func ContextWithClient(ctx context.Context, client *Client) context.Context {
return context.WithValue(ctx, contextClientKey{}, client)
}

// ClientFromContext returns the Client value stored in ctx, if any.
func ClientFromContext(ctx context.Context) (*Client, bool) {
client, ok := ctx.Value(contextClientKey{}).(*Client)
return client, ok
}
23 changes: 23 additions & 0 deletions hyperf/jet/context_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package jet

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
)

func TestContext_Client(t *testing.T) {
ctx := context.Background()

got1, ok1 := ClientFromContext(ctx)
assert.False(t, ok1)
assert.Nil(t, got1)

client := &Client{}
ctx = ContextWithClient(context.Background(), client)

got2, ok2 := ClientFromContext(ctx)
assert.True(t, ok2)
assert.Equal(t, client, got2)
}
21 changes: 18 additions & 3 deletions hyperf/jet/formatter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ import (
"fmt"
)

type FormatterKind string

const (
FormatterKindJSONRPC FormatterKind = "jsonrpc"
)

var DefaultFormatter Formatter = NewJSONRPCFormatter()

type RPCRequest struct {
Expand Down Expand Up @@ -32,6 +38,8 @@ func (r *RPCResponseError) Error() string {
}

type Formatter interface {
Kind() FormatterKind

// FormatRequest formats a request
FormatRequest(req *RPCRequest) ([]byte, error)

Expand All @@ -47,6 +55,9 @@ type Formatter interface {

// ============================================================

// JSONRPCVersion is the json rpc version
var JSONRPCVersion = "2.0"

// JSONRPCFormatter is a json rpc formatter
type JSONRPCFormatter struct{}

Expand Down Expand Up @@ -74,9 +85,13 @@ func NewJSONRPCFormatter() *JSONRPCFormatter {
return &JSONRPCFormatter{}
}

func (j *JSONRPCFormatter) Kind() FormatterKind {
return FormatterKindJSONRPC
}

func (j *JSONRPCFormatter) FormatRequest(req *RPCRequest) ([]byte, error) {
return json.Marshal(&JSONRPCFormatterRequest{
Jsonrpc: "2.0",
Jsonrpc: JSONRPCVersion,
Method: req.Path,
Params: req.Params,
ID: req.ID,
Expand All @@ -86,7 +101,7 @@ func (j *JSONRPCFormatter) FormatRequest(req *RPCRequest) ([]byte, error) {
func (j *JSONRPCFormatter) FormatResponse(resp *RPCResponse, err *RPCResponseError) ([]byte, error) {
if err != nil {
return json.Marshal(&JSONRPCFormatterResponse{
Jsonrpc: "2.0",
Jsonrpc: JSONRPCVersion,
ID: err.ID,
Error: &JSONRPCFormatterResponseError{
Code: err.Code,
Expand All @@ -96,7 +111,7 @@ func (j *JSONRPCFormatter) FormatResponse(resp *RPCResponse, err *RPCResponseErr
})
}
return json.Marshal(&JSONRPCFormatterResponse{
Jsonrpc: "2.0",
Jsonrpc: JSONRPCVersion,
ID: resp.ID,
Result: resp.Result,
})
Expand Down
125 changes: 125 additions & 0 deletions hyperf/jet/middleware/tracing/tracing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package tracing

import (
"context"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/propagation"
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
"go.opentelemetry.io/otel/trace"

"github.com/go-kratos-ecosystem/components/v2/hyperf/jet"
)

const instrumentation = "github.com/go-kratos-ecosystem/components/v2/hyperf/jet/middleware/tracing"

type options struct {
mp propagation.TextMapPropagator
tp trace.TracerProvider
attrs []attribute.KeyValue
}

type Option func(*options)

func WithPropagator(mp propagation.TextMapPropagator) Option {
return func(o *options) {
o.mp = mp
}
}

func WithTracerProvider(tp trace.TracerProvider) Option {
return func(o *options) {
o.tp = tp
}
}

func WithAttributes(attrs ...attribute.KeyValue) Option {
return func(o *options) {
o.attrs = append(o.attrs, attrs...)
}
}

func newOptions(opts ...Option) options {
o := options{
mp: otel.GetTextMapPropagator(),
tp: otel.GetTracerProvider(),
}
for _, opt := range opts {
opt(&o)
}
return o
}

func New(opts ...Option) jet.Middleware {
o := newOptions(opts...)

tracer := o.tp.Tracer(instrumentation)
return func(next jet.Handler) jet.Handler {
return func(ctx context.Context, service, method string, request any) (response any, err error) {
ctx, span := tracer.Start(ctx, service+"/"+method,
trace.WithSpanKind(trace.SpanKindClient),
)
defer span.End()

attrs := []attribute.KeyValue{
semconv.RPCService(service),
semconv.RPCMethod(method),
// semconv.RPCJsonrpcErrorCode(0), // todo
// semconv.RPCJsonrpcErrorMessage(""), // todo
// semconv.RPCJsonrpcRequestID(""), // todo
// semconv.ServerAddress(""), // todo
// semconv.ServerPort(0), // todo
// semconv.NetworkPeerAddress(""), // todo
// semconv.NetworkPeerPort(0), // todo
}
attrs = append(attrs, formatterAttributes(ctx)...)
attrs = append(attrs, transportAttributes(ctx)...)
attrs = append(attrs, o.attrs...)

span.SetAttributes(attrs...)

response, err = next(ctx, service, method, request)
if err != nil {
span.SetStatus(codes.Error, err.Error())
span.RecordError(err)
}

return
}
}
}

func formatterAttributes(ctx context.Context) []attribute.KeyValue {
client, ok := jet.ClientFromContext(ctx)
if !ok {
return []attribute.KeyValue{}
}

switch formatter := client.GetFormatter(); formatter.Kind() {
case jet.FormatterKindJSONRPC:
return []attribute.KeyValue{
semconv.RPCSystemKey.String("jsonrpc"),
semconv.RPCJsonrpcVersion(jet.JSONRPCVersion),
}
default:
return []attribute.KeyValue{}
}
}

func transportAttributes(ctx context.Context) []attribute.KeyValue {
client, ok := jet.ClientFromContext(ctx)
if !ok {
return []attribute.KeyValue{}
}

switch transporter := client.GetTransporter().(type) {
case *jet.HTTPTransporter:
return []attribute.KeyValue{
semconv.ServerAddress(transporter.Addr), // todo: split host and port
}
default:
return []attribute.KeyValue{}
}
}
10 changes: 5 additions & 5 deletions hyperf/jet/transporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

var (
ErrHTTPTransporterAddrIsRequired = errors.New("jet/transporter: addr is required")
ErrHTTPTransporterAddrIsRequired = errors.New("jet/transporter: Addr is required")
ErrorHTTPTransporterClientIsRequired = errors.New("jet/transporter: client is required")
)

Expand All @@ -24,15 +24,15 @@ type Transporter interface {

// HTTPTransporter is a http transporter
type HTTPTransporter struct {
addr string
Addr string
*http.Client
}

type HTTPTransporterOption func(*HTTPTransporter)

func WithHTTPTransporterAddr(addr string) HTTPTransporterOption {
return func(t *HTTPTransporter) {
t.addr = addr
t.Addr = addr
}
}

Expand All @@ -51,7 +51,7 @@ func NewHTTPTransporter(opts ...HTTPTransporterOption) (*HTTPTransporter, error)
}

// validate
if transport.addr == "" {
if transport.Addr == "" {
return nil, ErrHTTPTransporterAddrIsRequired
}
if transport.Client == nil {
Expand All @@ -62,7 +62,7 @@ func NewHTTPTransporter(opts ...HTTPTransporterOption) (*HTTPTransporter, error)
}

func (t *HTTPTransporter) Send(ctx context.Context, data []byte) ([]byte, error) {
request, err := http.NewRequestWithContext(ctx, http.MethodPost, t.addr, bytes.NewReader(data))
request, err := http.NewRequestWithContext(ctx, http.MethodPost, t.Addr, bytes.NewReader(data))
if err != nil {
return nil, err
}
Expand Down
1 change: 1 addition & 0 deletions hyperf/jet/transporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,5 +51,6 @@ func TestTransporter_HTTPTransporter_HTTPTransporterServerError(t *testing.T) {
Err: errors.New("custom error"),
}
assert.True(t, IsHTTPTransporterServerError(err))
assert.IsType(t, "", err.Error())
assert.Equal(t, "custom error", err.Unwrap().Error())
}

0 comments on commit 46d577e

Please sign in to comment.