From dbb24a8329300cf3b8ff63fd713cb6458a270907 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 22 Dec 2025 02:15:02 +0000 Subject: [PATCH] chore(deps): bump github.com/nats-io/nats.go from 1.47.0 to 1.48.0 Bumps [github.com/nats-io/nats.go](https://github.com/nats-io/nats.go) from 1.47.0 to 1.48.0. - [Release notes](https://github.com/nats-io/nats.go/releases) - [Commits](https://github.com/nats-io/nats.go/compare/v1.47.0...v1.48.0) --- updated-dependencies: - dependency-name: github.com/nats-io/nats.go dependency-version: 1.48.0 dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] --- go.mod | 2 +- go.sum | 4 +- vendor/github.com/nats-io/nats.go/README.md | 2 +- vendor/github.com/nats-io/nats.go/context.go | 2 +- vendor/github.com/nats-io/nats.go/enc.go | 4 +- .../nats-io/nats.go/jetstream/README.md | 1 - .../nats-io/nats.go/jetstream/consumer.go | 15 ++++ .../nats-io/nats.go/jetstream/kv.go | 16 +++- .../nats-io/nats.go/jetstream/push.go | 6 +- vendor/github.com/nats-io/nats.go/js.go | 4 +- vendor/github.com/nats-io/nats.go/kv.go | 1 - vendor/github.com/nats-io/nats.go/nats.go | 79 +++++++++++++++++-- vendor/modules.txt | 2 +- 13 files changed, 115 insertions(+), 23 deletions(-) diff --git a/go.mod b/go.mod index 1124aa35e..4284fe982 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,7 @@ require ( github.com/gosnmp/gosnmp v1.42.1 github.com/jackc/pgx/v5 v5.7.6 github.com/markbates/goth v1.82.0 - github.com/nats-io/nats.go v1.47.0 + github.com/nats-io/nats.go v1.48.0 github.com/rs/zerolog v1.34.0 github.com/shirou/gopsutil/v3 v3.24.5 github.com/spiffe/go-spiffe/v2 v2.6.0 diff --git a/go.sum b/go.sum index 12ec5ce1a..f7008773a 100644 --- a/go.sum +++ b/go.sum @@ -147,8 +147,8 @@ github.com/nats-io/jwt/v2 v2.8.0 h1:K7uzyz50+yGZDO5o772eRE7atlcSEENpL7P+b74JV1g= github.com/nats-io/jwt/v2 v2.8.0/go.mod h1:me11pOkwObtcBNR8AiMrUbtVOUGkqYjMQZ6jnSdVUIA= github.com/nats-io/nats-server/v2 v2.12.2 h1:4TEQd0Y4zvcW0IsVxjlXnRso1hBkQl3TS0BI+SxgPhE= github.com/nats-io/nats-server/v2 v2.12.2/go.mod h1:j1AAttYeu7WnvD8HLJ+WWKNMSyxsqmZ160pNtCQRMyE= -github.com/nats-io/nats.go v1.47.0 h1:YQdADw6J/UfGUd2Oy6tn4Hq6YHxCaJrVKayxxFqYrgM= -github.com/nats-io/nats.go v1.47.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g= +github.com/nats-io/nats.go v1.48.0 h1:pSFyXApG+yWU/TgbKCjmm5K4wrHu86231/w84qRVR+U= +github.com/nats-io/nats.go v1.48.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g= github.com/nats-io/nkeys v0.4.12 h1:nssm7JKOG9/x4J8II47VWCL1Ds29avyiQDRn0ckMvDc= github.com/nats-io/nkeys v0.4.12/go.mod h1:MT59A1HYcjIcyQDJStTfaOY6vhy9XTUjOFo+SVsvpBg= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= diff --git a/vendor/github.com/nats-io/nats.go/README.md b/vendor/github.com/nats-io/nats.go/README.md index bd74d61fc..9340a8899 100644 --- a/vendor/github.com/nats-io/nats.go/README.md +++ b/vendor/github.com/nats-io/nats.go/README.md @@ -23,7 +23,7 @@ A [Go](http://golang.org) client for the [NATS messaging system](https://nats.io go get github.com/nats-io/nats.go@latest # To get a specific version: -go get github.com/nats-io/nats.go@v1.47.0 +go get github.com/nats-io/nats.go@v1.48.0 # Note that the latest major version for NATS Server is v2: go get github.com/nats-io/nats-server/v2@latest diff --git a/vendor/github.com/nats-io/nats.go/context.go b/vendor/github.com/nats-io/nats.go/context.go index 382335e83..b3dfa4f06 100644 --- a/vendor/github.com/nats-io/nats.go/context.go +++ b/vendor/github.com/nats-io/nats.go/context.go @@ -95,7 +95,7 @@ func (nc *Conn) oldRequestWithContext(ctx context.Context, subj string, hdr, dat s.AutoUnsubscribe(1) defer s.Unsubscribe() - err = nc.publish(subj, inbox, hdr, data) + err = nc.publish(subj, inbox, false, hdr, data) if err != nil { return nil, err } diff --git a/vendor/github.com/nats-io/nats.go/enc.go b/vendor/github.com/nats-io/nats.go/enc.go index 34a3fae7f..f2297b49a 100644 --- a/vendor/github.com/nats-io/nats.go/enc.go +++ b/vendor/github.com/nats-io/nats.go/enc.go @@ -107,7 +107,7 @@ func (c *EncodedConn) Publish(subject string, v any) error { if err != nil { return err } - return c.Conn.publish(subject, _EMPTY_, nil, b) + return c.Conn.publish(subject, _EMPTY_, false, nil, b) } // PublishRequest will perform a Publish() expecting a response on the @@ -120,7 +120,7 @@ func (c *EncodedConn) PublishRequest(subject, reply string, v any) error { if err != nil { return err } - return c.Conn.publish(subject, reply, nil, b) + return c.Conn.publish(subject, reply, true, nil, b) } // Request will create an Inbox and perform a Request() call diff --git a/vendor/github.com/nats-io/nats.go/jetstream/README.md b/vendor/github.com/nats-io/nats.go/jetstream/README.md index b3eb261c6..5744b68af 100644 --- a/vendor/github.com/nats-io/nats.go/jetstream/README.md +++ b/vendor/github.com/nats-io/nats.go/jetstream/README.md @@ -568,7 +568,6 @@ iter, _ := cons.Messages(jetstream.PullMaxMessages(10), jetstream.PullMaxBytes(1 request. If the value is set too low, the consumer will stall and not be able to consume messages. - `PullExpiry(time.Duration)` - timeout on a single pull request to the server -type PullThresholdMessages int - `PullThresholdMessages(int)` - amount of messages which triggers refilling the buffer - `PullThresholdBytes(int)` - amount of bytes which triggers refilling the diff --git a/vendor/github.com/nats-io/nats.go/jetstream/consumer.go b/vendor/github.com/nats-io/nats.go/jetstream/consumer.go index b7d81aafe..865ba5a9c 100644 --- a/vendor/github.com/nats-io/nats.go/jetstream/consumer.go +++ b/vendor/github.com/nats-io/nats.go/jetstream/consumer.go @@ -67,6 +67,11 @@ type ( // without additional checks. After the channel is closed, // MessageBatch.Error() should be checked to see if there was an error // during message delivery (e.g. missing heartbeat). + // + // NOTE: Fetch has worse performance when used to continuously retrieve + // messages in comparison to Messages or Consume methods, as it does not + // perform any optimizations (e.g. overlapping pull requests) and new + // subscription is created for each execution. Fetch(batch int, opts ...FetchOpt) (MessageBatch, error) // FetchBytes is used to retrieve up to a provided bytes from the @@ -88,6 +93,11 @@ type ( // without additional checks. After the channel is closed, // MessageBatch.Error() should be checked to see if there was an error // during message delivery (e.g. missing heartbeat). + // + // NOTE: FetchBytes has worse performance when used to continuously + // retrieve messages in comparison to Messages or Consume methods, as it + // does not perform any optimizations (e.g. overlapping pull requests) + // and new subscription is created for each execution. FetchBytes(maxBytes int, opts ...FetchOpt) (MessageBatch, error) // FetchNoWait is used to retrieve up to a provided number of messages @@ -102,6 +112,11 @@ type ( // without additional checks. After the channel is closed, // MessageBatch.Error() should be checked to see if there was an error // during message delivery (e.g. missing heartbeat). + // + // NOTE: FetchNoWait has worse performance when used to continuously + // retrieve messages in comparison to Messages or Consume methods, as it + // does not perform any optimizations (e.g. overlapping pull requests) + // and new subscription is created for each execution. FetchNoWait(batch int) (MessageBatch, error) // Consume will continuously receive messages and handle them diff --git a/vendor/github.com/nats-io/nats.go/jetstream/kv.go b/vendor/github.com/nats-io/nats.go/jetstream/kv.go index 55cd62e03..5e5ce9c97 100644 --- a/vendor/github.com/nats-io/nats.go/jetstream/kv.go +++ b/vendor/github.com/nats-io/nats.go/jetstream/kv.go @@ -246,6 +246,11 @@ type ( Mirror *StreamSource `json:"mirror,omitempty"` // Sources defines the configuration for sources of a KeyValue store. + // If no subject transforms are defined, it is assumed that a source is + // also a KV store and subject transforms will be set to correctly map + // keys from the source KV to the current one. If subject transforms are + // defined, they will be used as is. This allows using non-kv streams as + // sources. Sources []*StreamSource `json:"sources,omitempty"` // Compression sets the underlying stream compression. @@ -471,7 +476,6 @@ const ( kvSubjectsTmpl = "$KV.%s.>" kvSubjectsPreTmpl = "$KV.%s." kvSubjectsPreDomainTmpl = "%s.$KV.%s." - kvNoPending = "0" ) const ( @@ -685,8 +689,14 @@ func (js *jetStream) prepareKeyValueConfig(ctx context.Context, cfg KeyValueConf scfg.Mirror = m scfg.MirrorDirect = true } else if len(cfg.Sources) > 0 { - // For now we do not allow direct subjects for sources. If that is desired a user could use stream API directly. for _, ss := range cfg.Sources { + // if subject transforms are already set, then use as is. + // this allows for full control of the source, e.g. using non-KV streams. + // Note that in this case, the Name is not modified and full stream name must be provided. + if len(ss.SubjectTransforms) > 0 { + scfg.Sources = append(scfg.Sources, ss) + continue + } var sourceBucketName string if strings.HasPrefix(ss.Name, kvBucketNamePre) { sourceBucketName = ss.Name[len(kvBucketNamePre):] @@ -1291,6 +1301,8 @@ func (kv *kvs) WatchFiltered(ctx context.Context, keys []string, opts ...WatchOp return nil, err } sub.SetClosedHandler(func(_ string) { + w.mu.Lock() + defer w.mu.Unlock() close(w.updates) }) // If there were no pending messages at the time of the creation diff --git a/vendor/github.com/nats-io/nats.go/jetstream/push.go b/vendor/github.com/nats-io/nats.go/jetstream/push.go index 97a8b8b78..0ba279d4f 100644 --- a/vendor/github.com/nats-io/nats.go/jetstream/push.go +++ b/vendor/github.com/nats-io/nats.go/jetstream/push.go @@ -105,7 +105,11 @@ func (p *pushConsumer) Consume(handler MessageHandler, opts ...PushConsumeOpt) ( } var err error - sub.subscription, err = p.js.conn.Subscribe(p.info.Config.DeliverSubject, internalHandler) + if p.info.Config.DeliverGroup != "" { + sub.subscription, err = p.js.conn.QueueSubscribe(p.info.Config.DeliverSubject, p.info.Config.DeliverGroup, internalHandler) + } else { + sub.subscription, err = p.js.conn.Subscribe(p.info.Config.DeliverSubject, internalHandler) + } if err != nil { return nil, err } diff --git a/vendor/github.com/nats-io/nats.go/js.go b/vendor/github.com/nats-io/nats.go/js.go index 90f8df615..def594118 100644 --- a/vendor/github.com/nats-io/nats.go/js.go +++ b/vendor/github.com/nats-io/nats.go/js.go @@ -1132,7 +1132,7 @@ func (js *js) PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error) { if err != nil { return nil, err } - if err := js.nc.publish(m.Subject, reply, hdr, m.Data); err != nil { + if err := js.nc.publish(m.Subject, reply, false, hdr, m.Data); err != nil { js.clearPAF(id) return nil, err } @@ -3560,7 +3560,7 @@ func (js *js) apiRequestWithContext(ctx context.Context, subj string, data []byt } if js.opts.shouldTrace { ctrace := js.opts.ctrace - if ctrace.RequestSent != nil { + if ctrace.ResponseReceived != nil { ctrace.ResponseReceived(subj, resp.Data, resp.Header) } } diff --git a/vendor/github.com/nats-io/nats.go/kv.go b/vendor/github.com/nats-io/nats.go/kv.go index 499049193..dda644ec7 100644 --- a/vendor/github.com/nats-io/nats.go/kv.go +++ b/vendor/github.com/nats-io/nats.go/kv.go @@ -354,7 +354,6 @@ const ( kvSubjectsTmpl = "$KV.%s.>" kvSubjectsPreTmpl = "$KV.%s." kvSubjectsPreDomainTmpl = "%s.$KV.%s." - kvNoPending = "0" ) // Regex for valid keys and buckets. diff --git a/vendor/github.com/nats-io/nats.go/nats.go b/vendor/github.com/nats-io/nats.go/nats.go index 90224840e..c56946971 100644 --- a/vendor/github.com/nats-io/nats.go/nats.go +++ b/vendor/github.com/nats-io/nats.go/nats.go @@ -48,7 +48,7 @@ import ( // Default Constants const ( - Version = "1.47.0" + Version = "1.48.0" DefaultURL = "nats://127.0.0.1:4222" DefaultPort = 4222 DefaultMaxReconnect = 60 @@ -534,6 +534,11 @@ type Options struct { // WebSocketConnectionHeadersHandler is an optional callback handler for generating token used for WebSocket connections. WebSocketConnectionHeadersHandler WebSocketHeadersHandler + + // SkipSubjectValidation will disable publish subject validation. + // NOTE: This is not recommended in general, as the performance gain is minimal + // and may lead to breaking protocol. + SkipSubjectValidation bool } const ( @@ -1512,6 +1517,20 @@ func WebSocketConnectionHeadersHandler(cb WebSocketHeadersHandler) Option { } } +// SkipSubjectValidation is an Option to skip subject validation when +// publishing messages. +// By default, subject validation is performed to ensure that subjects +// are valid according to NATS subject syntax (no spaces newlines and tabs). +// NOTE: It is not recommended to use this option as the performance gain +// is minimal and disabling subject validation can lead breaking protocol +// rules. +func SkipSubjectValidation() Option { + return func(o *Options) error { + o.SkipSubjectValidation = true + return nil + } +} + // Handler processing // SetDisconnectHandler will set the disconnect event handler. @@ -3916,7 +3935,7 @@ func (nc *Conn) kickFlusher() { // argument is left untouched and needs to be correctly interpreted on // the receiver. func (nc *Conn) Publish(subj string, data []byte) error { - return nc.publish(subj, _EMPTY_, nil, data) + return nc.publish(subj, _EMPTY_, false, nil, data) } // Header represents the optional Header for a NATS message, @@ -4059,27 +4078,71 @@ func (nc *Conn) PublishMsg(m *Msg) error { if err != nil { return err } - return nc.publish(m.Subject, m.Reply, hdr, m.Data) + validateReply := m.Reply != _EMPTY_ + return nc.publish(m.Subject, m.Reply, validateReply, hdr, m.Data) } // PublishRequest will perform a Publish() expecting a response on the // reply subject. Use Request() for automatically waiting for a response // inline. func (nc *Conn) PublishRequest(subj, reply string, data []byte) error { - return nc.publish(subj, reply, nil, data) + return nc.publish(subj, reply, true, nil, data) } // Used for handrolled Itoa const digits = "0123456789" +// validateSubject checks if the subject contains characters that break the NATS protocol. +// Uses an adaptive algorithm: manual loop for short subjects (< 16 chars) and +// SIMD-optimized strings.IndexByte for longer subjects. +func validateSubject(subj string) error { + if subj == "" { + return ErrBadSubject + } + + // Adaptive threshold based on benchmark data showing crossover at ~15-20 characters. + const lengthThreshold = 16 + + if len(subj) < lengthThreshold { + // Fast path for short subjects (< 16 chars) + // Short-circuit on non-control characters. + for i := range len(subj) { + c := subj[i] + if c <= ' ' && (c == ' ' || c == '\t' || c == '\r' || c == '\n') { + return ErrBadSubject + } + } + return nil + } + + // Optimized path for long subjects (>= 16 chars) + // Uses SIMD-optimized strings.IndexByte (processes 16+ bytes per instruction) + if strings.IndexByte(subj, ' ') >= 0 || + strings.IndexByte(subj, '\t') >= 0 || + strings.IndexByte(subj, '\r') >= 0 || + strings.IndexByte(subj, '\n') >= 0 { + return ErrBadSubject + } + return nil +} + // publish is the internal function to publish messages to a nats-server. // Sends a protocol data message by queuing into the bufio writer // and kicking the flush go routine. These writes should be protected. -func (nc *Conn) publish(subj, reply string, hdr, data []byte) error { +func (nc *Conn) publish(subj, reply string, validateReply bool, hdr, data []byte) error { if nc == nil { return ErrInvalidConnection } - if subj == "" { + if !nc.Opts.SkipSubjectValidation { + if err := validateSubject(subj); err != nil { + return err + } + if validateReply { + if err := validateSubject(reply); err != nil { + return ErrBadSubject + } + } + } else if subj == _EMPTY_ { return ErrBadSubject } nc.mu.Lock() @@ -4245,7 +4308,7 @@ func (nc *Conn) createNewRequestAndSend(subj string, hdr, data []byte) (chan *Ms } nc.mu.Unlock() - if err := nc.publish(subj, respInbox, hdr, data); err != nil { + if err := nc.publish(subj, respInbox, false, hdr, data); err != nil { return nil, token, err } @@ -4341,7 +4404,7 @@ func (nc *Conn) oldRequest(subj string, hdr, data []byte, timeout time.Duration) s.AutoUnsubscribe(1) defer s.Unsubscribe() - err = nc.publish(subj, inbox, hdr, data) + err = nc.publish(subj, inbox, false, hdr, data) if err != nil { return nil, err } diff --git a/vendor/modules.txt b/vendor/modules.txt index 382d54836..d3413a1b9 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -240,7 +240,7 @@ github.com/nats-io/nats-server/v2/server/stree github.com/nats-io/nats-server/v2/server/sysmem github.com/nats-io/nats-server/v2/server/thw github.com/nats-io/nats-server/v2/server/tpm -# github.com/nats-io/nats.go v1.47.0 +# github.com/nats-io/nats.go v1.48.0 ## explicit; go 1.23.0 github.com/nats-io/nats.go github.com/nats-io/nats.go/encoders/builtin