Skip to content

Commit 2a959a3

Browse files
authored
feat(crontab/middleware): add support for otel protocol tracing (#346)
* feat(crontab/middleware): add support for otel protocol tracing * feat(crontab/middleware): add support for otel protocol tracing * feat(crontab/middleware): add support for otel protocol tracing
1 parent 38d1588 commit 2a959a3

File tree

1 file changed

+101
-0
lines changed

1 file changed

+101
-0
lines changed

crontab/middleware/tracing/tracing.go

+101
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
package tracing
2+
3+
import (
4+
"context"
5+
"time"
6+
7+
"github.com/flc1125/go-cron/v4"
8+
"go.opentelemetry.io/otel"
9+
"go.opentelemetry.io/otel/attribute"
10+
"go.opentelemetry.io/otel/codes"
11+
"go.opentelemetry.io/otel/trace"
12+
)
13+
14+
var scopeName = "github.com/go-kratos-ecosystem/components/v2/crontab/middleware/otel"
15+
16+
var (
17+
attrJobName = attribute.Key("cron.job.name")
18+
attrJobID = attribute.Key("cron.job.id")
19+
attrJobPrevTime = attribute.Key("cron.job.prev.time")
20+
attrJobNextTime = attribute.Key("cron.job.next.time")
21+
attrJobDuration = attribute.Key("cron.job.duration")
22+
)
23+
24+
type JobWithName interface {
25+
cron.Job
26+
Name() string
27+
}
28+
29+
type options struct {
30+
tp trace.TracerProvider
31+
}
32+
33+
type Option func(*options)
34+
35+
func newOptions(opts ...Option) *options {
36+
opt := &options{
37+
tp: otel.GetTracerProvider(),
38+
}
39+
for _, o := range opts {
40+
o(opt)
41+
}
42+
return opt
43+
}
44+
45+
func New(opts ...Option) cron.Middleware {
46+
o := newOptions(opts...)
47+
48+
tracer := o.tp.Tracer(scopeName)
49+
return func(original cron.Job) cron.Job {
50+
return cron.JobFunc(func(ctx context.Context) (err error) {
51+
job, ok := any(original).(JobWithName)
52+
if !ok {
53+
return original.Run(ctx)
54+
}
55+
56+
// The span is created here, and it will be ended when the job is done.
57+
var span trace.Span
58+
ctx, span = tracer.Start(ctx, job.Name(),
59+
trace.WithSpanKind(trace.SpanKindInternal),
60+
)
61+
defer span.End()
62+
defer func(starting time.Time) {
63+
span.SetAttributes(
64+
attrJobDuration.String(time.Since(starting).String()),
65+
)
66+
67+
if err != nil {
68+
span.RecordError(err)
69+
span.SetStatus(codes.Error, err.Error())
70+
} else {
71+
span.SetStatus(codes.Ok, "")
72+
}
73+
}(time.Now())
74+
75+
// Set attributes.
76+
span.SetAttributes(append(
77+
[]attribute.KeyValue{
78+
attrJobName.String(job.Name()),
79+
},
80+
entryAttributes(ctx)...,
81+
)...)
82+
83+
// The job is run here.
84+
err = job.Run(ctx)
85+
return
86+
})
87+
}
88+
}
89+
90+
func entryAttributes(ctx context.Context) []attribute.KeyValue {
91+
entry, ok := cron.EntryFromContext(ctx)
92+
if !ok {
93+
return []attribute.KeyValue{}
94+
}
95+
96+
return []attribute.KeyValue{
97+
attrJobID.Int(int(entry.ID())),
98+
attrJobPrevTime.String(entry.Prev().String()),
99+
attrJobNextTime.String(entry.Next().String()),
100+
}
101+
}

0 commit comments

Comments
 (0)