-
Notifications
You must be signed in to change notification settings - Fork 24
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat!(Streams): Support propagating Unfolds; remove StreamResult #264
Conversation
@@ -29,7 +29,7 @@ module Pruner = | |||
let res = if deleted = 0 && deferred = 0 then Nop span.Length else Ok (deleted, deferred) | |||
// For case where we discover events have already been deleted beyond our requested position, signal to reader to drop events | |||
let writePos = max trimmedPos (untilIndex + 1L) | |||
return struct (writePos, res) } | |||
return struct (res, writePos) } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for alignment with res, events
with equinox result tuples
@@ -75,8 +75,8 @@ type CosmosStorePruner = | |||
#endif | |||
let interpret _stream span = | |||
let metrics = StreamSpan.metrics Event.storedSize span | |||
struct (metrics, span) | |||
Dispatcher.Concurrent<_, _, _, _>.Create(maxConcurrentStreams, interpret, Pruner.handle pruneUntil, (fun _ r -> r)) | |||
struct (span, metrics) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
any extra stuff layered on goes after the primary data in the tuple - no idea how/why I ended up having it backwards, but these signatures are undergoing change as part of putting the pos after the result anyway
log.Debug("Writing {s}@{i}x{n}", stream, span[0].Index, span.Length) | ||
let i = StreamSpan.index span | ||
let n = StreamSpan.nextIndex span | ||
span |> Seq.iter (fun x -> if x.IsUnfold then invalidOp "CosmosStore3 does not [yet] support ingesting unfolds") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
prep for #263
/// Attempts to parse a Document/Item from the Store | ||
correlationId = x.correlationId, causationId = x.causationId, isUnfold = isUnfold) | ||
let events = batch.e |> Seq.mapi (fun offset -> gen false (batch.i + int64 offset)) | ||
// an Unfold won't have a corr/cause id, but that's OK - can't use Tip type as don't want to expand compressed form etc |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
v5 wishlist: drop correlation/causation as first class things 😇
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, it's a misfeature that's one of the few white elephants in Equinox, but it does happen to be surfaced as top level indexable value in Equinox.CosmosStore
AFAIK nobody is actually leaning on it, but I could be wrong, so there'd conceptually have to be a placeholder task to provide a way to post process IEventData -> Event mapping in both directions.
tryParseEquinoxBatchOrTip streamFilter jsonDocument | ||
|> ValueOption.map (fun struct (s, xs, u) -> enumEquinoxCosmosBatchOrTip (if withUnfolds then u else ValueNone) xs |> Seq.map (fun x -> s, x)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this'd read better as a plain 'ole match
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It didn't but I left it!
@@ -38,7 +38,7 @@ module private TimelineEvent = | |||
fun (i, x: FsCodec.IEventData<_>, context: obj) -> | |||
if i > DateTimeOffsetPosition.factor then invalidArg (nameof i) $"Index may not exceed %d{DateTimeOffsetPosition.factor}" | |||
FsCodec.Core.TimelineEvent.Create( | |||
baseIndex + i, x.EventType, x.Data, x.Meta, x.EventId, x.CorrelationId, x.CausationId, x.Timestamp, isUnfold = true, context = context) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
whoopsie?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no, it's intentional - this first/only use of the flag was to convey that these were ephemeral events, not backed by a store
but nothing leans on that
the change in this PR as a whole is that IsUnfold events now get dropped and replaced whenever a new set comes through (similar to how cosmos changefeed will deliver only the latest state)
But that would be bad for periodic source, which generates a set of sequntial events to represent the full state of the input at a point in time (and then re-runs the search with a new prefix when the period resets)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not picking up on anything too bad, but this would be a new major version since the user facing API seems to be changing
You are not wrong of course... But I have some buts: This handler signature was only introduced in the V3 timeframe, and I believe it to have been a misdesign - you want calculating the next version (or having it flow from what the sycing is going into) be foremost in people's minds For a V4, I think Propulsion.Feed and Propulsion should be merged as lots of modules depend on Propulsion.Feed and it in turn depends on Propulsion What if I promised to mint a V4 as soon as Equinox.CosmosStore 4.1.0 is final ? (and kill Propulsion.CosmosStore3 the minute that's done) Other things on the list that I'd like to do at some point (which don't change the V4 API but simplify the whole deal):
|
68afadf
to
50d6eb8
Compare
Enhances the pipeline to support propagation of
Unfold
events through the pipeline:EquinoxSystemTextJsonParser.eventsAndUnfoldsWhereStream
extracts the unfolds in addition to the eventsStreamSpan.merge
ensures we only retain the most recent ones as inputs are coalescedspan
(assuming they fit inside the limit)IsUnfold = true
andIndex
= version, where version is the Index of the last 'real' event +1maxBytes
limit will include/exclude either all or none of the unfoldsAs before, given a
Propulsion.Sink.Event[]
, you compute:Propulsion.Sink.Event.index
Propulsion.Sink.Event.nextIndex
(even if there are unfolds at the end of the sequence)Note that the following assumptions no longer hold if you have unfolds in an
Event[]
:nextIndex
<->index events + events.Length
(renamed tonext
as intentional breaking change)events.Length
What is the use case? If you're running a ChangeFeedProcessor from an Equinox.CosmosStore, the changefeed includes:
RollingState
state as held in the tipBeing able to route the Unfolds to the Handler alongside normal events enables:
propulsion sync cosmos
orpropulsion sync file cosmos
)