Skip to content

Commit 8dde763

Browse files
jakubnogaJakub NogaLazzaretti
authored
New feature: NATS bindings (#185)
* feat: add NATS protocol bindings Signed-off-by: Jakub Noga <[email protected]> * chore: run cargo fix & fmt Signed-off-by: Jakub Noga <[email protected]> * fix: issues with docs Signed-off-by: Jakub Noga <[email protected]> * Apply suggestions from code review Co-authored-by: Lazzaretti <[email protected]> Signed-off-by: Jakub Noga <[email protected]> * feat: apply suggestions from code review Signed-off-by: Jakub Noga <[email protected]> * feat: add test for v0.3 deserialization Signed-off-by: Jakub Noga <[email protected]> * chore: run cargo fmt Signed-off-by: Fabrizio Lazzaretti <[email protected]> Co-authored-by: Jakub Noga <[email protected]> Co-authored-by: Lazzaretti <[email protected]>
1 parent 6848cb1 commit 8dde763

File tree

10 files changed

+221
-1
lines changed

10 files changed

+221
-1
lines changed

.github/workflows/rust_tests.yml

+7
Original file line numberDiff line numberDiff line change
@@ -146,3 +146,10 @@ jobs:
146146
toolchain: ${{ matrix.toolchain }}
147147
args: --target ${{ matrix.target }} --manifest-path ./example-projects/poem-example/Cargo.toml
148148

149+
- uses: actions-rs/cargo@v1
150+
name: "Build nats-example"
151+
if: matrix.target == 'x86_64-unknown-linux-gnu' && matrix.toolchain == 'stable'
152+
with:
153+
command: build
154+
toolchain: ${{ matrix.toolchain }}
155+
args: --target ${{ matrix.target }} --manifest-path ./example-projects/nats-example/Cargo.toml

.gitignore

+2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
**/target
22

33
.idea
4+
.vscode
5+
.DS_Store
46
**/Cargo.lock

Cargo.toml

+2
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ rdkafka = ["rdkafka-lib", "bytes", "futures"]
2424
warp = ["warp-lib", "bytes", "http", "hyper"]
2525
axum = ["bytes", "http", "hyper", "axum-lib", "http-body", "async-trait"]
2626
poem = ["bytes", "http", "poem-lib", "hyper", "async-trait"]
27+
nats = ["nats-lib"]
2728

2829
[dependencies]
2930
serde = { version = "^1.0", features = ["derive"] }
@@ -50,6 +51,7 @@ hyper = { version = "^0.14", optional = true }
5051
axum-lib = { version = "^0.5", optional = true , package="axum"}
5152
http-body = { version = "^0.4", optional = true }
5253
poem-lib = { version = "=1.2.34", optional = true, package = "poem" }
54+
nats-lib = { version = "0.21.0", optional = true, package = "nats" }
5355

5456
[target."cfg(not(target_arch = \"wasm32\"))".dependencies]
5557
hostname = "^0.3"

README.md

+3-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ Note: This project is WIP under active development, hence all APIs are considere
1515
| JSON Event Format |||
1616
| Kafka Protocol Binding |||
1717
| MQTT Protocol Binding |||
18-
| NATS Protocol Binding | | |
18+
| NATS Protocol Binding | | |
1919
| Web hook |||
2020

2121
## Crate Structure
@@ -30,6 +30,7 @@ enabled by a specific [feature flag]:
3030
* `warp`: Integration with [warp](https://github.com/seanmonstar/warp/).
3131
* `reqwest`: Integration with [reqwest](https://github.com/seanmonstar/reqwest).
3232
* `rdkafka`: Integration with [rdkafka](https://fede1024.github.io/rust-rdkafka).
33+
* `nats`: Integration with [nats](https://github.com/nats-io/nats.rs)
3334

3435
This crate is continuously tested to work with GNU libc, WASM and musl
3536
toolchains.
@@ -64,6 +65,7 @@ Checkout the examples using our integrations to learn how to send and receive ev
6465
* [Reqwest/WASM Example](example-projects/reqwest-wasm-example)
6566
* [Kafka Example](example-projects/rdkafka-example)
6667
* [Warp Example](example-projects/warp-example)
68+
* [NATS Example](example-projects/nats-example)
6769

6870
## Development & Contributing
6971

+12
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
[package]
2+
name = "nats-example"
3+
version = "0.1.0"
4+
authors = ["Jakub Noga <[email protected]>"]
5+
edition = "2021"
6+
7+
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
8+
9+
[dependencies]
10+
cloudevents-sdk = { path = "../..", features = ["nats"] }
11+
serde_json = "^1.0"
12+
nats = "0.21.0"
+47
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
use std::{error::Error, thread};
2+
3+
use cloudevents::binding::nats::{MessageExt, NatsCloudEvent};
4+
use cloudevents::{Event, EventBuilder, EventBuilderV10};
5+
use serde_json::json;
6+
7+
/// First spin up a nats server i.e.
8+
/// ```bash
9+
/// docker run -p 4222:4222 -ti nats:latest
10+
/// ```
11+
fn main() -> Result<(), Box<dyn Error>> {
12+
let nc = nats::connect("localhost:4222").unwrap();
13+
14+
let event = EventBuilderV10::new()
15+
.id("123".to_string())
16+
.ty("example.test")
17+
.source("http://localhost/")
18+
.data("application/json", json!({"hello": "world"}))
19+
.build()
20+
.unwrap();
21+
22+
let n_msg = NatsCloudEvent::from_event(event).unwrap();
23+
24+
let sub = nc.subscribe("test").unwrap();
25+
26+
let t = thread::spawn(move || -> Result<Event, String> {
27+
match sub.next() {
28+
Some(msg) => match msg.to_event() {
29+
Ok(evt) => Ok(evt),
30+
Err(e) => Err(e.to_string()),
31+
},
32+
None => Err("Unsubed or disconnected".to_string()),
33+
}
34+
});
35+
36+
nc.publish("test", n_msg)?;
37+
38+
let maybe_event = t.join().unwrap();
39+
40+
if let Ok(evt) = maybe_event {
41+
println!("{}", evt.to_string());
42+
} else {
43+
println!("{}", maybe_event.unwrap_err().to_string());
44+
}
45+
46+
Ok(())
47+
}

src/binding/mod.rs

+2
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ pub mod axum;
1313
feature = "poem"
1414
))]
1515
pub mod http;
16+
#[cfg(feature = "nats")]
17+
pub mod nats;
1618
#[cfg(feature = "poem")]
1719
pub mod poem;
1820
#[cfg(feature = "rdkafka")]

src/binding/nats/deserializer.rs

+77
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
use crate::{
2+
message::{Result, StructuredDeserializer},
3+
Event,
4+
};
5+
6+
use nats_lib as nats;
7+
8+
impl StructuredDeserializer for nats::Message {
9+
fn deserialize_structured<R: Sized, V: crate::message::StructuredSerializer<R>>(
10+
self,
11+
serializer: V,
12+
) -> crate::message::Result<R> {
13+
serializer.set_structured_event(self.data.to_vec())
14+
}
15+
}
16+
17+
/// Trait implemented by [`nats::Message`] to enable convenient deserialization to [`Event`]
18+
///
19+
/// Trait sealed <https://rust-lang.github.io/api-guidelines/future-proofing.html#sealed-traits-protect-against-downstream-implementations-c-sealed>
20+
pub trait MessageExt: private::Sealed {
21+
fn to_event(&self) -> Result<Event>;
22+
}
23+
24+
impl MessageExt for nats::Message {
25+
fn to_event(&self) -> Result<Event> {
26+
StructuredDeserializer::into_event(self.to_owned())
27+
}
28+
}
29+
30+
mod private {
31+
use nats_lib as nats;
32+
33+
// Sealing the MessageExt
34+
pub trait Sealed {}
35+
impl Sealed for nats::Message {}
36+
}
37+
38+
#[cfg(test)]
39+
mod tests {
40+
use crate::test::fixtures;
41+
use nats_lib as nats;
42+
use serde_json::json;
43+
44+
use super::*;
45+
46+
#[test]
47+
fn test_structured_deserialize_v10() {
48+
let expected = fixtures::v10::full_json_data_string_extension();
49+
50+
let nats_message = nats::Message::new(
51+
"not_relevant",
52+
None,
53+
json!(expected.clone()).to_string().as_bytes().to_vec(),
54+
None,
55+
);
56+
57+
let actual = nats_message.to_event().unwrap();
58+
59+
assert_eq!(expected, actual)
60+
}
61+
62+
#[test]
63+
fn test_structured_deserialize_v03() {
64+
let expected = fixtures::v03::full_json_data();
65+
66+
let nats_message = nats::Message::new(
67+
"not_relevant",
68+
None,
69+
json!(expected.clone()).to_string().as_bytes().to_vec(),
70+
None,
71+
);
72+
73+
let actual = nats_message.to_event().unwrap();
74+
75+
assert_eq!(expected, actual)
76+
}
77+
}

src/binding/nats/mod.rs

+43
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
//! This module provides bindings between [cloudevents-sdk](https://docs.rs/cloudevents-sdk) and [nats](https://docs.rs/nats)
2+
//! ## Examples
3+
//! Deserialize [nats::Message](https://docs.rs/nats/0.21.0/nats/struct.Message.html) into [Event](https://docs.rs/cloudevents-sdk/latest/cloudevents/event/struct.Event.html)
4+
//! ```
5+
//! use nats_lib as nats;
6+
//! use cloudevents::binding::nats::MessageExt;
7+
//!
8+
//! fn consume() {
9+
//! let nc = nats::connect("localhost:4222").unwrap();
10+
//! let sub = nc.subscribe("test").unwrap();
11+
//! let nats_message = sub.next().unwrap();
12+
//! let cloud_event = nats_message.to_event().unwrap();
13+
//!
14+
//! println!("{}", cloud_event.to_string());
15+
//! }
16+
//! ```
17+
//!
18+
//! Serialize [Event](https://docs.rs/cloudevents-sdk/latest/cloudevents/event/struct.Event.html) into [NatsCloudEvent] and publish to nats subject
19+
//! ```
20+
//! use nats_lib as nats;
21+
//! use cloudevents::binding::nats::NatsCloudEvent;
22+
//! use cloudevents::{EventBuilder, EventBuilderV10, Event};
23+
//! use serde_json::json;
24+
//!
25+
//! fn publish() {
26+
//! let nc = nats::connect("localhost:4222").unwrap();
27+
//!
28+
//! let event = EventBuilderV10::new()
29+
//! .id("123".to_string())
30+
//! .ty("example.test")
31+
//! .source("http://localhost/")
32+
//! .data("application/json", json!({"hello": "world"}))
33+
//! .build()
34+
//! .unwrap();
35+
//!
36+
//! nc.publish("whatever.subject.you.like", NatsCloudEvent::from_event(event).unwrap()).unwrap();
37+
//! }
38+
//! ```
39+
mod deserializer;
40+
mod serializer;
41+
42+
pub use deserializer::MessageExt;
43+
pub use serializer::NatsCloudEvent;

src/binding/nats/serializer.rs

+26
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
use crate::{
2+
message::{Error, Result},
3+
Event,
4+
};
5+
6+
/// Helper struct containing text data bytes of JSON serialized [Event]
7+
///
8+
/// Implements [`AsRef`] so it can be directly passed to [`nats::Connection`](https://docs.rs/nats/0.21.0/nats/struct.Connection.html) methods as payload.
9+
pub struct NatsCloudEvent {
10+
pub payload: Vec<u8>,
11+
}
12+
13+
impl AsRef<[u8]> for NatsCloudEvent {
14+
fn as_ref(&self) -> &[u8] {
15+
&self.payload.as_ref()
16+
}
17+
}
18+
19+
impl NatsCloudEvent {
20+
pub fn from_event(event: Event) -> Result<Self> {
21+
match serde_json::to_vec(&event) {
22+
Ok(payload) => Ok(Self { payload }),
23+
Err(e) => Err(Error::SerdeJsonError { source: e }),
24+
}
25+
}
26+
}

0 commit comments

Comments
 (0)