Skip to content

Commit 5ce4774

Browse files
committed
Tidy wiring
1 parent 16bd139 commit 5ce4774

File tree

1 file changed

+15
-12
lines changed

1 file changed

+15
-12
lines changed

equinox-fc/Watchdog/Program.fs

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -331,6 +331,21 @@ let build (args : CmdParser.Arguments) =
331331
let access = Equinox.Cosmos.AccessStrategy.Custom (Checkpoint.Fold.isOrigin, Checkpoint.Fold.transmute)
332332
fun target -> Equinox.Cosmos.Resolver(context, codec, Checkpoint.Fold.fold, Checkpoint.Fold.initial, caching, access).Resolve(target, Equinox.AllowStale)
333333
let checkpoints = Checkpoint.CheckpointSeries(spec.groupName, Log.ForContext<Checkpoint.CheckpointSeries>(), resolveCheckpointStream)
334+
let handle (_stream, _span) : Async<int64*Handler.Outcome> = failwith "TODO" // Handler.handleStreamEvents (Handler.tryHandle driveTransaction)
335+
let sink =
336+
Propulsion.Streams.StreamsProjector.Start(
337+
Log.Logger, args.MaxReadAhead, args.MaxConcurrentStreams, handle, stats = Handler.Stats(Log.Logger))
338+
let connect () = let c = connectEs () in c.ReadConnection
339+
let runPipeline =
340+
EventStoreSource.Run(
341+
Log.Logger, sink, checkpoints, connect, spec, EventStoreContext.tryMapEvent isTransactionStream,
342+
args.MaxReadAhead, args.StatsInterval)
343+
sink, runPipeline
344+
| Choice2Of2 (srcCosmos, (auxDiscovery, aux, leaseId, startFromTail, maxDocuments, lagFrequency)) ->
345+
let (discovery, database, container, connector) = srcCosmos.Cosmos.BuildConnectionDetails()
346+
let connection = connector.Connect(AppName, discovery) |> Async.RunSynchronously
347+
let cache = Equinox.Cache(AppName, sizeMb = 10)
348+
let context = Equinox.Cosmos.Context(connection, database, container)
334349
let inventoryService =
335350
let inventoryId = InventoryId.parse "FC000"
336351
let maxTransactionsPerEpoch = 100
@@ -350,18 +365,6 @@ let build (args : CmdParser.Arguments) =
350365
let processor = Fc.Inventory.Processor.Service(transactionService, locations, inventoryService)
351366

352367
let handle = Handler.handleStreamEvents (Handler.tryHandle processor.Push)
353-
let sink =
354-
Propulsion.Streams.StreamsProjector.Start(
355-
Log.Logger, args.MaxReadAhead, args.MaxConcurrentStreams, handle, stats = Handler.Stats(Log.Logger))
356-
let connect () = let c = connectEs () in c.ReadConnection
357-
let runPipeline =
358-
EventStoreSource.Run(
359-
Log.Logger, sink, checkpoints, connect, spec, EventStoreContext.tryMapEvent isTransactionStream,
360-
args.MaxReadAhead, args.StatsInterval)
361-
sink, runPipeline
362-
| Choice2Of2 (srcCosmos, (auxDiscovery, aux, leaseId, startFromTail, maxDocuments, lagFrequency)) ->
363-
let (discovery, database, container, connector) = srcCosmos.Cosmos.BuildConnectionDetails()
364-
let handle (_stream, _span) : Async<int64*Handler.Outcome> = failwith "TODO" // Handler.handleStreamEvents (Handler.tryHandle driveTransaction)
365368
let sink =
366369
Propulsion.Streams.StreamsProjector.Start(
367370
Log.Logger, args.MaxReadAhead, args.MaxConcurrentStreams, handle, stats = Handler.Stats(Log.Logger))

0 commit comments

Comments
 (0)