Add a reactor to reserve and dole out invoice numbers #1
base: main
Conversation
let fold: State -> Events.Event seq -> State = Seq.fold evolve

let streamId = StreamId.gen2 SourceId.toString id
type CheckpointService internal (resolve: string -> StreamId -> Equinox.Decider<Events.Event, Fold.State>) =
I wasn't in the mood to add another PG user and schema for the checkpoints. This just checkpoints into an adjacent stream. E.g. if your group MyGroup is listening to the category MyCategory, it'll checkpoint to MyCategory:position-messageDb_MyGroup. The messageDb comes from Propulsion and is to allow for multiple sources.
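For illustration, the naming scheme just described amounts to something like this (a hypothetical helper; the actual derivation is the streamId generation in the CheckpointService):

// hypothetical helper showing the checkpoint stream naming scheme
let checkpointStreamName (category: string) (group: string) =
    // "messageDb" is the source name supplied by Propulsion, to allow for multiple sources
    $"{category}:position-messageDb_{group}"

// checkpointStreamName "MyCategory" "MyGroup" = "MyCategory:position-messageDb_MyGroup"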
Might upstream this to propulsion
I'd not be surprised if @bartelink had already made a storage agnostic decider like this that I'm just not aware of
The document-based stores (which can do RollingState updates) do this: https://github.com/jet/propulsion/blob/master/src/Propulsion.CosmosStore/ReaderCheckpoint.fs
But an equivalent for PG is not currently implemented (might not be the daftest idea!)
For now I think using https://github.com/jet/propulsion/blob/master/src/Propulsion.MessageDb/ReaderCheckpoint.fs#L49 is simply the right thing to do, even if it makes some mess?
wrt upstreaming, having an ever-growing stream would be a blocker
Also while in general it can be helpful to keep checkpoints in the same place as the info being derived (and that's the case here), another consideration is that you want to avoid the feedback effect of writing a checkpoint position triggering a (null) reaction (and checkpoint update every 5s) each time
For MessageDb, if you keep Invoice, InvoiceNumber and the checkpoints in the same DB [as events], this would be the case
- if MessageDb positions are per category then this does not apply
- if you kept all 3 categories in ESDB it would (global pos would inc after you write a checkpoint pos)
- for Cosmos, changefeed checkpoints are kept in a separate -aux container to avoid this effect
- for Dynamo, the Indexer should exclude 'system streams' such as $ReaderCheckpoint-{x} (https://github.com/jet/propulsion/blob/1d862e907502d727f5508b401deb85c6c60ef1e7/src/Propulsion.CosmosStore/ReaderCheckpoint.fs#L17 at https://github.com/jet/propulsion/blob/master/src/Propulsion.DynamoStore.Indexer/Handler.fs#L24; logging issue: DynamoStore: Indexer should not index any 'system' streams jet/propulsion#197) to avoid the equivalent (note this is not as critical as it may seem, as there is only one event per hour)
For now I think using MessageDb.ReaderCheckpoint is simply the right thing to do
It's definitely not a bad idea. But as always, there are trade-offs.
- Needs a separate connection string (checkpoints are stored in a different schema, and the message_store role only has access to the message_store schema).
- Can lead to unpredictable performance because PG needs to VACUUM the table every now and then
I would definitely default to the checkpoints table, but I think there's value in having the option to "just use messagedb"
another consideration is that you want to avoid the feedback effect of writing a checkpoint position triggering a (null) reaction (and checkpoint update every 5s) each time
I don't think this applies to MessageDB as we keep the position on a per category basis. There's no feedback because the checkpoints go to a separate category.
having an ever-growing stream would be a blocker
The position of message-db's authors (which I agree with) is that message db should not be in the business of deleting data. Any production deployment of message-db will include pruning procedures outside of the application. With that in mind I don't think it should be a blocker as it's entirely consistent with real-world usage of MessageDB.
With that in mind I don't think it should be a blocker as it's entirely consistent with real-world usage of MessageDB.
Not suggesting there should be built-in pruning.
I don't think this applies to MessageDB as we keep the position on a per category basis. There's no feedback because the checkpoints go to a separate category.
If the position is the event index within the category and not affected by writes to other categories, I guess that will dramatically reduce the amount of checkpointing (but e.g. if you wanted to store ESDB checkpoints in PG, then an event every 5s is going to add up, and inevitably someone will get paged at 5am and get annoyed with @nordfjord when they git blame!)
I would definitely default to the checkpoints table, but I think there's value in having the option to "just use messagedb"
Maybe for the sample have the checkpoints DB connstr be optional iff people opt into "unbounded stream" checkpoints?
IOW if they run with --checkpointStream, they can avoid rigging the checkpoints schema?
Maybe I'm being a bit strong - in the context of the sample as a whole, the aggregate does serve as a nice example impl. But I do feel it's a step too far to put it in the box unless it's in an Internal namespace with a name that conveys that it will grow forever and you might just want to use the one that keeps one row in the DB ;)
If the position is the event index within the category and not affected by writes to other categories, I guess that will dramatically reduce the amount of checkpointing
The position is from the global sequence. The position sequence in the category is therefore monotonic but not contiguous (e.g. 1,3,8,12222 is a perfectly valid sequence of positions). Also note that the subscription is polling get_category_messages for each of the categories supplied (so we'll only get events for the subscribed categories). This is of course different from how the other stores function, where the subscription is to an $all stream.
To be clear, when I say "real-world" I mean that this is how eventide does its checkpointing
App/HostedReactor.fs
Outdated
open Propulsion.MessageDb

type HostedReactor(connectionString, log, service: InvoiceNumberingReactor.Service, checkpointService: CheckpointStore.CheckpointService) =
    inherit BackgroundService()
For the purposes of this project, a hosted service makes more sense than a whole separate process
Yes, nice
App/HostedReactor.fs
Outdated
let stats =
    { new Propulsion.Streams.Stats<_>(log, TimeSpan.FromMinutes 1, TimeSpan.FromMinutes 1) with
        member _.HandleOk x = ()
        member _.HandleExn(log, x) = () }
why not log.Information(e, "Unhandled")?
As noted in a different comment, Propulsion and its stats is an area I'm unfamiliar with. The only metrics I'm used to keeping are derived from spans and include:
- Number of messages processed
- Cycle time
- Lead time
- Poll count (if this drops to 0 raise an alert)
- Lag from head of category
- Time lag from head of category
In terms of generic stats:
A Streams-based Projector emits
- handler latency percentiles
- handler invocation counts
- event counts, unique streams counts
There are also state metrics, such as the longest duration a stream has had handler invocations without progress (if this exceeds SLA, raise alert)
Lag metrics are not a strong point
- Cosmos reader has some based on the CFP lag estimation
- Dynamo doesn't have anything much use
- Dynamo and others have a common feed reader base, which does emit metrics
There are Serilog virtual sinks that parse the embedded metrics (which are embedded in the Serilog LogEvents) and emit those to the Prometheus sink
In general any integration would currently be expected to work on that basis (observe the LogEvents, emit based on that)
(See the other comments for App level metrics, i.e recording/reporting outcomes from reactors in a way that Prometheus+Grafana can surface well is something that the dotnet-templates and the design of Propulsion in general tries to encourage)
The lowest common denominator is also a key part though - a Reactor or Projector should, without requiring a suite of Dockerized stuff to be configured or learned, give plenty clues to what's going on in good and bad times - error rates, throughput and latency.
This even extends to test scenarios - a test run should dump stats as a Reactor invocation on the console would. (There is wiring for this in jet/dotnet-templates#127 and equinox-shipping and more; I've yet to do the wiring for Propulsion.MessageDb to be used as a source in that sample, but the Stats are managed in the same way regardless of the source)
Open to PRs for any stats you consider to have proven themselves. Also not averse to having otel wiring added - main thing is that I consider the default logging of a Projector/Reactor app as managed by the Stats class to be a core feature -- otel and/or prometheus wiring is a nice bonus on top of that
App/HostedReactor.fs
Outdated
member _.HandleExn(log, x) = () }

[<Literal>]
let groupName = "InvoiceNumberingReactor"
PascalCase so it can be used as a constant pattern match and/or otherwise fit conventions
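For illustration (hypothetical usage, not from the PR), a PascalCase [<Literal>] can then appear directly as a constant pattern:

[<Literal>]
let GroupName = "InvoiceNumberingReactor"

// a Literal can be matched as a constant pattern
let isReactorGroup = function
    | GroupName -> true
    | _ -> false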
App/HostedReactor.fs
Outdated
let maxReadAhead = 100
let maxConcurrentStreams = 2

let handle struct(stream, events) =
-rc provides a task/func layer (without struct)
Makes sense, I can update once jet/propulsion#198 is merged
Thanks/sorry. Will ping over there when it's releasing
No need to be sorry. My test didn't cover it, so it's entirely on me 🙈
App/HostedReactor.fs
Outdated
let handle struct(stream, events) =
    async {
        for event in events do
            do! service.Handle(stream, event)
in a parallel universe I use one char indents!
We can meet in the middle and use 3?
We could also submit to 0x9 already
I've always wanted to try 5 one day ;) The day Fantomas applies 0x9, we can do that!
The man is wise ;)
(Does go or any other fmt use tabs?)
I'm not aware of any language that defaults to tabs. Hopefully that changes soon 😉, when it happens you can have your 4 indents, and I can have my 2
While I got used to 4 in Turbo/Borland Pascal, C and C++, I suspect I'd actually run it with 2 at that point.
An indent of 2 in F# is triggering for me as I strongly associate it with old pain of people wanting to write Haskell and/or Arrows-based stuff in F#.
Oh sure... Let's use 2 indents, AND write crap like this
let handle (stream, events) =
  events
  *> (fun ev -> service.Handle(stream, ev))
  |> foldr (fun acc x -> x *> prepend @> acc) []
Invoice/Domain.fs
Outdated
@@ -1,9 +1,10 @@
module Invoice

open TypeShape.Core.Core
can this go?
Invoice/Domain.fs
Outdated
@@ -58,6 +62,7 @@ module Fold =
| Raised state ->
    match event with
    | InvoiceRaised _ as e -> failwithf "Unexpected %A" e
    | InvoiceNumbered data -> Raised { state with InvoiceNumber = Some data.InvoiceNumber }
- | InvoiceNumbered data -> Raised { state with InvoiceNumber = Some data.InvoiceNumber }
+ | InvoiceNumbered e -> Raised { state with InvoiceNumber = Some e.InvoiceNumber }
InvoiceNumbering/Domain.fs
Outdated
        reservedFor = reservedFor |} ]
    | Some n -> n, []

[<Literal>]
convention says top of file
- provides a cue that this is an event sourced aggregate
- makes it easier to scan/navigate
convention says top of file
I never noticed 😅. It makes sense. I've been putting it next to the service because everything above it is "pure" domain stuff, it's only from this line down that infra/wiring comes into play.
But convention is convention, I'll move it
on second thought, I think I'll keep it this way since this project is pedagogical. The Invoice domain file also has it in this order, and that's purely because when I wrote the post, the category/stream didn't factor into the discussion until later.
https://github.com/jet/dotnet-templates#aggregate-module
Yes definitely debatable in context of this sample
If you start putting a module Config per aggregate, then it also can get noisy
A final reason to have it up top by convention is for weird cases where you have multiple folds and/or services (read/write/index etc): they all share a category name, and it's nice for that to have an ultra-stable/obvious place, e.g. https://github.com/jet/propulsion/blob/master/src/Propulsion.DynamoStore/AppendsIndex.fs#L61
InvoiceNumbering/Reactor.fs
Outdated
| FsCodec.StreamName.CategoryAndId(Invoice.Category, Invoice.InvoiceId.Parse invoiceId) ->
    match Invoice.Events.codec.TryDecode event with
    | ValueSome(Invoice.Events.InvoiceRaised _) ->
        let! reservedNumber = numberService.ReserveNext(invoiceId)
Maybe make the call lazy/async for efficiency under reprocessing? (means TransactAsync vs Transact so maybe not best for a sample tho)
I'm having trouble visualising this. What would it look like?
This block would become
let resolveNumber () : Async<InvoiceNumber> = numberService.ReserveNext(invoiceId)
do! invoiceService.EnsureNumber(invoiceId, resolveNumber)
(that resolve is overloaded, but allocate or generate would not sufficiently emphasize that it might return one from a previous call)
the decide function would be supplied the resolveNumber argument and would call it iff the state showed that the number was None
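A minimal sketch of the shape being suggested (names like decideEnsureNumber and the event payload are assumptions, not the PR's code; TransactAsync is the Equinox overload accepting an async decide):

// hypothetical sketch: decide only invokes the reservation callback when needed
let decideEnsureNumber (reserveNext: unit -> Async<InvoiceNumber>) state = async {
    match state with
    | Fold.Raised { InvoiceNumber = Some _ } ->
        return [] // numbered on a prior attempt: no reservation roundtrip
    | Fold.Raised _ ->
        let! number = reserveNext ()
        return [ Events.InvoiceNumbered {| InvoiceNumber = number |} ] // payload shape assumed
    | _ -> return [] }

// and on the Service:
// member _.EnsureNumber(id, reserveNext) =
//     let decider = resolve id
//     decider.TransactAsync(decideEnsureNumber reserveNext)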
Riight, I see. You're trying to minimise the work done during retries!
The invoice service would receive a function to reserve a number for itself. So if it's already reserved it's only reading one stream instead of two!
Yes, one might say its making Decisions for itself ;)
Obviously this is an overkill case in the context of the actual realistic need here.
But this makes way more sense when an indexer reactor is feeding Shipments that have been physically dropped into the shipping container (when the final content that made it based on the initial pick list is known) into batches for financial reporting.
In that context, you run with a DOP of 16, with each Shipped event triggering:
1. find the active batch epoch
2. write that start point down as an event on the Shipment
3. ingest it into the batch, walking forward from the start point noted in 2
The "note the starting point and write it down (or just load it if that has already been done)" can be done 16 in parallel (each is an individual roundtrip to a stream)
the next phase, "register them in the batch", is done as a single walk (using an AsyncBatchingGate) with the 16 requests merged into one (if you ran them in parallel, they'd continually run into conflicts, i.e. WrongExpectedVersionException, as they are all trying to merge into the same stream)
But it's definitely a key point that the most general form of such a process is where the overall decision consists of multiple async steps (and the entire chain may need to be rewalked if there is a conflict when you try to store the result as an event). If your only pattern is to feed values into the Service method and/or as a Command, then the decision cannot be dependent on something that's in the aggregate's state. (Of course as usual, none of this potential stuff applies in this instance)
I've changed this to conform with your suggestion. I think that's a lot better overall. Thanks
InvoiceNumbering/Reactor.fs
Outdated
member _.Handle(streamName, event) =
    async {
        match streamName with
        | FsCodec.StreamName.CategoryAndId(Invoice.Category, Invoice.InvoiceId.Parse invoiceId) ->
I guess this is not the time to keep it clear, but you could add a let (|StreamName|_|) ... to put the tryparse beside the gen in the aggregate's module
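A sketch of what that could look like inside the Invoice module (illustrative; reuses the Category literal and the InvoiceId.Parse pattern already present there):

// beside the Category literal and streamId gen in the Invoice module
let (|StreamName|_|) = function
    | FsCodec.StreamName.CategoryAndId(Category, InvoiceId.Parse invoiceId) -> Some invoiceId
    | _ -> None

// the reactor's match above then reduces to:
// | Invoice.StreamName invoiceId -> ...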
App/HostedReactor.fs
Outdated
for event in events do
    do! service.Handle(stream, event)

return struct (Propulsion.Streams.SpanResult.AllProcessed, ())
could add stats about 1) numbers genned 2) invoices numbered 3) replays (neither)
(the high level stats will show the total requests, streams and events traversed, so in the case of the event not being relevant, you can return an 'Outcome.Noop', but no need to gather stats on it)
The stats area is one I'm not very familiar/comfortable with. Would love to learn more about how you approach these things!
TL;DR the default wiring is plumbed for each Handler invocation (which returns SpanResult * 'Outcome) to yield an 'Outcome which summarises the work done for one invocation
That outcome is flowed through to the HandleOk on the Stats after successful processing (the calls to it are serialized, so no concurrency control required)
The stats are then dumped every 1m (again with a guarantee of no concurrent HandleOk/HandleExn calls at the same time)
Many of the dotnet-templates have examples, and/or one can look at the Propulsion internal ones for slightly more exotic schemes.
A struct DU is typically a good fit for said Outcome
A secondary function of this is to enable a clear way for Handler outcomes to be emitted to e.g. Prometheus - at the same time as one is gathering stats for local use, you also map that to a Prometheus metric. Example:
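A minimal sketch of the pattern (the Outcome cases here are hypothetical for this reactor, and the overridable DumpStats is assumed from the dotnet-templates convention rather than taken from this PR):

[<Struct>]
type Outcome =
    | Numbered // a number was reserved and applied to the invoice
    | Noop     // event was irrelevant, or a replay of work already done

type ReactorStats(log, statsInterval, stateInterval) =
    inherit Propulsion.Streams.Stats<Outcome>(log, statsInterval, stateInterval)
    let mutable numbered = 0
    let mutable noops = 0
    override _.HandleOk outcome =
        match outcome with
        | Numbered -> numbered <- numbered + 1
        | Noop -> noops <- noops + 1
    override _.HandleExn(log, exn) = log.Information(exn, "Unhandled")
    // dumped (and reset) on the stats interval; this is also a natural point
    // at which to increment a Prometheus counter per outcome
    override _.DumpStats() =
        log.Information("Outcomes: {numbered} numbered, {noops} no-ops", numbered, noops)
        numbered <- 0
        noops <- 0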
App/HostedReactor.fs
Outdated
batchSize = 1000,
// Controls the time to wait once fully caught up
// before requesting a new batch of events
tailSleepInterval = TimeSpan.FromMilliseconds 100,
is that freq warranted by the use case? (1s might be fine?)
I just lifted the default value from eventide. In theory it should be tuned to the arrival rate of the category so 100ms is clearly too small
Yes; obv 100ms is a decent default in general of course - the poll op should be pretty cheap and it has a very direct effect on typical reactor latency (though I tend not to add defaults for any of these sorts of things in Propulsion as it's hard to generalise over all conceivable apps)
App/HostedReactor.fs
Outdated
sink = sink,
// An array of message-db categories to subscribe to
// Propulsion guarantees that events within streams are
// handled in order, it makes no guarantees across streams (Even within categories)
- // handled in order, it makes no guarantees across streams (Even within categories)
+ // handled in order, but makes no guarantees across streams (Even within categories)
App/Program.fs
Outdated
-let connection = MessageDbConnector(writeUrl, readUrl).Establish()
-let context = MessageDbContext(connection)
+let connection = MessageDbClient(writeUrl, readUrl)
+let context = MessageDbContext(connection, 100)
named arg
App/Program.fs
Outdated
-let connection = MessageDbConnector(writeUrl, readUrl).Establish()
-let context = MessageDbContext(connection)
+let connection = MessageDbClient(writeUrl, readUrl)
+let context = MessageDbContext(connection, 100)
 let caching = CachingStrategy.SlidingWindow(cache, TimeSpan.FromMinutes(20))

 let service =
     MessageDbCategory(context, Invoice.Events.codec, Invoice.Fold.fold, Invoice.Fold.initial, caching)
     |> Equinox.Decider.resolve log
     |> Invoice.create
Maybe shift this into an InvoiceService.create (context, cache) that encapsulates creating the CachingStrategy, MessageDbCategory and the Decider.resolve? (Same for numbering service)
(I know it seems clear and terse here, but the fact you are connecting 2 services to a single context will stand out better)
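For instance, a sketch of that factory (hypothetical name and shape, mirroring the wiring in the diff above):

module InvoiceService =
    let create (log, context, cache) =
        let caching = CachingStrategy.SlidingWindow(cache, TimeSpan.FromMinutes 20)
        MessageDbCategory(context, Invoice.Events.codec, Invoice.Fold.fold, Invoice.Fold.initial, caching)
        |> Equinox.Decider.resolve log
        |> Invoice.create

// Program.fs wiring then reads:
// let service = InvoiceService.create (log, context, cache)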
App/Program.fs
Outdated
    do! service.RecordPayment(id, payment)
    return "OK"
}
let reactorService = InvoiceNumberingReactor.Service(numberService, service)
arguably this could become InvoiceNumberingReactor.create (context, cache), and then have that create the two relevant Domain Services
(that then also helps with exposing such helpers for test scenarios)
🤔 I'm on the fence. That'll couple the InvoiceNumberingReactor to MessageDb. One of my goals here is to only use store-specific things in App
see type Domain.Config.Store in proHotel
My convention is to always have wiring for memory and >= 1 concrete store as a DU
Then you can look at top of an aggregate module to see where it lives and bottom of it to see the access strategy
The obvious debatable bit is that this means Domain.fsproj has a PackageRef to MemoryStore and ConcreteStore
This is also slightly coloured by the fact that dotnet-templates is in the business of providing store switching
One fringe benefit is that this means one can mix and match and/or migrate a given Category to another store easily by adding a DU case and fixing the errors (which I did, from Cosmos to Dynamo, one time!).
I do agree that stuff gets noisier if you consider more than one store target, especially for a simple sample.
Over to you to do as you see fit!
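For reference, a rough shape of that convention (simplified here; the real Domain.Config.Store in proHotel differs):

// a DU over the supported stores; each aggregate module's wiring matches on it
[<NoComparison; NoEquality>]
type Store =
    | Memory of Equinox.MemoryStore.VolatileStore<obj> // type argument simplified
    | MessageDb of MessageDbContext * Equinox.Cache

// the bottom of an aggregate module pattern-matches on Store to build its
// Category, so migrating a Category to another store is a one-case change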
App/Program.fs
Outdated
return "OK" | ||
} | ||
let reactorService = InvoiceNumberingReactor.Service(numberService, service) | ||
let checkpointService = |
move to bottom of the aggregate module given app refs Equinox.MessageDb?
App/Program.fs
Outdated
let readInvoice id =
    task { return! service.ReadInvoice(id) }
let builder = WebApplication.CreateBuilder()
builder.Services.AddHostedService(fun _ -> new HostedReactor.HostedReactor(readUrl, log, reactorService, checkpointService)) |> ignore
maybe pass relevant inputs to a HostedReactor.create, and let it create the ReactorService?
Ended up adding a Services type to encapsulate the creation of these services. I think that's a lot clearer, thanks for the suggestion
do!
    ids
    |> Seq.map (fun id -> invoiceService.Raise(id, { Payer = "1"; Amount = 33m }))
    |> Async.Sequential
for he would not use the force....
😂 In my defence, this started out as a Parallel and the MemoryStore choked for some reason (that reason was unrelated to the parallel) 🙈
Adds a simplistic decider that holds all invoice numbers and which invoices they belong to, and a reactor that listens to "invoice raised" and in response will fetch the next number and apply it to the invoice