diff --git a/Cargo.toml b/Cargo.toml index 572646ca..da7e535d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,8 +18,9 @@ tracing-context = ["dep:tracing", "dep:tracing-opentelemetry", "dep:opentelemetr graphql = ["es-entity-macros/graphql", "dep:async-graphql", "dep:base64"] event-context = ["es-entity-macros/event-context", "event-context-enabled"] event-context-enabled = ["es-entity-macros/event-context-enabled"] +idempotency-key = ["es-entity-macros/idempotency-key", "event-context-enabled"] json-schema = ["dep:schemars"] -mdbook-test = ["dep:anyhow"] +mdbook-test = ["dep:anyhow", "idempotency-key"] instrument = ["es-entity-macros/instrument", "dep:tracing"] [dependencies] diff --git a/book/src/entity-type.md b/book/src/entity-type.md index 8e47c5fd..99b36fc0 100644 --- a/book/src/entity-type.md +++ b/book/src/entity-type.md @@ -58,7 +58,7 @@ impl User { // It is not mandatory but very useful in the context of distributed / multi-thread // systems to protect against replays. idempotency_guard!( - self.events.iter_all().rev(), + self.events.iter_persisted().rev(), // If this pattern matches return Idempotent::AlreadyApplied UserEvent::NameUpdated { name } if name == &new_name, // Stop searching here diff --git a/book/src/idempotency.md b/book/src/idempotency.md index 8d6c0b6b..32069041 100644 --- a/book/src/idempotency.md +++ b/book/src/idempotency.md @@ -1,9 +1,9 @@ # Idempotency Idempotency means that performing the same operation multiple times has the same effect as doing it once. -It’s used to ensure that retrying a request doesn’t cause unintended side effects, such as duplicated `Event`s being persisted. +It's used to ensure that retrying a request doesn't cause unintended side effects, such as duplicated `Event`s being persisted. -It is particularly useful in the context of a distributed system where operations could be triggered from an asynchronous event queue (ie pub-sub). +This is particularly important in distributed systems where operations could be triggered from an asynchronous event queue (ie pub-sub). Whenever you would like to have an `exactly-once` processing guarantee - you can easily achieve an `effectively-once` processing by ensuring your mutations are all idempotent. Making your `Entity` mutations idempotent is very simple when doing Event Sourcing as you can easily check if the event you are about to append already exists in the history. @@ -136,58 +136,116 @@ fn main() { } ``` -To cut down on boilerplate this pattern of iterating the events to check if an event was already applied has been encoded into the `idempotency_guard!` macro: +To cut down on boilerplate this pattern of iterating the events to check if an event was already applied has been encoded into the `idempotency_guard!` macro. + +The macro expects an iterator over `&PersistedEvent` items, which you get from `EntityEvents::iter_persisted()`: ```rust # extern crate es_entity; +# extern crate serde; +# extern crate derive_builder; +# extern crate sqlx; +# use serde::{Deserialize, Serialize}; +# use derive_builder::Builder; +use es_entity::{idempotency_guard, Idempotent, *}; +# es_entity::entity_id! { UserId } +# +# #[derive(EsEvent, Debug, Serialize, Deserialize)] +# #[serde(tag = "type", rename_all = "snake_case")] +# #[es_event(id = "UserId")] # pub enum UserEvent { -# Initialized { id: u64, name: String }, +# Initialized { id: UserId, name: String }, # NameUpdated { name: String }, # } +# +# pub struct NewUser { id: UserId, name: String } +# impl IntoEvents for NewUser { +# fn into_events(self) -> EntityEvents { +# EntityEvents::init(self.id, [UserEvent::Initialized { id: self.id, name: self.name }]) +# } +# } +# +# #[derive(EsEntity, Builder)] +# #[builder(pattern = "owned", build_fn(error = "EsEntityError"))] # pub struct User { -# events: Vec +# pub id: UserId, +# pub name: String, +# events: EntityEvents, # } -use es_entity::{idempotency_guard, Idempotent}; impl User { - pub fn update_name(&mut self, new_name: impl Into) -> Idempotent<()>{ + pub fn update_name(&mut self, new_name: impl Into) -> Idempotent<()> { let name = new_name.into(); idempotency_guard!( - // The iterator of events - self.events.iter().rev(), - // The pattern match to check whether an operation was already applied - UserEvent::NameUpdated { name: existing_name } if existing_name == &name + // Iterator over persisted events (reversed for most-recent-first) + self.events.iter_persisted().rev(), + // Pattern match to check whether operation was already applied + UserEvent::NameUpdated { name: existing_name } if existing_name == &name ); self.events.push(UserEvent::NameUpdated { name }); Idempotent::Executed(()) } } - -fn main() { - let mut user = User { events: vec![] }; - assert!(user.update_name("Harrison").did_execute()); - assert!(user.update_name("Harrison").was_already_applied()); -} +# +# impl TryFromEvents for User { +# fn try_from_events(events: EntityEvents) -> Result { +# let mut builder = UserBuilder::default(); +# for event in events.iter_all() { +# match event { +# UserEvent::Initialized { id, name } => { +# builder = builder.id(*id).name(name.clone()); +# } +# UserEvent::NameUpdated { name } => { +# builder = builder.name(name.clone()); +# } +# } +# } +# builder.events(events).build() +# } +# } ``` Finally there is the case where an operation was applied in the past - but it is still legal to re-apply it. Like changing a name back to what it originally was: + ```rust # extern crate es_entity; +# extern crate serde; +# extern crate derive_builder; +# extern crate sqlx; +# use serde::{Deserialize, Serialize}; +# use derive_builder::Builder; +use es_entity::{idempotency_guard, Idempotent, *}; +# es_entity::entity_id! { UserId } +# +# #[derive(EsEvent, Debug, Serialize, Deserialize)] +# #[serde(tag = "type", rename_all = "snake_case")] +# #[es_event(id = "UserId")] # pub enum UserEvent { -# Initialized { id: u64, name: String }, +# Initialized { id: UserId, name: String }, # NameUpdated { name: String }, # } +# +# pub struct NewUser { id: UserId, name: String } +# impl IntoEvents for NewUser { +# fn into_events(self) -> EntityEvents { +# EntityEvents::init(self.id, [UserEvent::Initialized { id: self.id, name: self.name }]) +# } +# } +# +# #[derive(EsEntity, Builder)] +# #[builder(pattern = "owned", build_fn(error = "EsEntityError"))] # pub struct User { -# events: Vec +# pub id: UserId, +# pub name: String, +# events: EntityEvents, # } -use es_entity::{idempotency_guard, Idempotent}; impl User { - pub fn update_name(&mut self, new_name: impl Into) -> Idempotent<()>{ + pub fn update_name(&mut self, new_name: impl Into) -> Idempotent<()> { let name = new_name.into(); idempotency_guard!( - self.events.iter().rev(), + self.events.iter_persisted().rev(), UserEvent::NameUpdated { name: existing_name } if existing_name == &name, // The `=>` signifies the pattern where to stop the iteration. => UserEvent::NameUpdated { .. } @@ -196,13 +254,198 @@ impl User { Idempotent::Executed(()) } } +# +# impl TryFromEvents for User { +# fn try_from_events(events: EntityEvents) -> Result { +# let mut builder = UserBuilder::default(); +# for event in events.iter_all() { +# match event { +# UserEvent::Initialized { id, name } => { +# builder = builder.id(*id).name(name.clone()); +# } +# UserEvent::NameUpdated { name } => { +# builder = builder.name(name.clone()); +# } +# } +# } +# builder.events(events).build() +# } +# } +``` + +Without the `=>` argument, updating a name back to a previous value would be rejected as `AlreadyApplied`. + +## Idempotency Keys + +Sometimes pattern matching against event data isn't sufficient for idempotency checks. + +Consider an accounting system where a user withdraws $100. If the network times out and the client retries, you receive two withdrawal requests for $100. Was the second request a retry of the first (and should be ignored), or a legitimate new withdrawal (and should be processed)? Pattern matching on the amount alone can't distinguish between these cases—you need an external identifier to detect the duplicate. + +The `idempotency-key` feature extends the `idempotency_guard!` macro to also check for matching idempotency keys stored in event contexts. + +### Enabling the Feature + +Add the feature to your `Cargo.toml`: + +```toml +[dependencies] +es-entity = { version = "...", features = ["idempotency-key"] } +``` + +Note: This feature automatically enables `event-context-enabled`, which stores context data with each event. + +### Setting an Idempotency Key + +Before performing a mutation, set an idempotency key in the current event context: + +```rust +# extern crate es_entity; +use es_entity::EventContext; fn main() { - let mut user = User { events: vec![] }; - assert!(user.update_name("Harrison").did_execute()); - assert!(user.update_name("Colin").did_execute()); - assert!(user.update_name("Harrison").did_execute()); + let mut ctx = EventContext::current(); + ctx.set_idempotency_key("request-12345"); } ``` -Without the `=>` argument the second call of `assert!(user.update_name("Harrison").did_execute());` would fail. +The idempotency key will be stored in the context of any events created while this context is active. + +### Using with idempotency_guard! + +When the `idempotency-key` feature is enabled, the `idempotency_guard!` macro checks both: +1. **Idempotency key matches** - If the current context has an idempotency key set, it checks if any persisted event has a matching key in its context +2. **Pattern matches** - The existing pattern matching behavior + +```rust +# extern crate es_entity; +# extern crate serde; +# extern crate derive_builder; +# extern crate sqlx; +# use serde::{Deserialize, Serialize}; +# use derive_builder::Builder; +use es_entity::{idempotency_guard, Idempotent, EventContext, *}; +# es_entity::entity_id! { OrderId, PaymentId } +# type Money = f64; +# +# #[derive(EsEvent, Debug, Clone, Serialize, Deserialize)] +# #[serde(tag = "type", rename_all = "snake_case")] +# #[es_event(id = "OrderId")] +# pub enum OrderEvent { +# Initialized { id: OrderId }, +# PaymentApplied { payment_id: PaymentId, amount: Money }, +# } +# +# pub struct NewOrder { id: OrderId } +# impl IntoEvents for NewOrder { +# fn into_events(self) -> EntityEvents { +# EntityEvents::init(self.id, [OrderEvent::Initialized { id: self.id }]) +# } +# } +# +# #[derive(EsEntity, Builder)] +# #[builder(pattern = "owned", build_fn(error = "EsEntityError"))] +# pub struct Order { +# pub id: OrderId, +# events: EntityEvents, +# } + +impl Order { + pub fn apply_payment(&mut self, payment_id: PaymentId, amount: Money) -> Idempotent<()> { + // Set idempotency key from external request ID + EventContext::current().set_idempotency_key(format!("payment-{}", payment_id)); + + // Guard checks BOTH: + // 1. Any persisted event with same idempotency key? + // 2. Pattern match for same payment_id? + idempotency_guard!( + self.events.iter_persisted().rev(), + OrderEvent::PaymentApplied { payment_id: pid, .. } if pid == &payment_id + ); + + self.events.push(OrderEvent::PaymentApplied { payment_id, amount }); + Idempotent::Executed(()) + } +} +# +# impl TryFromEvents for Order { +# fn try_from_events(events: EntityEvents) -> Result { +# let mut builder = OrderBuilder::default(); +# for event in events.iter_all() { +# match event { +# OrderEvent::Initialized { id } => { +# builder = builder.id(*id); +# } +# OrderEvent::PaymentApplied { .. } => {} +# } +# } +# builder.events(events).build() +# } +# } +``` + +### Break Pattern Behavior + +When using the break pattern (`=>`) with the `idempotency-key` feature, the macro continues scanning all events for idempotency key matches even after the break pattern matches. This ensures that duplicate requests are always detected regardless of where they appear in the event history: + +```rust +# extern crate es_entity; +# extern crate serde; +# extern crate derive_builder; +# extern crate sqlx; +# use serde::{Deserialize, Serialize}; +# use derive_builder::Builder; +# use es_entity::{idempotency_guard, Idempotent, EventContext, *}; +# es_entity::entity_id! { OrderId, PaymentId } +# type Money = f64; +# +# #[derive(EsEvent, Debug, Clone, Serialize, Deserialize)] +# #[serde(tag = "type", rename_all = "snake_case")] +# #[es_event(id = "OrderId")] +# pub enum OrderEvent { +# Initialized { id: OrderId }, +# PaymentApplied { payment_id: PaymentId, amount: Money }, +# } +# +# pub struct NewOrder { id: OrderId } +# impl IntoEvents for NewOrder { +# fn into_events(self) -> EntityEvents { +# EntityEvents::init(self.id, [OrderEvent::Initialized { id: self.id }]) +# } +# } +# +# #[derive(EsEntity, Builder)] +# #[builder(pattern = "owned", build_fn(error = "EsEntityError"))] +# pub struct Order { +# pub id: OrderId, +# events: EntityEvents, +# } + +impl Order { + pub fn apply_payment(&mut self, payment_id: PaymentId, amount: Money) -> Idempotent<()> { + EventContext::current().set_idempotency_key(format!("payment-{}", payment_id)); + idempotency_guard!( + self.events.iter_persisted().rev(), + OrderEvent::PaymentApplied { payment_id: pid, .. } if pid == &payment_id, + // Break pattern stops pattern matching but idempotency key checking continues + => OrderEvent::PaymentApplied { .. } + ); + self.events.push(OrderEvent::PaymentApplied { payment_id, amount }); + Idempotent::Executed(()) + } +} +# +# impl TryFromEvents for Order { +# fn try_from_events(events: EntityEvents) -> Result { +# let mut builder = OrderBuilder::default(); +# for event in events.iter_all() { +# match event { +# OrderEvent::Initialized { id } => { +# builder = builder.id(*id); +# } +# OrderEvent::PaymentApplied { .. } => {} +# } +# } +# builder.events(events).build() +# } +# } +``` diff --git a/book/src/nesting.md b/book/src/nesting.md index 8604eb4d..631ff522 100644 --- a/book/src/nesting.md +++ b/book/src/nesting.md @@ -99,9 +99,9 @@ pub struct LineItem { impl BillingPeriod { pub fn add_line_item(&mut self, amount: f64, description: String) -> Idempotent { idempotency_guard!( - self.events.iter_all().rev(), + self.events.iter_persisted().rev(), BillingPeriodEvent::LineItemAdded { amount: a, description: d, .. } - if a == &amount && d == &description + if *a == amount && d == &description ); self.line_items.push(LineItem { @@ -119,7 +119,7 @@ impl BillingPeriod { pub fn close(&mut self) -> Idempotent<()> { idempotency_guard!( - self.events.iter_all().rev(), + self.events.iter_persisted().rev(), BillingPeriodEvent::Closed ); @@ -232,9 +232,9 @@ Now let's implement the `Subscription` entity that will contain the nested `Bill # impl BillingPeriod { # pub fn add_line_item(&mut self, amount: f64, description: String) -> Idempotent { # idempotency_guard!( -# self.events.iter_all().rev(), +# self.events.iter_persisted().rev(), # BillingPeriodEvent::LineItemAdded { amount: a, description: d, .. } -# if a == &amount && d == &description +# if *a == amount && d == &description # ); # # self.line_items.push(LineItem { @@ -252,7 +252,7 @@ Now let's implement the `Subscription` entity that will contain the nested `Bill # # pub fn close(&mut self) -> Idempotent<()> { # idempotency_guard!( -# self.events.iter_all().rev(), +# self.events.iter_persisted().rev(), # BillingPeriodEvent::Closed # ); # @@ -483,9 +483,9 @@ This leverages the rust module system to enforce that the children cannot be acc # impl BillingPeriod { # pub fn add_line_item(&mut self, amount: f64, description: String) -> Idempotent { # idempotency_guard!( -# self.events.iter_all().rev(), +# self.events.iter_persisted().rev(), # BillingPeriodEvent::LineItemAdded { amount: a, description: d, .. } -# if a == &amount && d == &description +# if *a == amount && d == &description # ); # # self.line_items.push(LineItem { @@ -503,7 +503,7 @@ This leverages the rust module system to enforce that the children cannot be acc # # pub fn close(&mut self) -> Idempotent<()> { # idempotency_guard!( -# self.events.iter_all().rev(), +# self.events.iter_persisted().rev(), # BillingPeriodEvent::Closed # ); # @@ -704,9 +704,9 @@ Now we can use our aggregate with full type safety and automatic loading of nest # unreachable!() # } # idempotency_guard!( -# self.events.iter_all().rev(), +# self.events.iter_persisted().rev(), # BillingPeriodEvent::LineItemAdded { amount: a, description: d, .. } -# if a == &amount && d == &description +# if *a == amount && d == &description # ); # self.line_items.push(LineItem { # amount, @@ -720,7 +720,7 @@ Now we can use our aggregate with full type safety and automatic loading of nest # } # pub fn close(&mut self) -> Idempotent<()> { # idempotency_guard!( -# self.events.iter_all().rev(), +# self.events.iter_persisted().rev(), # BillingPeriodEvent::Closed # ); # self.is_current = false; diff --git a/book/src/quickstart.md b/book/src/quickstart.md index b86d8e7c..2eb7beb3 100644 --- a/book/src/quickstart.md +++ b/book/src/quickstart.md @@ -110,7 +110,7 @@ impl User { // It is not mandatory but very useful in the context of distributed / multi-thread // systems to protect against replays. idempotency_guard!( - self.events.iter_all().rev(), + self.events.iter_persisted().rev(), // If this pattern matches return Idempotent::AlreadyApplied UserEvent::NameUpdated { name } if name == &new_name, // Stop searching here diff --git a/book/src/repo-delete.md b/book/src/repo-delete.md index 7d9735e3..6de52a60 100644 --- a/book/src/repo-delete.md +++ b/book/src/repo-delete.md @@ -69,7 +69,7 @@ pub struct User { impl User { fn delete(&mut self) -> Idempotent<()> { idempotency_guard!( - self.events.iter_all(), + self.events.iter_persisted(), UserEvent::Deleted ); self.events.push(UserEvent::Deleted); diff --git a/es-entity-macros/Cargo.toml b/es-entity-macros/Cargo.toml index 13fae61e..61c1c314 100644 --- a/es-entity-macros/Cargo.toml +++ b/es-entity-macros/Cargo.toml @@ -13,6 +13,7 @@ fail-on-warnings = [] graphql = [] event-context = ["event-context-enabled"] event-context-enabled = [] +idempotency-key = [] instrument = [] [lib] diff --git a/flake.lock b/flake.lock index 1da3cb0e..12b6ea3e 100644 --- a/flake.lock +++ b/flake.lock @@ -20,11 +20,11 @@ }, "nixpkgs": { "locked": { - "lastModified": 1768305791, - "narHash": "sha256-AIdl6WAn9aymeaH/NvBj0H9qM+XuAuYbGMZaP0zcXAQ=", + "lastModified": 1770197578, + "narHash": "sha256-AYqlWrX09+HvGs8zM6ebZ1pwUqjkfpnv8mewYwAo+iM=", "owner": "NixOS", "repo": "nixpkgs", - "rev": "1412caf7bf9e660f2f962917c14b1ea1c3bc695e", + "rev": "00c21e4c93d963c50d4c0c89bfa84ed6e0694df2", "type": "github" }, "original": { @@ -48,11 +48,11 @@ ] }, "locked": { - "lastModified": 1768531678, - "narHash": "sha256-tf4xEp5Zq8+Zce0WtU8b0VNMxhQtwes67sN2phnbkpk=", + "lastModified": 1770347142, + "narHash": "sha256-uz+ZSqXpXEPtdRPYwvgsum/CfNq7AUQ/0gZHqTigiPM=", "owner": "oxalica", "repo": "rust-overlay", - "rev": "0a9de73f3c23206a2fce3c7656a42d3a3f07be9f", + "rev": "2859683cd9ef7858d324c5399b0d8d6652bf4044", "type": "github" }, "original": { diff --git a/src/context/mod.rs b/src/context/mod.rs index 38f0f7b7..7662d13d 100644 --- a/src/context/mod.rs +++ b/src/context/mod.rs @@ -79,6 +79,10 @@ use std::{borrow::Cow, cell::RefCell, rc::Rc}; pub use tracing::*; pub use with_event_context::*; +/// The key used to store the idempotency key in event context. +#[cfg(feature = "idempotency-key")] +pub const IDEMPOTENCY_KEY: &str = "idempotency_key"; + /// Immutable context data that can be safely shared across thread boundaries. /// /// This struct holds key-value pairs of context information that gets attached @@ -120,6 +124,12 @@ impl ContextData { }; serde_json::from_value(val.clone()).map(Some) } + + /// Returns the idempotency key if present in the context data. + #[cfg(feature = "idempotency-key")] + pub fn idempotency_key(&self) -> Option<&str> { + self.0.get(IDEMPOTENCY_KEY).and_then(|v| v.as_str()) + } } struct StackEntry { @@ -365,6 +375,31 @@ impl EventContext { } data } + + /// Sets the idempotency key in the current context. + /// + /// The idempotency key is used by the `idempotency_guard!` macro to detect + /// duplicate operations by matching against event contexts in the event history. + /// + /// # Examples + /// + /// ```rust + /// use es_entity::context::EventContext; + /// + /// let mut ctx = EventContext::current(); + /// ctx.set_idempotency_key("request-12345"); + /// ``` + #[cfg(feature = "idempotency-key")] + pub fn set_idempotency_key(&mut self, key: impl Into) { + self.insert(IDEMPOTENCY_KEY, &key.into()) + .expect("Failed to serialize idempotency key"); + } + + /// Returns the idempotency key from the current context, if set. + #[cfg(feature = "idempotency-key")] + pub fn idempotency_key(&self) -> Option { + self.data().idempotency_key().map(|s| s.to_string()) + } } #[cfg(test)] @@ -560,4 +595,52 @@ mod tests { assert_eq!(current_json(), serde_json::json!({ "parent": "context" })); } + + #[cfg(feature = "idempotency-key")] + #[test] + fn set_and_get_idempotency_key() { + let mut ctx = EventContext::current(); + assert!(ctx.idempotency_key().is_none()); + + ctx.set_idempotency_key("test-key-123"); + assert_eq!(ctx.idempotency_key(), Some("test-key-123".to_string())); + + // Verify it's in the context data + let data = ctx.data(); + assert_eq!(data.idempotency_key(), Some("test-key-123")); + } + + #[cfg(feature = "idempotency-key")] + #[test] + fn idempotency_key_in_forked_context() { + let mut parent = EventContext::current(); + parent.set_idempotency_key("parent-key"); + + { + let mut child = EventContext::fork(); + // Child inherits parent's idempotency key + assert_eq!(child.idempotency_key(), Some("parent-key".to_string())); + + // Child can override the key + child.set_idempotency_key("child-key"); + assert_eq!(child.idempotency_key(), Some("child-key".to_string())); + } + + // Parent's key is unchanged + assert_eq!(parent.idempotency_key(), Some("parent-key".to_string())); + } + + #[cfg(feature = "idempotency-key")] + #[test] + fn context_data_idempotency_key() { + let mut ctx = EventContext::current(); + ctx.set_idempotency_key("data-key"); + + let data = ctx.data(); + assert_eq!(data.idempotency_key(), Some("data-key")); + + // Verify we can look it up via the constant + let key: Option = data.lookup(IDEMPOTENCY_KEY).unwrap(); + assert_eq!(key, Some("data-key".to_string())); + } } diff --git a/src/idempotent.rs b/src/idempotent.rs index 2553982b..2d6806b3 100644 --- a/src/idempotent.rs +++ b/src/idempotent.rs @@ -6,43 +6,7 @@ /// ignored due to idempotency checks. /// The [`idempotency_guard`][crate::idempotency_guard] macro provides an easy way to do such checks. /// -/// # Examples -/// -/// ```rust -/// use es_entity::{idempotency_guard, Idempotent}; -/// pub enum UserEvent{ -/// Initialized {id: u64, name: String}, -/// NameUpdated {name: String} -/// } -/// -/// pub struct User{ -/// events: Vec -/// } -/// -/// impl User{ -/// // This returns `Idempotent` where T is the return value we get after the event is processed -/// pub fn update_name(&mut self, new_name: impl Into) -> Idempotent<()>{ -/// let name = new_name.into(); -/// idempotency_guard!( -/// self.events.iter().rev(), -/// UserEvent::NameUpdated { name: existing_name } if existing_name == &name -/// ); -/// self.events.push(UserEvent::NameUpdated{name}); -/// Idempotent::Executed(()) -/// } -/// } -/// -/// fn example(){ -/// let mut user = User{ events: vec![] }; -/// assert!(user.update_name("Alice").did_execute()); -/// // updating "Alice" executes as no such event has been processed before. -/// // Signalled by returning `Idempotent::Executed(T)`, validated with `did_execute` helper method -/// -/// assert!(user.update_name("Alice").was_already_applied()); -/// // updating "Alice" again ignored because same event has been processed before. -/// // Signalled by returning `Idempotent::AlreadyApplied` early, validated with `was_already_applied` helper method -/// } -/// ``` +/// See the [`idempotency_guard`][crate::idempotency_guard] macro for usage examples. #[must_use] pub enum Idempotent { // Signals if executed and returns T diff --git a/src/macros.rs b/src/macros.rs index 9340d086..130cbd09 100644 --- a/src/macros.rs +++ b/src/macros.rs @@ -6,64 +6,78 @@ /// /// # Parameters /// -/// - `$events`: Event collection to search (usually chronologically reversed) +/// - `$events`: Iterator over `&PersistedEvent` items (use `iter_persisted().rev()`) /// - `$pattern`: Event patterns that indicate operation already applied /// - `$break_pattern`: Optional break pattern to stop searching /// +/// # Feature: idempotency-key +/// +/// When the `idempotency-key` feature is enabled, the macro also checks for matching +/// idempotency keys stored in event contexts. If an idempotency key is set in the +/// current context (via `EventContext::set_idempotency_key()`), the macro will return +/// `AlreadyApplied` if any persisted event has a matching idempotency key. +/// /// # Examples /// /// ```rust -/// use es_entity::{idempotency_guard, Idempotent}; -/// pub enum UserEvent{ -/// Initialized {id: u64, name: String}, -/// NameUpdated {name: String} +/// use es_entity::{idempotency_guard, Idempotent, EntityEvents, EsEvent}; +/// use serde::{Serialize, Deserialize}; +/// +/// #[derive(Debug, Clone, Serialize, Deserialize)] +/// pub enum UserEvent { +/// Initialized { name: String }, +/// NameUpdated { name: String }, /// } /// -/// pub struct User{ -/// events: Vec +/// impl EsEvent for UserEvent { +/// type EntityId = uuid::Uuid; +/// fn event_context() -> bool { false } /// } /// -/// impl User{ -/// pub fn update_name(&mut self, new_name: impl Into) -> Idempotent<()>{ +/// pub struct User { +/// name: String, +/// events: EntityEvents, +/// } +/// +/// impl User { +/// pub fn update_name(&mut self, new_name: impl Into) -> Idempotent<()> { /// let name = new_name.into(); /// idempotency_guard!( -/// self.events.iter().rev(), +/// self.events.iter_persisted().rev(), /// UserEvent::NameUpdated { name: existing_name } if existing_name == &name -/// // above line returns early if same name found /// ); -/// self.events.push(UserEvent::NameUpdated{name}); +/// self.events.push(UserEvent::NameUpdated { name }); /// Idempotent::Executed(()) /// } -/// -/// pub fn update_name_with_break(&mut self, new_name: impl Into) -> Idempotent<()>{ +/// +/// pub fn update_name_with_break(&mut self, new_name: impl Into) -> Idempotent<()> { /// let name = new_name.into(); /// idempotency_guard!( -/// self.events.iter().rev(), +/// self.events.iter_persisted().rev(), /// UserEvent::NameUpdated { name: existing_name } if existing_name == &name, -/// => UserEvent::NameUpdated {..} -/// // above line breaks iteration if same event found -/// ); -/// self.events.push(UserEvent::NameUpdated{name}); -/// Idempotent::Executed(()) +/// // The `=>` signifies the pattern where to stop the iteration +/// => UserEvent::NameUpdated { .. } +/// ); +/// self.events.push(UserEvent::NameUpdated { name }); +/// Idempotent::Executed(()) /// } /// } -/// -/// let mut user1 = User{ events: vec![] }; -/// let mut user2 = User{ events: vec![] }; -/// assert!(user1.update_name("Alice").did_execute()); -/// // updating "ALice" again ignored because same event with same name exists -/// assert!(user1.update_name("Alice").was_already_applied()); -/// -/// assert!(user2.update_name_with_break("Alice").did_execute()); -/// assert!(user2.update_name_with_break("Bob").did_execute()); -/// // updating "ALice" again works because of early break condition -/// assert!(user2.update_name_with_break("Alice").did_execute()); /// ``` #[macro_export] macro_rules! idempotency_guard { ($events:expr, $( $pattern:pat $(if $guard:expr)? ),+ $(,)?) => { - for event in $events { - match event { + #[cfg(feature = "idempotency-key")] + let __current_idempotency_key = $crate::EventContext::current().idempotency_key(); + for __persisted_event in $events { + #[cfg(feature = "idempotency-key")] + if let Some(ref __key) = __current_idempotency_key { + if let Some(ref __ctx) = __persisted_event.context { + if __ctx.idempotency_key() == Some(__key.as_str()) { + return $crate::FromAlreadyApplied::from_already_applied(); + } + } + } + match &__persisted_event.event { $( $pattern $(if $guard)? => return $crate::FromAlreadyApplied::from_already_applied(), )+ @@ -73,8 +87,29 @@ macro_rules! idempotency_guard { }; ($events:expr, $( $pattern:pat $(if $guard:expr)? ),+, => $break_pattern:pat $(if $break_guard:expr)?) => { - for event in $events { - match event { + #[cfg(feature = "idempotency-key")] + let __current_idempotency_key = $crate::EventContext::current().idempotency_key(); + #[cfg(feature = "idempotency-key")] + let mut __pattern_matching_active = true; + for __persisted_event in $events { + #[cfg(feature = "idempotency-key")] + if let Some(ref __key) = __current_idempotency_key { + if let Some(ref __ctx) = __persisted_event.context { + if __ctx.idempotency_key() == Some(__key.as_str()) { + return $crate::FromAlreadyApplied::from_already_applied(); + } + } + } + #[cfg(feature = "idempotency-key")] + if __pattern_matching_active { + match &__persisted_event.event { + $($pattern $(if $guard)? => return $crate::FromAlreadyApplied::from_already_applied(),)+ + $break_pattern $(if $break_guard)? => __pattern_matching_active = false, + _ => {} + } + } + #[cfg(not(feature = "idempotency-key"))] + match &__persisted_event.event { $($pattern $(if $guard)? => return $crate::FromAlreadyApplied::from_already_applied(),)+ $break_pattern $(if $break_guard)? => break, _ => {} diff --git a/tests/entities/order.rs b/tests/entities/order.rs index 5c70a9f5..ad122eca 100644 --- a/tests/entities/order.rs +++ b/tests/entities/order.rs @@ -40,8 +40,8 @@ pub struct OrderItem { impl OrderItem { pub fn update_quantity(&mut self, quantity: i32) -> Idempotent<()> { idempotency_guard!( - self.events.iter_all().rev(), - OrderItemEvent::QuantityUpdated { quantity: q } if q == &quantity, + self.events.iter_persisted().rev(), + OrderItemEvent::QuantityUpdated { quantity: q } if *q == quantity, => OrderItemEvent::QuantityUpdated { .. } ); diff --git a/tests/entities/user.rs b/tests/entities/user.rs index f5d7afdf..81c077bf 100644 --- a/tests/entities/user.rs +++ b/tests/entities/user.rs @@ -28,7 +28,7 @@ impl User { pub fn update_name(&mut self, new_name: impl Into) -> Idempotent<()> { let new_name = new_name.into(); idempotency_guard!( - self.events.iter_all().rev(), + self.events.iter_persisted().rev(), UserEvent::NameUpdated { name } if name == &new_name, => UserEvent::NameUpdated { .. } );