Skip to content

Commit

Permalink
Cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Jul 17, 2024
1 parent 878a0a7 commit acc08c3
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 44 deletions.
5 changes: 4 additions & 1 deletion src/Propulsion.CosmosStore/CosmosStoreSink.fs
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,11 @@ module Internal =
with e -> return Error struct (e, met) }
let interpretProgress (streams: Scheduling.StreamStates<_>) stream res =
let applyResultToStreamState = function
| Ok struct ((Writer.Result.Ok pos' | Writer.Result.Duplicate pos' | Writer.Result.PartialDuplicate pos'), _stats) ->
| Ok struct (Writer.Result.Ok pos', _stats) ->
struct (streams.SetWritePos(stream, pos'), false)
| Ok ((Writer.Result.Duplicate pos' | Writer.Result.PartialDuplicate pos'), _stats) ->
streams.SetWritePos(stream, pos') |> ignore // throw away the events (but not the unfolds)
ValueNone, false // Don't declare progress
| Ok (Writer.Result.PrefixMissing _, _stats) ->
streams.WritePos(stream), false
| Error struct (exn, _stats) ->
Expand Down
92 changes: 49 additions & 43 deletions src/Propulsion/Streams.fs
Original file line number Diff line number Diff line change
Expand Up @@ -115,40 +115,31 @@ module StreamSpan =
match index xs with
| xi when xi = i -> xs
| xi -> xs |> Array.skip (i - xi |> int)

let merge min (spans: FsCodec.ITimelineEvent<_>[][]) =
let candidates =
[| for span in spans do
if span <> null then
match dropBeforeIndex min span with
| [||] -> ()
| xs -> xs |]
if candidates.Length = 0 then null
elif candidates.Length = 1 then candidates
else
candidates |> Array.sortInPlaceBy index

let mutable acc = candidates[0] // buffer first item
let mutable buffer = null
for i in 1 .. candidates.Length - 1 do
let x = candidates[i]
let accNext = nextIndex acc
if index x > accNext then // Gap
if acc |> Seq.exists (_.IsUnfold >> not) then
if buffer = null then buffer <- ResizeArray(candidates.Length)
buffer.Add(acc |> Array.filter (_.IsUnfold >> not))
acc <- x
elif nextIndex x >= accNext then // Overlapping; join
match dropBeforeIndex accNext x with
| [||] -> ()
| news ->

acc <- [| yield! acc |> Seq.filter (_.IsUnfold >> not); yield! news |]
match acc with
| [||] when buffer = null -> null
| [||] -> buffer.ToArray()
| last when buffer = null -> Array.singleton last
| last -> buffer.Add last; buffer.ToArray()
let private merge_ xs =
xs |> Array.sortInPlaceBy index
let mutable outputs, acc = null, xs[0]
for x in xs |> Seq.skip 1 do
match nextIndex acc with
| accNext when index x > accNext -> // Gap
if acc |> Seq.exists (_.IsUnfold >> not) then
if outputs = null then outputs <- ResizeArray(xs.Length)
outputs.Add(acc |> Array.filter (_.IsUnfold >> not))
acc <- x
| accNext when nextIndex x >= accNext -> // Overlapping; join
match dropBeforeIndex accNext x with
| [||] -> ()
| news -> acc <- [| yield! acc |> Seq.filter (_.IsUnfold >> not); yield! news |]
| _ -> ()
match acc with
| [||] when outputs = null -> null
| unified when outputs = null -> Array.singleton unified
| [||] -> outputs.ToArray()
| tail -> outputs.Add tail; outputs.ToArray()
let merge min (inputs: FsCodec.ITimelineEvent<_>[][]) =
match inputs |> Array.choose (function null -> None | x -> match dropBeforeIndex min x with [||] -> None | xs -> Some xs) with
| [||] -> null
| [| _ |] as alreadyUnified -> alreadyUnified
| xs -> merge_ xs

/// A Single Event from an Ordered stream being supplied for ingestion into the internal data structures
type StreamEvent<'Format> = (struct (FsCodec.StreamName * FsCodec.ITimelineEvent<'Format>))
Expand Down Expand Up @@ -176,18 +167,26 @@ module Buffer =

member x.WritePos = match x.write with WritePosUnknown | WritePosMalformed -> ValueNone | w -> ValueSome w
member x.CanPurge = x.IsEmpty
// member x.EventsAndUnfoldsCount =
// if x.queue = null then 0, 0
// else
// let counts = Seq.concat x.queue |> Seq.countBy _.IsUnfold |> Map
// let countFor value = counts |> Map.tryFind value |> Option.defaultValue 0
// countFor false, countFor true

module StreamState =

let combine (s1: StreamState<_>) (s2: StreamState<_>): StreamState<'Format> =
let writePos = max s1.WritePos s2.WritePos
let malformed = s1.IsMalformed || s2.IsMalformed
let any1 = not (isNull s1.queue)
let any2 = not (isNull s2.queue)
if any1 || any2 then
let items = if any1 && any2 then Array.append s1.queue s2.queue elif any1 then s1.queue else s2.queue
StreamState<'Format>.Create(writePos, StreamSpan.merge (defaultValueArg writePos 0L) items, malformed)
else StreamState<'Format>.Create(writePos, null, malformed)
let writePos = max s1.WritePos s2.WritePos
let queue =
let any1 = not (isNull s1.queue)
let any2 = not (isNull s2.queue)
if any1 || any2 then
let items = if any1 && any2 then Array.append s1.queue s2.queue elif any1 then s1.queue else s2.queue
StreamSpan.merge (defaultValueArg writePos 0L) items
else null
StreamState<'Format>.Create(writePos, queue, malformed)

type Streams<'Format>() =
let states = Dictionary<FsCodec.StreamName, StreamState<'Format>>()
Expand Down Expand Up @@ -305,7 +304,14 @@ module Scheduling =
| _ -> ValueNone

member _.WritePos(stream) = tryGetItem stream |> ValueOption.bind _.WritePos
member _.SetWritePos(stream, pos) = updateWritePos stream false (ValueSome pos) null
member _.SetWritePos(stream, pos) =
// let count () = tryGetItem stream |> ValueOption.map _.EventsAndUnfoldsCount |> ValueOption.defaultValue (0, 0)
// let beforeE, beforeU = count ()
let res = updateWritePos stream false (ValueSome pos) null
// let afterE, afterU = count ()
// if (afterU <> 0 || beforeU <> 0) && afterU <> beforeU then
// Log.Information("Stream {s} before {be}e {bu}u after {ae}e {au}u", stream, beforeE, beforeU, afterE, afterU)
res
member _.MarkMalformed(stream, isMalformed) = updateWritePos stream isMalformed ValueNone null
member _.WritePositionIsAlreadyBeyond(stream, required) =
match tryGetItem stream with
Expand Down Expand Up @@ -685,7 +691,7 @@ module Scheduling =
for x in pending do
// example: when we reach position 1 on the stream (having handled event 0), and the required position was 1, we remove the requirement
let mutable requiredIndex = Unchecked.defaultof<_>
if x.streamToRequiredIndex.TryGetValue(stream, &requiredIndex) && requiredIndex <= index then
if x.streamToRequiredIndex.TryGetValue(stream, &requiredIndex) && index >= requiredIndex then
x.streamToRequiredIndex.Remove stream |> ignore

member _.Dump(log: ILogger, lel, classify: FsCodec.StreamName -> Stats.Busy.State) =
Expand Down
8 changes: 8 additions & 0 deletions tests/Propulsion.Tests/StreamStateTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,14 @@ let [<Fact>] ``fail 2`` () =
let r = merge 11613L [| mk 11614L 1; null |]
test <@ r |> is [| mk 11614L 1 |] @>

let [<Fact>] ``merge to strip Events should not strip unfolds 0`` () =
let r = merge 0L [| mk_ 0L 0 0 1 |]
test <@ r |> is [| mkU 0L 1 |] @>

let [<Fact>] ``merge to strip Events should not strip unfolds`` () =
let r = merge 1L [| mk_ 0L 1 0 1 |]
test <@ r |> is [| mkU 1L 1 |] @>

let (===) (xs: 't seq) (ys: 't seq) = (xs, ys) ||> Seq.forall2 isSame

let [<FsCheck.Xunit.Property(MaxTest = 1000)>] ``merges retain freshest unfolds, one per event type`` counts =
Expand Down

0 comments on commit acc08c3

Please sign in to comment.