Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow exporting JS metadata #318

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,16 @@ func NewCollector(system, endpoint, prefix string, servers []*CollectedServer) p
return newLeafzCollector(getSystem(system, prefix), endpoint, servers)
}
if isJszEndpoint(system) {
return newJszCollector(getSystem(system, prefix), endpoint, servers)
return newJszCollector(getSystem(system, prefix), endpoint, servers, []string{}, []string{})
}
return newNatsCollector(getSystem(system, prefix), endpoint, servers)
}

// NewJszCollector creates a new NATS JetStream Collector.
func NewJszCollector(
endpoint, prefix string,
servers []*CollectedServer,
streamMetaKeys, consumerMetaKeys []string,
) prometheus.Collector {
return newJszCollector(getSystem(JetStreamSystem, prefix), endpoint, servers, streamMetaKeys, consumerMetaKeys)
}
Comment on lines +494 to +501
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a separate code path now for JetStream collectors to be able to supply the meta keys. This seemed better than to pass the arguments around for the other collectors where they wouldn't be used.

This makes JS collectors somewhat of a special case in the code, not sure if you like it that way.

138 changes: 129 additions & 9 deletions collector/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,22 @@ func verifyCollector(system, url string, endpoint string, cases map[string]float
URL: url,
}
coll := NewCollector(system, endpoint, "", servers)
verifySpecificCollector(cases, coll, t)
}

func verifyJszCollector(url string, endpoint string, cases map[string]float64, t *testing.T) {
// create a new collector.
servers := make([]*CollectedServer, 1)
servers[0] = &CollectedServer{
ID: "id",
URL: url,
}
coll := NewJszCollector(endpoint, "", servers, []string{}, []string{})

verifySpecificCollector(cases, coll, t)
}

// now collect the metrics
func verifySpecificCollector(cases map[string]float64, coll prometheus.Collector, t *testing.T) {
c := make(chan prometheus.Metric)
go coll.Collect(c)
for {
Expand Down Expand Up @@ -72,6 +86,34 @@ func verifyCollector(system, url string, endpoint string, cases map[string]float
// To account for the metrics that share the same descriptor but differ in their variable label values,
// return a list of lists of label pairs for each of the supplied metric names.
func getLabelValues(system, url, endpoint string, metricNames []string) (map[string][]map[string]string, error) {
servers := make([]*CollectedServer, 1)
servers[0] = &CollectedServer{
ID: "id",
URL: url,
}
coll := NewCollector(system, endpoint, "", servers)
return getLabelValuesFromCollector(metricNames, coll)
}

// To account for the metrics that share the same descriptor but differ in their variable label values,
// return a list of lists of label pairs for each of the supplied metric names.
func getJszLabelValues(
url, endpoint string,
streamMetaKeys, consumerMetaKeys, metricNames []string,
) (map[string][]map[string]string, error) {
servers := make([]*CollectedServer, 1)
servers[0] = &CollectedServer{
ID: "id",
URL: url,
}
coll := NewJszCollector(endpoint, "", servers, streamMetaKeys, consumerMetaKeys)
return getLabelValuesFromCollector(metricNames, coll)
}

func getLabelValuesFromCollector(
metricNames []string,
coll prometheus.Collector,
) (map[string][]map[string]string, error) {
labelValues := make(map[string][]map[string]string)
namesMap := make(map[string]bool)
for _, metricName := range metricNames {
Expand Down Expand Up @@ -113,13 +155,7 @@ func getLabelValues(system, url, endpoint string, metricNames []string) (map[str
}
}()

// create a new collector and collect
servers := make([]*CollectedServer, 1)
servers[0] = &CollectedServer{
ID: "id",
URL: url,
}
coll := NewCollector(system, endpoint, "", servers)
// collect metrics
coll.Collect(metrics)
close(metrics)

Expand Down Expand Up @@ -456,7 +492,91 @@ func TestJetStreamMetrics(t *testing.T) {
"jetstream_server_total_streams": 1,
"jetstream_server_total_consumers": 1,
}
verifyCollector(JetStreamSystem, url, "jsz", cases, t)
verifyJszCollector(url, "all", cases, t)
}

func TestJetStreamMetricLabels(t *testing.T) {
clientPort := 4229
monitorPort := 8229
s, err := pet.RunJetStreamServerWithPorts(clientPort, monitorPort, "ABC")
if err != nil {
t.Fatal(err)
}
defer func() {
os.RemoveAll(s.StoreDir())
s.Shutdown()
}()

url := fmt.Sprintf("http://127.0.0.1:%d/", monitorPort)
nc, err := nats.Connect(fmt.Sprintf("nats://localhost:%d", clientPort))
if err != nil {
t.Fatal(err)
}
defer nc.Close()

js, err := nc.JetStream()
if err != nil {
t.Fatal(err)
}

streamName := "myStr"
streamK := "streamFoo"
streamV := "bar"
_, err = js.AddStream(&nats.StreamConfig{
Name: streamName,
Metadata: map[string]string{streamK: streamV},
})
if err != nil {
t.Fatal(err)
}

consumerName := "myCon"
consumerK := "consFoo"
consumerV := "baz"
consumerConfig := nats.ConsumerConfig{Name: consumerName, Metadata: map[string]string{consumerK: consumerV}}
_, err = js.AddConsumer(streamName, &consumerConfig)
if err != nil {
t.Fatal(err)
}

// expected label keys
streamLabelKey := "stream_meta_" + streamK
consumerLabelKey := "consumer_meta_" + consumerK

streamMetric := "jetstream_stream_total_bytes"
consumerMetric := "jetstream_consumer_num_ack_pending"
labelValues, err := getJszLabelValues(
url,
"all",
[]string{streamK},
[]string{consumerK},
[]string{streamMetric, consumerMetric},
)
if err != nil {
t.Fatalf("Unexpected error getting labels for %s metrics: %v", consumerMetric, err)
}

streamMaps, found := labelValues[streamMetric]
if !found || len(streamMaps) != 1 {
t.Fatalf("No info found for metric: %v", streamMetric)
}
streamLabels := streamMaps[0]
if streamLabels[streamLabelKey] != streamV {
t.Fatalf("Value of stream label %s has unexpected value \"%s\"", streamLabelKey, streamLabels[streamLabelKey])
}

consumerMaps, found := labelValues[consumerMetric]
if !found || len(consumerMaps) != 1 {
t.Fatalf("No info found for metric: %v", consumerMetric)
}
consumerLabels := consumerMaps[0]

if consumerLabels[streamLabelKey] != streamV {
t.Fatalf("Value of consumer label %s has unexpected value \"%s\"", streamLabelKey, consumerLabels[streamLabelKey])
}
if consumerLabels[consumerLabelKey] != consumerV {
t.Fatalf("Value of consumer label %s has unexpected value \"%s\"", consumerLabelKey, consumerLabels[consumerLabelKey])
}
}

func TestMapKeys(t *testing.T) {
Expand Down
70 changes: 56 additions & 14 deletions collector/jsz.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,21 @@ type jszCollector struct {
consumerNumPending *prometheus.Desc
consumerAckFloorStreamSeq *prometheus.Desc
consumerAckFloorConsumerSeq *prometheus.Desc

// metadata extractors
streamMetricExtractors []func(nats.StreamDetail) string
consumerMetricExtractors []func(*nats.ConsumerInfo) string
}

func isJszEndpoint(system string) bool {
return system == JetStreamSystem
}

func newJszCollector(system, endpoint string, servers []*CollectedServer) prometheus.Collector {
func newJszCollector(
system, endpoint string,
servers []*CollectedServer,
streamMetaKeys, consumerMetaKeys []string,
) prometheus.Collector {
serverLabels := []string{"server_id", "server_name", "cluster", "domain", "meta_leader", "is_meta_leader"}

var streamLabels []string
Expand All @@ -74,13 +82,37 @@ func newJszCollector(system, endpoint string, servers []*CollectedServer) promet
streamLabels = append(streamLabels, "stream_leader")
streamLabels = append(streamLabels, "is_stream_leader")
streamLabels = append(streamLabels, "stream_raft_group")
for _, k := range streamMetaKeys {
streamLabels = append(streamLabels, "stream_meta_"+k)
}
Comment on lines +85 to +87
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stream metadata is exported as labels named stream_meta_…, making sure they don't conflict with consumer metadata defined below.

Maybe it would actually be nice to combine these, WDYT?

var streamMetricExtractors = make([]func(nats.StreamDetail) string, len(streamMetaKeys))
for i, k := range streamMetaKeys {
streamMetricExtractors[i] = func(s nats.StreamDetail) string {
if s.Config == nil {
return ""
}
return s.Config.Metadata[k] // defaults to empty string
}
}

var consumerLabels []string
consumerLabels = append(consumerLabels, streamLabels...)
consumerLabels = append(consumerLabels, "consumer_name")
consumerLabels = append(consumerLabels, "consumer_leader")
consumerLabels = append(consumerLabels, "is_consumer_leader")
consumerLabels = append(consumerLabels, "consumer_desc")
for _, k := range consumerMetaKeys {
consumerLabels = append(consumerLabels, "consumer_meta_"+k)
}
var consumerMetricExtractors = make([]func(*nats.ConsumerInfo) string, len(consumerMetaKeys))
for i, k := range consumerMetaKeys {
consumerMetricExtractors[i] = func(c *nats.ConsumerInfo) string {
if c == nil || c.Config == nil {
return ""
}
return c.Config.Metadata[k] // defaults to empty string
}
}

nc := &jszCollector{
httpClient: &http.Client{
Expand Down Expand Up @@ -239,6 +271,8 @@ func newJszCollector(system, endpoint string, servers []*CollectedServer) promet
consumerLabels,
nil,
),
streamMetricExtractors: streamMetricExtractors,
consumerMetricExtractors: consumerMetricExtractors,
}

// Use the endpoint
Expand Down Expand Up @@ -364,12 +398,18 @@ func (nc *jszCollector) Collect(ch chan<- prometheus.Metric) {
}
streamRaftGroup = stream.RaftGroup

streamLabelValues := []string{
// Server Labels
serverID, serverName, clusterName, jsDomain, clusterLeader, isMetaLeader,
// Stream Labels
accountName, accountID, streamName, streamLeader, isStreamLeader, streamRaftGroup,
}
for _, extractor := range nc.streamMetricExtractors {
value := extractor(stream)
streamLabelValues = append(streamLabelValues, value)
}
streamMetric := func(key *prometheus.Desc, value float64) prometheus.Metric {
return prometheus.MustNewConstMetric(key, prometheus.GaugeValue, value,
// Server Labels
serverID, serverName, clusterName, jsDomain, clusterLeader, isMetaLeader,
// Stream Labels
accountName, accountID, streamName, streamLeader, isStreamLeader, streamRaftGroup)
return prometheus.MustNewConstMetric(key, prometheus.GaugeValue, value, streamLabelValues...)
}
ch <- streamMetric(nc.streamMessages, float64(stream.State.Msgs))
ch <- streamMetric(nc.streamBytes, float64(stream.State.Bytes))
Expand Down Expand Up @@ -398,15 +438,17 @@ func (nc *jszCollector) Collect(ch chan<- prometheus.Metric) {
} else {
isConsumerLeader = "true"
}
consumerLabelValues := streamLabelValues
consumerLabelValues = append(consumerLabelValues,
// Consumer Labels
consumerName, consumerLeader, isConsumerLeader, consumerDesc,
)
for _, extractor := range nc.consumerMetricExtractors {
value := extractor(consumer)
consumerLabelValues = append(consumerLabelValues, value)
}
consumerMetric := func(key *prometheus.Desc, value float64) prometheus.Metric {
return prometheus.MustNewConstMetric(key, prometheus.GaugeValue, value,
// Server Labels
serverID, serverName, clusterName, jsDomain, clusterLeader, isMetaLeader,
// Stream Labels
accountName, accountID, streamName, streamLeader, isStreamLeader, streamRaftGroup,
// Consumer Labels
consumerName, consumerLeader, isConsumerLeader, consumerDesc,
)
return prometheus.MustNewConstMetric(key, prometheus.GaugeValue, value, consumerLabelValues...)
}
ch <- consumerMetric(nc.consumerDeliveredConsumerSeq, float64(consumer.Delivered.Consumer))
ch <- consumerMetric(nc.consumerDeliveredStreamSeq, float64(consumer.Delivered.Stream))
Expand Down
21 changes: 20 additions & 1 deletion exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ type NATSExporterOptions struct {
GetAccstatz bool
GetLeafz bool
GetJszFilter string
JszSteamMetaKeys string
JszConsumerMetaKeys string
RetryInterval time.Duration
CertFile string
KeyFile string
Expand Down Expand Up @@ -134,6 +136,15 @@ func (ne *NATSExporter) createCollector(system, endpoint string) {
ne.servers))
}

func (ne *NATSExporter) createJszCollector(endpoint string, streamMetaKeys, consumerMetaKeys []string) {
ne.registerCollector(collector.JetStreamSystem, endpoint,
collector.NewJszCollector(endpoint,
ne.opts.Prefix,
ne.servers,
streamMetaKeys,
consumerMetaKeys))
}

func (ne *NATSExporter) registerCollector(system, endpoint string, nc prometheus.Collector) {
if err := ne.registry.Register(nc); err != nil {
if _, ok := err.(prometheus.AlreadyRegisteredError); ok {
Expand Down Expand Up @@ -218,7 +229,15 @@ func (ne *NATSExporter) InitializeCollectors() error {
default:
return fmt.Errorf("invalid jsz filter %q", opts.GetJszFilter)
}
ne.createCollector(collector.JetStreamSystem, opts.GetJszFilter)
splitOrEmpty := func(s string) []string {
if s == "" {
return []string{}
}
return strings.Split(s, ",")
}
streamMetaKeys := splitOrEmpty(opts.JszSteamMetaKeys)
consumerMetaKeys := splitOrEmpty(opts.JszConsumerMetaKeys)
ne.createJszCollector(opts.GetJszFilter, streamMetaKeys, consumerMetaKeys)
Comment on lines +232 to +240
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It may be smart to validate that the resulting label names are actually valid in Prometheus. WDYT?

}
if len(ne.Collectors) == 0 {
return fmt.Errorf("no Collectors specified")
Expand Down
4 changes: 4 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,10 @@ func main() {
flag.BoolVar(&opts.GetSubz, "subz", false, "Get subscription metrics.")
flag.BoolVar(&opts.GetVarz, "varz", false, "Get general metrics.")
flag.StringVar(&opts.GetJszFilter, "jsz", "", "Select JetStream metrics to filter (e.g streams, accounts, consumers)")
flag.StringVar(&opts.JszSteamMetaKeys, "jsz_stream_meta_keys", "",
"Select JetStream stream metadata to output (comma separated)")
flag.StringVar(&opts.JszConsumerMetaKeys, "jsz_consumer_meta_keys", "",
"Select JetStream consumer metadata to output (comma separated)")
flag.StringVar(&opts.CertFile, "tlscert", "", "Server certificate file (Enables HTTPS).")
flag.StringVar(&opts.KeyFile, "tlskey", "", "Private key for server certificate (used with HTTPS).")
flag.StringVar(&opts.CaFile, "tlscacert", "", "Client certificate CA for verification (used with HTTPS).")
Expand Down
2 changes: 1 addition & 1 deletion test/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func RunJetStreamServerWithPorts(port, monitorPort int, domain string) (*server.
return nil, err
}

opts.StoreDir = filepath.Dir(tdir)
opts.StoreDir = tdir
Copy link
Author

@jkraml-staffbase jkraml-staffbase Nov 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMHO, this was a bug. It always selected the parent directory of the tdir, effectively re-using the same director always.

opts.HTTPHost = "127.0.0.1"
opts.HTTPPort = monitorPort

Expand Down