From f505f60df1bcaa3f9401430ba3b38866ff1c96e4 Mon Sep 17 00:00:00 2001 From: smantic Date: Sat, 2 Nov 2024 17:02:35 -0500 Subject: [PATCH] Speak/Learn/MessagesUsed Metrics (#85) * metrics: - refactor metrics from global vars - add learn and speak latency histograms. * add messages used histogram * remove duplicated metrics config Fixes #83. --- api.go | 36 ++----------------- command/command.go | 2 ++ command/talk.go | 2 ++ main.go | 84 ++++++++++++++++++++++++++++++++++++++++++- metrics/metrics.go | 33 +++++++++++++++++ metrics/prometheus.go | 51 ++++++++++++++++++++++++++ privmsg.go | 9 +++-- robot.go | 4 +++ tmi.go | 11 +++--- 9 files changed, 189 insertions(+), 43 deletions(-) create mode 100644 metrics/metrics.go create mode 100644 metrics/prometheus.go diff --git a/api.go b/api.go index 82ed469..75d3eec 100644 --- a/api.go +++ b/api.go @@ -12,39 +12,10 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/collectors" - "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/client_golang/prometheus/promhttp" ) -// Userspace metrics. -var ( - tmiMsgsCount = promauto.NewCounter(prometheus.CounterOpts{ - Namespace: "robot", - Subsystem: "tmi", - Name: "messages", - Help: "Number of PRIVMSGs received from TMI.", - }) - tmiCommandsCount = promauto.NewCounter(prometheus.CounterOpts{ - Namespace: "robot", - Subsystem: "tmi", - Name: "commands", - Help: "Number of command invocations received in Twitch chat.", - }) - learnedCount = promauto.NewCounter(prometheus.CounterOpts{ - Namespace: "robot", - Subsystem: "brain", - Name: "learned", - Help: "Number of messages learned.", - }) - forgortCount = promauto.NewCounter(prometheus.CounterOpts{ - Namespace: "robot", - Subsystem: "brain", - Name: "forgot", - Help: "Number of individual messages deleted. 
Does not include messages deleted by user or time.", - }) -) - -func api(ctx context.Context, listen string, mux *http.ServeMux) error { +func api(ctx context.Context, listen string, mux *http.ServeMux, metrics []prometheus.Collector) error { reg := prometheus.NewRegistry() reg.MustRegister(collectors.NewGoCollector( collectors.WithGoCollectorMemStatsMetricsDisabled(), @@ -54,10 +25,7 @@ func api(ctx context.Context, listen string, mux *http.ServeMux) error { }, ), )) - reg.MustRegister(tmiMsgsCount) - reg.MustRegister(tmiCommandsCount) - reg.MustRegister(learnedCount) - reg.MustRegister(forgortCount) + reg.MustRegister(metrics...) opts := promhttp.HandlerOpts{ EnableOpenMetrics: true, } diff --git a/command/command.go b/command/command.go index 7b27fda..6b9a74b 100644 --- a/command/command.go +++ b/command/command.go @@ -7,6 +7,7 @@ import ( "github.com/zephyrtronium/robot/brain" "github.com/zephyrtronium/robot/channel" "github.com/zephyrtronium/robot/message" + "github.com/zephyrtronium/robot/metrics" "github.com/zephyrtronium/robot/privacy" "github.com/zephyrtronium/robot/spoken" "github.com/zephyrtronium/robot/syncmap" @@ -21,6 +22,7 @@ type Robot struct { Spoken *spoken.History Owner string Contact string + Metrics *metrics.Metrics } // Invocation is a command invocation. An Invocation and its fields must not diff --git a/command/talk.go b/command/talk.go index 7a84626..dc53fbc 100644 --- a/command/talk.go +++ b/command/talk.go @@ -61,6 +61,8 @@ func speakCmd(ctx context.Context, robo *Robot, call *Invocation, effect string) } // block the generated message from being later recognized as a meme. 
call.Channel.Memery.Block(call.Message.Time(), s) + robo.Metrics.SpeakLatency.Observe(time.Since(start).Seconds(), call.Channel.Send, fmt.Sprintf("%t", len(call.Args["prompt"]) == 0)) + robo.Metrics.UsedMessagesForGeneration.Observe(float64(len(trace))) robo.Log.InfoContext(ctx, "speak", "in", call.Channel.Name, "text", m, "emote", e) return m + " " + e } diff --git a/main.go b/main.go index c4c2822..61c3b5f 100644 --- a/main.go +++ b/main.go @@ -12,6 +12,7 @@ import ( "strings" "time" + "github.com/prometheus/client_golang/prometheus" "github.com/urfave/cli/v3" "golang.org/x/sync/errgroup" "zombiezen.com/go/sqlite/sqlitex" @@ -19,6 +20,7 @@ import ( "github.com/zephyrtronium/robot/brain" "github.com/zephyrtronium/robot/brain/kvbrain" "github.com/zephyrtronium/robot/brain/sqlbrain" + "github.com/zephyrtronium/robot/metrics" "github.com/zephyrtronium/robot/userhash" ) @@ -135,7 +137,7 @@ func cliRun(ctx context.Context, cmd *cli.Command) error { if cfg.HTTP.Listen != "" { // TODO(zeph): this should be in the errgroup inside Run - go api(ctx, cfg.HTTP.Listen, new(http.ServeMux)) + go api(ctx, cfg.HTTP.Listen, new(http.ServeMux), robo.Metrics.Collectors()) } return robo.Run(ctx) @@ -315,3 +317,83 @@ func loggerFromFlags(cmd *cli.Command) *slog.Logger { } return slog.New(h) } + +// newMetrics builds the Prometheus-backed robot metrics; metric and label names use underscores because hyphens are not valid in Prometheus names. +func newMetrics() *metrics.Metrics { + return &metrics.Metrics{ + TMIMsgsCount: metrics.NewPromCounter( + prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: "robot", + Subsystem: "tmi", + Name: "messages", + Help: "Number of PRIVMSGs received from TMI.", + }, + ), + ), + TMICommandCount: metrics.NewPromCounter( + prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: "robot", + Subsystem: "tmi", + Name: "commands", + Help: "Number of command invocations received in Twitch chat.", + }, + ), + ), + LearnedCount: metrics.NewPromCounter( + prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: "robot", + Subsystem: "brain", + Name: "learned",
+ Help: "Number of messages learned.", + }, + ), + ), + ForgotCount: metrics.NewPromCounter( + prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: "robot", + Subsystem: "brain", + Name: "forgot", + Help: "Number of individual messages deleted. Does not include messages deleted by user or time.", + }, + ), + ), + SpeakLatency: metrics.NewPromObserverVec( + prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Buckets: []float64{0.01, 0.05, 0.1, 0.2, 0.5, 1, 5, 10}, + Namespace: "robot", + Subsystem: "commands", + Name: "speak_latency", + Help: "How long it takes for robot to speak once prompted in seconds", + }, + []string{"channel", "empty_prompt"}, + ), + ), + LearnLatency: metrics.NewPromObserverVec( + prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Buckets: []float64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, + Namespace: "robot", + Subsystem: "brain", + Name: "learn_latency", + Help: "How long it takes robot to learn a non discarded message in seconds", + }, + []string{"channel"}, + ), + ), + UsedMessagesForGeneration: metrics.NewPromHistogram( + prometheus.NewHistogram( + prometheus.HistogramOpts{ + Namespace: "robot", + Subsystem: "commands", + Name: "used_messages", + Help: "How many messages were used while generating a new message", + }, + ), + ), + } +} diff --git a/metrics/metrics.go b/metrics/metrics.go new file mode 100644 index 0000000..6b620ab --- /dev/null +++ b/metrics/metrics.go @@ -0,0 +1,33 @@ +package metrics + +import "github.com/prometheus/client_golang/prometheus" + +type Observer interface { + Observe(val float64, labels ...string) + + // for now we will tightly couple to the prometheus collector type + // the go otel metrics sdk also has a prometheus adapter that implements this interface.
+ prometheus.Collector +} + +type Metrics struct { + TMIMsgsCount Observer + TMICommandCount Observer + LearnedCount Observer + ForgotCount Observer + SpeakLatency Observer + LearnLatency Observer + UsedMessagesForGeneration Observer +} + +func (m Metrics) Collectors() []prometheus.Collector { + return []prometheus.Collector{ + m.ForgotCount, + m.LearnedCount, + m.SpeakLatency, + m.TMICommandCount, + m.TMIMsgsCount, + m.LearnLatency, + m.UsedMessagesForGeneration, + } +} diff --git a/metrics/prometheus.go b/metrics/prometheus.go new file mode 100644 index 0000000..ba2205f --- /dev/null +++ b/metrics/prometheus.go @@ -0,0 +1,51 @@ +package metrics + +import ( + "github.com/prometheus/client_golang/prometheus" +) + +func NewPromCounter(m prometheus.Counter) Observer { + return &PrometheusMetric{ + observe: func(val float64, labels ...string) { + m.Add(val) + }, + Collector: m, + } +} + +func NewPromGauge(m prometheus.Gauge) Observer { + return &PrometheusMetric{ + observe: func(val float64, labels ...string) { + m.Set(val) + }, + Collector: m, + } +} + +// NewPromObserverVec adapts a histogram or summary vec; labels are forwarded to WithLabelValues. +func NewPromObserverVec(m prometheus.ObserverVec) Observer { + return &PrometheusMetric{ + observe: func(val float64, labels ...string) { + m.WithLabelValues(labels...).Observe(val) + }, + Collector: m, + } +} + +func NewPromHistogram(m prometheus.Histogram) Observer { + return &PrometheusMetric{ + observe: func(val float64, labels ...string) { + m.Observe(val) + }, + Collector: m, + } +} + +type PrometheusMetric struct { + observe func(val float64, labels ...string) + prometheus.Collector +} + +func (m *PrometheusMetric) Observe(val float64, labels ...string) { + m.observe(val, labels...) +} diff --git a/privmsg.go b/privmsg.go index 3ee8744..f917fd1 100644 --- a/privmsg.go +++ b/privmsg.go @@ -23,7 +23,7 @@ import ( // tmiMessage processes a PRIVMSG from TMI.
func (robo *Robot) tmiMessage(ctx context.Context, group *errgroup.Group, send chan<- *tmi.Message, msg *tmi.Message) { - tmiMsgsCount.Inc() + robo.Metrics.TMIMsgsCount.Observe(1) // Run in a worker so that we don't block the message loop. work := func(ctx context.Context) { ch, _ := robo.channels.Load(msg.To()) @@ -146,7 +146,7 @@ func (robo *Robot) tmiMessage(ctx context.Context, group *errgroup.Group, send c } func (robo *Robot) command(ctx context.Context, log *slog.Logger, ch *channel.Channel, m *message.Received, from, cmd string) { - tmiCommandsCount.Inc() + robo.Metrics.TMICommandCount.Observe(1) var c *twitchCommand var args map[string]string level := "any" @@ -184,6 +184,7 @@ func (robo *Robot) command(ctx context.Context, log *slog.Logger, ch *channel.Ch Spoken: robo.spoken, Owner: robo.owner, Contact: robo.ownerContact, + Metrics: robo.Metrics, } inv := command.Invocation{ Channel: ch, @@ -256,11 +257,13 @@ func (robo *Robot) learn(ctx context.Context, log *slog.Logger, ch *channel.Chan return } user := hasher.Hash(new(userhash.Hash), msg.Sender, msg.To, msg.Time()) + start := time.Now() if err := brain.Learn(ctx, robo.brain, ch.Learn, msg.ID, *user, msg.Time(), brain.Tokens(nil, msg.Text)); err != nil { log.ErrorContext(ctx, "failed to learn", slog.Any("err", err)) return } - learnedCount.Inc() + robo.Metrics.LearnLatency.Observe(time.Since(start).Seconds(), ch.Learn) + robo.Metrics.LearnedCount.Observe(1) } // sendTMI sends a message to TMI after waiting for the global rate limit. 
diff --git a/robot.go b/robot.go index 6bed98f..60c2dcf 100644 --- a/robot.go +++ b/robot.go @@ -16,6 +16,7 @@ import ( "github.com/zephyrtronium/robot/auth" "github.com/zephyrtronium/robot/brain" "github.com/zephyrtronium/robot/channel" + "github.com/zephyrtronium/robot/metrics" "github.com/zephyrtronium/robot/privacy" "github.com/zephyrtronium/robot/spoken" "github.com/zephyrtronium/robot/syncmap" @@ -46,6 +47,8 @@ type Robot struct { tmi *client[*tmi.Message, *tmi.Message] // twitch is the Twitch API client. twitch twitch.Client + // Metrics are a collection of custom domain specific Metrics. + Metrics *metrics.Metrics } // client is the settings for OAuth2 and related elements. @@ -75,6 +78,7 @@ func New(usersKey []byte, poolSize int) *Robot { channels: syncmap.New[string, *channel.Channel](), works: make(chan chan func(context.Context), poolSize), hashes: func() userhash.Hasher { return userhash.New(usersKey) }, + Metrics: newMetrics(), } } diff --git a/tmi.go b/tmi.go index c7b795f..f38d575 100644 --- a/tmi.go +++ b/tmi.go @@ -10,6 +10,7 @@ import ( "golang.org/x/sync/errgroup" "github.com/zephyrtronium/robot/brain" + "github.com/zephyrtronium/robot/metrics" "github.com/zephyrtronium/robot/userhash" ) @@ -111,7 +112,7 @@ func (robo *Robot) clearchat(ctx context.Context, group *errgroup.Group, msg *tm ) continue } - forgortCount.Inc() + robo.Metrics.ForgotCount.Observe(1) if err := robo.brain.ForgetMessage(ctx, tag, id); err != nil { slog.ErrorContext(ctx, "failed to forget from recent trace", slog.Any("err", err), @@ -157,7 +158,7 @@ func (robo *Robot) clearmsg(ctx context.Context, group *errgroup.Group, msg *tmi if u != robo.tmi.name { // Forget a message from someone else. log.InfoContext(ctx, "forget message", slog.String("tag", ch.Learn), slog.String("id", t)) - forget(ctx, log, robo.brain, ch.Learn, t) + forget(ctx, log, robo.Metrics.ForgotCount, robo.brain, ch.Learn, t) return } // Forget a message from the robo. 
@@ -177,13 +178,13 @@ func (robo *Robot) clearmsg(ctx context.Context, group *errgroup.Group, msg *tmi return } log.InfoContext(ctx, "forget trace", slog.String("tag", ch.Send), slog.Any("spoken", tm), slog.Any("trace", trace)) - forget(ctx, log, robo.brain, ch.Send, trace...) + forget(ctx, log, robo.Metrics.ForgotCount, robo.brain, ch.Send, trace...) } robo.enqueue(ctx, group, work) } -func forget(ctx context.Context, log *slog.Logger, brain brain.Brain, tag string, trace ...string) { - forgortCount.Add(float64(len(trace))) +func forget(ctx context.Context, log *slog.Logger, forgetCount metrics.Observer, brain brain.Brain, tag string, trace ...string) { + forgetCount.Observe(float64(len(trace))) for _, id := range trace { err := brain.ForgetMessage(ctx, tag, id) if err != nil {