Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!(Streams): Support propagating Unfolds; remove StreamResult #264

Merged
merged 10 commits into from
Aug 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ The `Unreleased` section name is replaced by the expected version of next releas
- `Scheduler`: `Struct`/`voption` conversions; buffer reuse [#157](https://github.com/jet/propulsion/pull/157)
- `Scheduler`: Replaced `Thead.Sleep` with `Task.WhenAny`; Added Sleep time logging [#161](https://github.com/jet/propulsion/pull/161)
- `Streams`: Changed dominant `ITimelineEvent` `EventBody` type from `byte[]` to `System.ReadOnlyMemory<byte>` (`Sinks.EventBody`) [#169](https://github.com/jet/propulsion/pull/169) [#208](https://github.com/jet/propulsion/pull/208)
- `Streams.SpanResult`: Renamed to `Sinks.StreamResult` [#208](https://github.com/jet/propulsion/pull/208)
- `Propulsion.CosmosStore`: Changed to target `Equinox.CosmosStore` v `4.0.0` [#139](https://github.com/jet/propulsion/pull/139)
- `Propulsion.CosmosStore.CosmosSource`: Changed parsing to use `System.Text.Json` [#139](https://github.com/jet/propulsion/pull/139)
- `Propulsion.CosmosStore.CosmosStoreSink`+`CosmosStorePruner`: Exposed `*Stats` [#226](https://github.com/jet/propulsion/pull/226)
Expand All @@ -60,6 +59,7 @@ The `Unreleased` section name is replaced by the expected version of next releas
### Removed

- `Streams.StreamSpan`: Changed from a record to individual arguments of `FsCodec.StreamName` and `Sinks.Event[]` [#169](https://github.com/jet/propulsion/pull/169) [#208](https://github.com/jet/propulsion/pull/208)
- `Streams.SpanResult`: Replaced with `int64` to reflect the updated position [#264](https://github.com/jet/propulsion/pull/264) [#208](https://github.com/jet/propulsion/pull/208)
- `Streams`: `statsInterval` is obtained from the `Stats` wherever one is supplied [#208](https://github.com/jet/propulsion/pull/208)
- `Propulsion.Cosmos`: Should not be in general use - users should port to `Propulsion.CosmosStore3`, then `Propulsion.CosmosStore` [#193](https://github.com/jet/propulsion/pull/193)
- `Destructurama.FSharp` dependency [#152](https://github.com/jet/propulsion/pull/152)
Expand Down
9 changes: 4 additions & 5 deletions DOCUMENTATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ Typically, alerting should be set up based on the built in `busy` metrics provid
- `failing`: streams that have had at least one failed Handler invocation (regardless of whether they are currently the subject of a retry Handler invocation or not). Typically it should be possible to define:
- a reasonable limit before you'd want a low level alert to be raised
- a point at which you raise an alarm on the basis that the system is in a state that will lead to a SLA breach and hence merits intervention
- `stalled`: streams that have had only successful Handler invocations, but have not declared any progress via the Handler's `StreamResult`. In some cases, the design of a Reaction Process may be such that one might intentionally back off and retry in some scenarios (see [Consistency](#consistency)). In the general case, a stalled stream may reflect a coding error (e.g., if a handler uses read a stale value from a cache but the cache never gets invalidated, it will never make progress)
- `stalled`: streams that have had only successful Handler invocations, but have not declared any progress via the updated Stream Position yielded in the Handler's result. In some cases, the design of a Reaction Process may be such that one might intentionally back off and retry in some scenarios (see [Consistency](#consistency)). In the general case, a stalled stream may reflect a coding error (e.g., if a handler uses read a stale value from a cache but the cache never gets invalidated, it will never make progress)

Alongside alerting based on breaches of SLO limits, the values of the `busy` metrics are a key high level indicator of the health of a given Processor (along with the Handler Latency distribution).

Expand Down Expand Up @@ -348,13 +348,12 @@ or the _Designing Data Intensive Applications_ book):
- DynamoDb: requesting a 'consistent read'
- CosmosDb: when using Session Consistency, require that reads are contingent on the session token being used by the feed reader. This can be achieved by using the same `CosmosClient` to ensure the session tokens are synchronized.
2. Perform a pre-flight check when reading, based on the `Index` of the newest event passed to the handler. In such a case, it may make sense to back off for a small period, before reporting failure to handle the event (by throwing an exception). The Handler will be re-invoked for another attempt, with a better chance of the event being reflected in the read.
- Once such a pre-flight check has been carried out, one can safely report `StreamResult.AllProcessed` (or `PartiallyProcessed` if you wish to defer some work due to the backlog of events triggering too much work to perform in a single invocation)
3. Perform the processing on a 'shoulder tap' basis, with the final position based on what you read.
- First, load the stream's state, performing any required reactions.
- Then report the Version attained for the stream (based on the Index of the last event processed) by yielding a `StreamResult.OverrideNextIndex`.
- Then report the Version attained for the stream (based on the Index of the last event processed) as the Handler's updated Position for that Stream
- In this case, one of following edge cases may result:
- _The handler saw a version prior to the prompting event_. For example, if a Create event (`Index = 0`) is relayed, but reading does not yield any events (the replica in question is behind the node from which the feed obtained its state). In this case, the Handler can simply yield `StreamResult.OverrideNextIndex`, which will cause the event to be retained in the input buffer (and most likely, a fresh invocation for that same stream will immediately be dispatched)
- _The Handler saw a Version fresher than the prompting event_. For example: if a Create (`Index = 0`) is immediately followed by an Update (`Index = 1`), the handler can yield `StreamResult.OverrideNextIndex 2` to reflect the fact that the next event that's of interest will be event `Index = 2`. Regardless of whether Event 1 arrived while the handler was processing Event 0, or whether it arrives some time afterwards, the event will be dropped from the events pending for that Stream's Handler.
- _The handler saw a version prior to the prompting event_. For example, if a Create event (`Index = 0`) is relayed, but reading does not yield any events (the replica in question is behind the node from which the feed obtained its state). In this case, the Handler can simply yield the Position that the processing did see, which will cause the event to be retained in the input buffer (and most likely, a fresh invocation for that same stream will immediately be dispatched)
- _The Handler saw a Version fresher than the prompting event_. For example: if a Create (`Index = 0`) is immediately followed by an Update (`Index = 1`), the handler can yield `2` to reflect the fact that the next event that's of interest will be event `Index = 2`. Regardless of whether Event 1 arrived while the handler was processing Event 0, or whether it arrives some time afterwards, the event will be dropped from the events pending for that Stream's Handler.

### Consistency in the face of at least once delivery and re-traversal of events

Expand Down
2 changes: 2 additions & 0 deletions Propulsion.sln.DotSettings
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
<wpf:ResourceDictionary xml:space="preserve" xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml" xmlns:s="clr-namespace:System;assembly=mscorlib" xmlns:ss="urn:shemas-jetbrains-com:settings-storage-xaml" xmlns:wpf="http://schemas.microsoft.com/winfx/2006/xaml/presentation">
<s:Boolean x:Key="/Default/UserDictionary/Words/=abend/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=Dispatchable/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=exns/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=ingester/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=Prioritizer/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=slipstreamed/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=slipstreaming/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=stddev/@EntryIndexedValue">True</s:Boolean></wpf:ResourceDictionary>
6 changes: 3 additions & 3 deletions src/Propulsion.CosmosStore/CosmosStorePruner.fs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ module Pruner =
let res = if deleted = 0 && deferred = 0 then Nop span.Length else Ok (deleted, deferred)
// For case where we discover events have already been deleted beyond our requested position, signal to reader to drop events
let writePos = max trimmedPos (untilIndex + 1L)
return struct (writePos, res) }
return struct (res, writePos) }
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for alignment with res, events with equinox result tuples


type CosmosStorePrunerStats(log, statsInterval, stateInterval, [<O; D null>] ?failThreshold) =
inherit Propulsion.Streams.Stats<Pruner.Outcome>(log, statsInterval, stateInterval, ?failThreshold = failThreshold)
Expand Down Expand Up @@ -75,8 +75,8 @@ type CosmosStorePruner =
#endif
let interpret _stream span =
let metrics = StreamSpan.metrics Event.storedSize span
struct (metrics, span)
Dispatcher.Concurrent<_, _, _, _>.Create(maxConcurrentStreams, interpret, Pruner.handle pruneUntil, (fun _ r -> r))
struct (span, metrics)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any extra stuff layered on goes after the primary data in the tuple - no idea how/why I ended up having it backwards, but these signatures are undergoing change as part of putting the pos after the result anyway

Dispatcher.Concurrent<_, _, _, _>.Create(maxConcurrentStreams, interpret, Pruner.handle pruneUntil)
let dumpStreams logStreamStates _log = logStreamStates Event.storedSize
let scheduler = Scheduling.Engine(dispatcher, stats, dumpStreams, pendingBufferSize = 5,
?purgeInterval = purgeInterval, ?wakeForResults = wakeForResults, ?idleDelay = idleDelay)
Expand Down
Loading
Loading