Skip to content

Commit ba7c315

Browse files
committed
Apply style changes from 4.2
1 parent 5ce4774 commit ba7c315

File tree

9 files changed

+84
-76
lines changed

9 files changed

+84
-76
lines changed

equinox-fc/Domain/Inventory.fs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ type Service2 internal (inventoryId, series : Series.Service, epochs : Epoch.Ser
7070

7171
module internal Helpers =
7272

73-
let createService inventoryId maxTransactionsPerEpoch lookBackLimit (series, epochs) =
73+
let create inventoryId maxTransactionsPerEpoch lookBackLimit (series, epochs) =
7474
let remainingEpochCapacity (state: Epoch.Fold.State) =
7575
let currentLen = state.ids.Count
7676
max 0 (maxTransactionsPerEpoch - currentLen)
@@ -79,9 +79,9 @@ module internal Helpers =
7979
module Cosmos =
8080

8181
let create inventoryId maxTransactionsPerEpoch lookBackLimit (context, cache) =
82-
let series = Series.Cosmos.createService (context, cache)
83-
let epochs = Epoch.Cosmos.createService (context, cache)
84-
Helpers.createService inventoryId maxTransactionsPerEpoch lookBackLimit (series, epochs)
82+
let series = Series.Cosmos.create (context, cache)
83+
let epochs = Epoch.Cosmos.create (context, cache)
84+
Helpers.create inventoryId maxTransactionsPerEpoch lookBackLimit (series, epochs)
8585

8686
module Processor =
8787

equinox-fc/Domain/InventoryEpoch.fs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,13 @@
33
/// See Inventory.Service for surface level API which manages the ingestion, including transitioning to a new Epoch when an epoch reaches 'full' state
44
module Fc.Inventory.Epoch
55

6+
let [<Literal>] Category = "InventoryEpoch"
7+
let streamName (inventoryId, epochId) = FsCodec.StreamName.compose Category [InventoryId.toString inventoryId; InventoryEpochId.toString epochId]
8+
69
// NB - these types and the union case names reflect the actual storage formats and hence need to be versioned with care
710
[<RequireQualifiedAccess>]
811
module Events =
912

10-
let [<Literal>] CategoryId = "InventoryEpoch"
11-
let (|For|) (inventoryId, epochId) = FsCodec.StreamName.compose CategoryId [InventoryId.toString inventoryId; InventoryEpochId.toString epochId]
12-
1313
type TransactionRef = { transactionId : InventoryTransactionId }
1414
type Snapshotted = { closed: bool; ids : InventoryTransactionId[] }
1515

@@ -70,9 +70,7 @@ let decideSync capacity events (state : Fold.State) : Result * Events.Event list
7070
let state' = Fold.fold state events
7171
{ isClosed = closed; added = allowing; rejected = residual; transactionIds = state'.ids }, events
7272

73-
type Service internal (log, resolve, maxAttempts) =
74-
75-
let resolve (Events.For streamId) = Equinox.Stream<Events.Event, Fold.State>(log, resolve streamId, maxAttempts)
73+
type Service internal (resolve : InventoryId * InventoryEpochId -> Equinox.Stream<Events.Event, Fold.State>) =
7674

7775
/// Attempt ingestion of `events` into the cited Epoch.
7876
/// - None will be accepted if the Epoch is `closed`
@@ -83,14 +81,16 @@ type Service internal (log, resolve, maxAttempts) =
8381
let stream = resolve (inventoryId, epochId)
8482
stream.Transact(decideSync capacity events)
8583

86-
let createService resolve = Service(Serilog.Log.ForContext<Service>(), resolve, maxAttempts = 2)
84+
let create resolver =
85+
let resolve locationId =
86+
let stream = resolver (streamName locationId)
87+
Equinox.Stream(Serilog.Log.ForContext<Service>(), stream, maxAttempts = 2)
88+
Service (resolve)
8789

8890
module Cosmos =
8991

90-
open Equinox.Cosmos
91-
9292
let accessStrategy = Equinox.Cosmos.AccessStrategy.Snapshot (Fold.isOrigin, Fold.snapshot)
9393
let resolve (context, cache) =
94-
let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.)
95-
Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy).Resolve
96-
let createService (context, cache) = createService (resolve (context, cache))
94+
let cacheStrategy = Equinox.Cosmos.CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.)
95+
Equinox.Cosmos.Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy).Resolve
96+
let create (context, cache) = create (resolve (context, cache))

equinox-fc/Domain/InventorySeries.fs

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,13 @@
33
/// See Inventory.Service for the surface API which manages the writing
44
module Fc.Inventory.Series
55

6+
let [<Literal>] Category = "InventorySeries"
7+
let streamName inventoryId = FsCodec.StreamName.create Category (InventoryId.toString inventoryId)
8+
69
// NB - these types and the union case names reflect the actual storage formats and hence need to be versioned with care
710
[<RequireQualifiedAccess>]
811
module Events =
912

10-
let [<Literal>] CategoryId = "InventorySeries"
11-
let (|For|) inventoryId = FsCodec.StreamName.create CategoryId (InventoryId.toString inventoryId)
12-
1313
type Started = { epoch : InventoryEpochId }
1414
type Event =
1515
| Started of Started
@@ -30,9 +30,7 @@ let interpretAdvanceIngestionEpoch epochId (state : Fold.State) =
3030
if queryActiveEpoch state >= epochId then []
3131
else [Events.Started { epoch = epochId }]
3232

33-
type Service internal (log, resolve, maxAttempts) =
34-
35-
let resolve (Events.For streamId) = Equinox.Stream<Events.Event, Fold.State>(log, resolve streamId, maxAttempts)
33+
type Service internal (resolve : InventoryId -> Equinox.Stream<Events.Event, Fold.State>) =
3634

3735
member __.ReadIngestionEpoch(inventoryId) : Async<InventoryEpochId> =
3836
let stream = resolve inventoryId
@@ -42,19 +40,21 @@ type Service internal (log, resolve, maxAttempts) =
4240
let stream = resolve inventoryId
4341
stream.Transact(interpretAdvanceIngestionEpoch epochId)
4442

45-
let createService resolve =
46-
Service(Serilog.Log.ForContext<Service>(), resolve, maxAttempts = 2)
43+
let create resolver =
44+
let resolve locationId =
45+
let stream = resolver (streamName locationId)
46+
Equinox.Stream(Serilog.Log.ForContext<Service>(), stream, maxAttempts = 2)
47+
Service (resolve)
4748

4849
module Cosmos =
4950

50-
open Equinox.Cosmos
51-
51+
let accessStrategy = Equinox.Cosmos.AccessStrategy.LatestKnownEvent
5252
let resolve (context, cache) =
53-
let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.)
53+
let cacheStrategy = Equinox.Cosmos.CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.)
5454
// For this stream, we uniformly use stale reads as:
5555
// a) we don't require any information from competing writers
5656
// b) while there are competing writers [which might cause us to have to retry a Transact], this should be infrequent
5757
let opt = Equinox.ResolveOption.AllowStale
58-
fun id -> Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, AccessStrategy.LatestKnownEvent).Resolve(id, opt)
59-
let createService (context, cache) =
60-
createService (resolve (context, cache))
58+
fun id -> Equinox.Cosmos.Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy).Resolve(id, opt)
59+
let create (context, cache) =
60+
create (resolve (context, cache))

equinox-fc/Domain/InventoryTransaction.fs

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,13 @@
99
/// This represents the case where a 'happy path' actor died, or experienced another impediment on the path.
1010
module Fc.Inventory.Transaction
1111

12+
let [<Literal>] Category = "InventoryTransaction"
13+
let streamName transactionId = FsCodec.StreamName.create Category (InventoryTransactionId.toString transactionId)
14+
1215
// NB - these types and the union case names reflect the actual storage formats and hence need to be versioned with care
1316
[<RequireQualifiedAccess>]
1417
module Events =
1518

16-
let [<Literal>] CategoryId = "InventoryTransaction"
17-
let (|For|) transactionId = FsCodec.StreamName.create CategoryId (InventoryTransactionId.toString transactionId)
18-
1919
type AdjustmentRequested = { location : LocationId; quantity : int }
2020
type TransferRequested = { source : LocationId; destination : LocationId; quantity : int }
2121
type Removed = { balance : int }
@@ -130,28 +130,28 @@ let decide update (state : Fold.State) : Action * Events.Event list =
130130
let state' = Fold.fold state events
131131
Fold.nextAction state', events
132132

133-
type Service internal (log, resolve, maxAttempts) =
134-
135-
let resolve (Events.For streamId) = Equinox.Stream<Events.Event, Fold.State>(log, resolve streamId, maxAttempts)
133+
type Service internal (resolve : InventoryTransactionId -> Equinox.Stream<Events.Event, Fold.State>) =
136134

137135
member __.Apply(transactionId, update) : Async<Action> =
138136
let stream = resolve transactionId
139137
stream.Transact(decide update)
140138

141-
let createService resolve = Service(Serilog.Log.ForContext<Service>(), resolve, maxAttempts = 3)
139+
let create resolver =
140+
let resolve inventoryTransactionId =
141+
let stream = resolver (streamName inventoryTransactionId)
142+
Equinox.Stream(Serilog.Log.ForContext<Service>(), stream, maxAttempts = 2)
143+
Service (resolve)
142144

143145
module Cosmos =
144146

145-
open Equinox.Cosmos
146-
147147
// in the happy path case, the event stream will typically be short, and the state cached, so snapshotting is less critical
148148
let accessStrategy = Equinox.Cosmos.AccessStrategy.Unoptimized
149149
// ... and there will generally be a single actor touching it at a given time, so we don't need to do a load (which would be more expensive than normal given the `accessStrategy`) before we sync
150150
let opt = Equinox.AllowStale
151151
let resolve (context, cache) =
152-
let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.)
153-
fun id -> Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy).Resolve(id, opt)
154-
let createService (context, cache) = createService (resolve (context, cache))
152+
let cacheStrategy = Equinox.Cosmos.CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.)
153+
fun id -> Equinox.Cosmos.Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy).Resolve(id, opt)
154+
let createService (context, cache) = create (resolve (context, cache))
155155

156156
/// Handles requirement to infer when a transaction is 'stuck'
157157
/// Note we don't want to couple to the state in a deep manner; thus we track:
@@ -188,3 +188,16 @@ module Watchdog =
188188
| Fold.Active startTime when startTime < cutoffTime -> Stuck
189189
| Fold.Active _ -> Active
190190
| Fold.Completed -> Complete
191+
192+
let fold : Events.TimestampAndEvent seq -> Fold.State =
193+
Fold.fold Fold.initial
194+
195+
let (|FoldToWatchdogState|) events : Fold.State =
196+
events
197+
|> Seq.choose Events.codec.TryDecode
198+
|> fold
199+
200+
let (|Match|_|) = function
201+
| FsCodec.StreamName.CategoryAndId (Category, InventoryTransactionId.Parse transId), FoldToWatchdogState state ->
202+
Some (transId, state)
203+
| _ -> None

equinox-fc/Domain/Location.fs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,5 +44,5 @@ module Cosmos =
4444

4545
let createService (zeroBalance, toBalanceCarriedForward, shouldClose) (context, cache, maxAttempts) =
4646
let series = Series.Cosmos.createService (context, cache, maxAttempts)
47-
let epochs = Epoch.Cosmos.createService (context, cache, maxAttempts)
47+
let epochs = Epoch.Cosmos.create (context, cache, maxAttempts)
4848
create (zeroBalance, toBalanceCarriedForward, shouldClose) (series, epochs)

equinox-fc/Domain/LocationEpoch.fs

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,13 @@
33
/// See Location.Service for the logic that allows competing readers/writers to co-operate in bringing this about
44
module Fc.Location.Epoch
55

6+
let [<Literal>] Category = "LocationEpoch"
7+
let streamName (locationId, epochId) = FsCodec.StreamName.compose Category [LocationId.toString locationId; LocationEpochId.toString epochId]
8+
69
// NOTE - these types and the union case names reflect the actual storage formats and hence need to be versioned with care
710
[<RequireQualifiedAccess>]
811
module Events =
912

10-
let [<Literal>] CategoryId = "LocationEpoch"
11-
let (|For|) (locationId, epochId) = FsCodec.StreamName.compose CategoryId [LocationId.toString locationId; LocationEpochId.toString epochId]
12-
1313
type CarriedForward = { initial : int; recentTransactions : InventoryTransactionId[] }
1414
type Event =
1515
| CarriedForward of CarriedForward
@@ -24,7 +24,7 @@ module Fold =
2424

2525
type State =
2626
| Initial
27-
| Open of Record list // reverse order, i.e. most revent first
27+
| Open of Record list // reverse order, i.e. most recent first
2828
| Closed of Record list // trimmed
2929
and Record =
3030
| Init of Events.CarriedForward
@@ -127,22 +127,23 @@ let decide transactionId command (state: Fold.State) =
127127
| Fold.Open (Fold.Current cur) -> (if accepted then Accepted cur else Denied), events
128128
| s -> failwithf "Unexpected state %A" s
129129

130-
type Service internal (log, resolve, maxAttempts) =
131-
132-
let resolve (Events.For id) = Equinox.Stream<Events.Event, Fold.State>(log, resolve id, maxAttempts)
130+
type Service internal (resolve : LocationId * LocationEpochId -> Equinox.Stream<Events.Event, Fold.State>) =
133131

134132
member __.Sync<'R>(locationId, epochId, prevEpochBalanceCarriedForward, decide, shouldClose) : Async<Result<'R>> =
135133
let stream = resolve (locationId, epochId)
136134
stream.Transact(sync prevEpochBalanceCarriedForward decide shouldClose)
137135

138-
let create resolve maxAttempts = Service(Serilog.Log.ForContext<Service>(), resolve, maxAttempts = maxAttempts)
136+
let create resolver maxAttempts =
137+
let resolve locId =
138+
let stream = resolver (streamName locId)
139+
Equinox.Stream(Serilog.Log.ForContext<Service>(), stream, maxAttempts = maxAttempts)
140+
Service (resolve)
139141

140142
module Cosmos =
141143

142-
open Equinox.Cosmos
143-
144+
let accessStrategy = Equinox.Cosmos.AccessStrategy.Unoptimized
144145
let resolve (context, cache) =
145-
let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.)
146-
Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, AccessStrategy.Unoptimized).Resolve
147-
let createService (context, cache, maxAttempts) =
146+
let cacheStrategy = Equinox.Cosmos.CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.)
147+
Equinox.Cosmos.Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy).Resolve
148+
let create (context, cache, maxAttempts) =
148149
create (resolve (context, cache)) maxAttempts

equinox-fc/Domain/LocationSeries.fs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
/// Manages the active epoch for a given Location
22
module Fc.Location.Series
33

4+
let [<Literal>] Category = "LocationSeries"
5+
let streamName locationId = FsCodec.StreamName.create Category (LocationId.toString locationId)
6+
47
// NOTE - these types and the union case names reflect the actual storage formats and hence need to be versioned with care
58
[<RequireQualifiedAccess>]
69
module Events =
710

8-
let [<Literal>] CategoryId = "LocationSeries"
9-
let (|For|) locationId = FsCodec.StreamName.create CategoryId (LocationId.toString locationId)
10-
1111
type Started = { epoch : LocationEpochId }
1212
type Event =
1313
| Started of Started
@@ -27,9 +27,7 @@ let interpretAdvanceIngestionEpoch (epochId : LocationEpochId) (state : Fold.Sta
2727

2828
[if state |> Option.forall (fun s -> s < epochId) then yield Events.Started { epoch = epochId }]
2929

30-
type Service internal (log, resolve, maxAttempts) =
31-
32-
let resolve (Events.For id) = Equinox.Stream<Events.Event, Fold.State>(log, resolve id, maxAttempts)
30+
type Service internal (resolve : LocationId -> Equinox.Stream<Events.Event, Fold.State>) =
3331

3432
member __.TryReadIngestionEpoch(locationId) : Async<LocationEpochId option> =
3533
let stream = resolve locationId
@@ -39,7 +37,11 @@ type Service internal (log, resolve, maxAttempts) =
3937
let stream = resolve locationId
4038
stream.Transact(interpretAdvanceIngestionEpoch epochId)
4139

42-
let create resolve maxAttempts = Service(Serilog.Log.ForContext<Service>(), resolve, maxAttempts)
40+
let create resolver maxAttempts =
41+
let resolve locId =
42+
let stream = resolver (streamName locId)
43+
Equinox.Stream(Serilog.Log.ForContext<Service>(), stream, maxAttempts = maxAttempts)
44+
Service (resolve)
4345

4446
module Cosmos =
4547

equinox-fc/Watchdog/Handler.fs

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,18 +22,10 @@ type Stats(log, ?statsInterval, ?stateInterval) =
2222

2323
open Fc.Inventory.Transaction
2424

25-
let fold : Watchdog.Events.TimestampAndEvent seq -> Watchdog.Fold.State =
26-
Watchdog.Fold.fold Watchdog.Fold.initial
27-
28-
let (|FoldToWatchdogState|) (span : Propulsion.Streams.StreamSpan<_>) : Watchdog.Fold.State =
29-
span.events
30-
|> Seq.choose Watchdog.Events.codec.TryDecode
31-
|> fold
32-
3325
let tryHandle driveTransaction (stream, span : Propulsion.Streams.StreamSpan<_>) : Async<Outcome> = async {
3426
let processingStuckCutoff = let now = DateTimeOffset.UtcNow in now.AddSeconds -10.
35-
match stream, span with
36-
| FsCodec.StreamName.CategoryAndId (Events.CategoryId, InventoryTransactionId.Parse transId), FoldToWatchdogState state ->
27+
match stream, span.events with
28+
| Watchdog.Match (transId, state) ->
3729
match Watchdog.categorize processingStuckCutoff state with
3830
| Watchdog.Complete ->
3931
return Outcome.Completed

equinox-fc/Watchdog/Program.fs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -312,7 +312,7 @@ module EventStoreContext =
312312
| e when not e.IsJson || e.EventStreamId.StartsWith "$" || not (isWhitelisted e.EventStreamId) -> None
313313
| PropulsionStreamEvent e -> Some e
314314

315-
let transactionStreamPrefix = sprintf "%s-" Fc.Inventory.Transaction.Events.CategoryId
315+
let transactionStreamPrefix = sprintf "%s-" Fc.Inventory.Transaction.Category
316316
let isTransactionStream : string -> bool = function sn -> sn.StartsWith transactionStreamPrefix
317317

318318
let build (args : CmdParser.Arguments) =
@@ -355,8 +355,8 @@ let build (args : CmdParser.Arguments) =
355355
let locations =
356356
let zeroBalance : Fc.Location.Epoch.Events.CarriedForward = { initial = 0; recentTransactions = [||] }
357357
let chooseTransactionIds = function
358-
| Fc.Location.Epoch.Fold.Init { recentTransactions = ids } -> Seq.ofArray ids
359-
| Fc.Location.Epoch.Fold.Step { id = id } -> Seq.singleton id
358+
| Fc.Location.Epoch.Fold.Init { recentTransactions = ids } -> Seq.ofArray ids
359+
| Fc.Location.Epoch.Fold.Step { id = id } -> Seq.singleton id
360360
let toBalanceCarriedForward (Fc.Location.Epoch.Fold.Current cur as records) : Fc.Location.Epoch.Events.CarriedForward =
361361
{ initial = cur; recentTransactions = records |> Seq.collect chooseTransactionIds |> Seq.truncate 5 |> Seq.toArray }
362362
let shouldClose x = false

0 commit comments

Comments
 (0)