Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for NATS KV Processor/Input/Output. Add default NATS urls #1704

Merged
merged 29 commits into from
Feb 24, 2023
Merged

Add support for NATS KV Processor/Input/Output. Add default NATS urls #1704

merged 29 commits into from
Feb 24, 2023

Conversation

codegangsta
Copy link
Contributor

@codegangsta codegangsta commented Jan 28, 2023

This PR goes out to all the blobs out there who believed there would come a day where Benthos would have more NATS support:

  • 🎉 NATS KV Input - it watches for updates on keys, isn't that neat?!
  • 🎉 NATS KV Output - It puts your keys and values into a bucket!
  • 🎉 NATS KV Processor - This thing puts, gets, deletes, creates, updates, deletes, slices, dices and even folds your laundry (if you are so inclined)

We got docs! 😮

We got integration tests! 😄

We even got default urls for NATS components! 🤷 (for real though, We like using the default URL in all our NATS tool, since NATS always kicks itself off on localhost:4222, I think this makes it a wee bit easier to get started with NATS and Benthos)

Go ahead, tear it apart.... I dare you.

Bring it el jeffe! (aka @Jeffail )

And if you are really nice I'll write a Cache next

@codegangsta codegangsta requested a review from Jeffail as a code owner January 28, 2023 02:22
@codegangsta
Copy link
Contributor Author

FYI on failing builds

goreleaser/goreleaser-action#389

@josephwoodward
Copy link
Contributor

Great work! Would it be worth upgrading to the latest version of the NATS client as part of this pull request? I see Benthos is currently using a version released in May 2022.

metaKVBucket, keyValue.Bucket(),
metaKVKey, key,
metaKVRevision, rev,
).Debug("Updated kv bucket entry")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A minor one but I see in other log entries "kv" is uppercase. Perhaps it should be capitalised here too for consistency.

@codegangsta
Copy link
Contributor Author

Great work! Would it be worth upgrading to the latest version of the NATS client as part of this pull request? I see Benthos is currently using a version released in May 2022.

Oh yeah. Great reminder. I'll go ahead and upgrade the client

@Jeffail
Copy link
Collaborator

Jeffail commented Feb 5, 2023

Bring it el jeffe!

Thems is fighting words, I'm gonna review this one on stream 😎

@iluminae
Copy link
Contributor

And if you are really nice I'll write a Cache next

Love it, but I would maybe suggest cache would be a better first addition, as there are output and processor implementations around cache resources that are composable, which is pretty awesome.

iirc there is not a pattern to input from a cache so that use case would have to be covered by a new input type, but it seems like the processor and output may be better to utilize what is there.

Just my 2 cents!

Copy link
Collaborator

@Jeffail Jeffail left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Damn you, just some nits, well played 😭 😞

The comments are just quick things so I'm happy to do them after I merge, will do it tomorrow.

@@ -33,6 +33,7 @@ You can access these metadata fields using [function interpolation](/docs/config
` + auth.Description()).
Field(service.NewStringListField("urls").
Description("A list of URLs to connect to. If an item of the list contains commas it will be expanded into multiple URLs.").
Default([]string{"nats://127.0.0.1:4222"}).
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally I avoid having defaults for stuff like urls and addresses, we used to have them all over the shop and people kept getting bitten when forgetting to specify them explicitly.

return nil, nil, service.ErrNotConnected
}

if entry == nil {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm assuming this could happen if some internal timeout occurs? Just making sure we can't enter a busy loop here.

# We need to make this key random as the NATS server will only deliver the
# latest revision of a key when it's requested by a watcher, this is by
# design, but if we want to test benthos semantics like batching we should
# use unique keys for ever message passing through the output
Copy link
Collaborator

@Jeffail Jeffail Feb 23, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ERRRRRMMMMMMMM... I think you mean every message...

What a disaster 😓

internal/impl/nats/processor_kv.go Outdated Show resolved Hide resolved
@Jeffail
Copy link
Collaborator

Jeffail commented Feb 24, 2023

@Jeffail Jeffail merged commit caa8363 into redpanda-data:main Feb 24, 2023
@codegangsta
Copy link
Contributor Author

Bummed I missed the stream @Jeffail! Thanks for taking a look at this!

@iluminae I'll probably find time to add KV as a cache pretty soon

@marshauf marshauf mentioned this pull request Dec 5, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants