Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a reactor to reserve and dole out invoice numbers #1

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
obj/
bin/

*.DotSettings.user
13 changes: 13 additions & 0 deletions .idea/.idea.eqx-blog/.idea/.gitignore

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions .idea/.idea.eqx-blog/.idea/dataSources.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions .idea/.idea.eqx-blog/.idea/encodings.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions .idea/.idea.eqx-blog/.idea/indexLayout.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions .idea/.idea.eqx-blog/.idea/vcs.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 33 additions & 0 deletions App/App.fsproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
<Project Sdk="Microsoft.NET.Sdk.Web">

<PropertyGroup>
<TargetFramework>net7.0</TargetFramework>
<RootNamespace>eqx_blog</RootNamespace>
</PropertyGroup>

<ItemGroup>
<Compile Include="CheckpointStore.fs" />
<Compile Include="InvoiceNumberReactor.fs" />
<Compile Include="Program.fs" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Equinox" Version="4.0.0-rc.7" />
<PackageReference Include="Equinox.MessageDb" Version="4.0.0-rc.7" />
<PackageReference Include="FsCodec" Version="3.0.0-rc.9" />
<PackageReference Include="FsCodec.SystemTextJson" Version="3.0.0-rc.9" />
<PackageReference Include="FSharp.UMX" Version="1.1.0" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="7.0.0" />
<PackageReference Include="Propulsion" Version="3.0.0-beta.6" />
<PackageReference Include="Propulsion.MessageDb" Version="3.0.0-beta.5.1" />
<PackageReference Include="Serilog" Version="2.12.0" />
<PackageReference Include="Serilog.Sinks.Console" Version="4.1.0" />
<PackageReference Include="TypeShape" Version="10.0.0" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\InvoiceNumbering\InvoiceNumbering.fsproj" />
<ProjectReference Include="..\Invoice\Invoice.fsproj" />
</ItemGroup>

</Project>
58 changes: 58 additions & 0 deletions App/CheckpointStore.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
module CheckpointStore

open Equinox
open Propulsion.Feed

module Events =
type Event =
| Checkpoint of {| pos: int64 |}
interface TypeShape.UnionContract.IUnionContract

let codec = FsCodec.SystemTextJson.Codec.Create<Event>()

module Fold =
type State = int64 option
let initial = None

let evolve _ =
function
| Events.Checkpoint x -> Some x.pos

let fold: State -> Events.Event seq -> State = Seq.fold evolve

let streamId = StreamId.gen2 SourceId.toString id
type CheckpointService internal (resolve: string -> StreamId -> Equinox.Decider<Events.Event, Fold.State>) =
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wasn't in the mood to add another PG user and schema for the checkpoints. This just checkpoints into an adjacent stream.

e.g. if you're group MyGroup listening to the category MyCategory, it'll checkpoint to MyCategory:position-messageDb_MyGroup. The messageDb comes from propulsion and is to allow for multiple sources.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might upstream this to propulsion

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd not be surprised if @bartelink had already made a storage agnostic decider like this that I'm just not aware of

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The document-based stores (which can do rollingstate updates) do this https://github.com/jet/propulsion/blob/master/src/Propulsion.CosmosStore/ReaderCheckpoint.fs
But an equivalent for PG is not currently implemented (might not be the daftest idea!)
For now I think using https://github.com/jet/propulsion/blob/master/src/Propulsion.MessageDb/ReaderCheckpoint.fs#L49 is simply the right thing to do, even if it makes some mess ?

Copy link

@bartelink bartelink Jan 18, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wrt upstreaming, having an ever-growing stream would be a blocker

Also while in general it can be helpful to keep checkpoints in the same place as the info being derived (and that's the case here), another consideration is that you want to avoid the feedback effect of writing a checkpoint position triggering a (null) reaction (and checkpoint update every 5s) each time

For messagedb, if you keep Invoice, InvoiceNumber and the checkpoints in the same DB, [as events] this would be the case

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For now I think using MessageDb.ReaderCheckpoint is simply the right thing to do

It's definitely not a bad idea. But as always, there are trade-offs.

  • Needs a separate connection string (checkpoints are stored in a different schema and the message_store role only has access to the message_store).
  • Can lead to unpredictable performance because PG needs to VACUUM the table every now and then

I would definitely default to the checkpoints table, but I think there's value in having the option to "just use messagedb"

another consideration is that you want to avoid the feedback effect of writing a checkpoint position triggering a (null) reaction (and checkpoint update every 5s) each time

I don't think this applies to MessageDB as we keep the position on a per category basis. There's no feedback because the checkpoints go to a separate category.

having an ever-growing stream would be a blocker

The position of message-db's authors (which I agree with) is that message db should not be in the business of deleting data. Any production deployment of message-db will include pruning procedures outside of the application. With that in mind I don't think it should be a blocker as it's entirely consistent with real-world usage of MessageDB.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With that in mind I don't think it should be a blocker as it's entirely consistent with real-world usage of MessageDB.

Not suggesting there should be built-in pruning.

I don't think this applies to MessageDB as we keep the position on a per category basis. There's no feedback because the checkpoints go to a separate category.

If the position is the event index within the category and not affected by writes to other categories, I guess that will dramatically reduce the amount of checkpointing (but e.g. if you wanted to store ESDB checkpoints in PG, then an event every 5s is going to add up, and inevitably someone will get paged at 5am and get annoyed with @nordfjord when they git blame!)

I would definitely default to the checkpoints table, but I think there's value in having the option to "just use messagedb"

Maybe for the sample have the checkpoints DB connstr be optional iff people opt into "unbounded stream" checkpoints ?

IOW if they run with --checkpointStream, they can avoid rigging the checkpoints schema ?

Maybe I'm being a bit strong - In the context of the sample as a whole, the aggregate does serve as a nice example impl. But I do feel it's a step too far to put it in the box unless it's in a Internal namespace with a name that conveys that it will grow forever and you might just want to use the one that keeps one row in the DB ;)

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the position is the event index within the category and not affected by writes to other categories, I guess that will dramatically reduce the amount of checkpointing

The position is from the global sequence. The position sequence in the category is therefore monotonic but not atomic (e.g. 1,3,8,12222 is a perfectly valid sequence of positions). Also note that the subscription is polling get_category_messages for each of the categories supplied (so we'll only get evens for the subscribed categories). This is of course different from how the other stores function where the subscription is to an $all stream.

To be clear, when I say "real-world" I mean that this is how eventide does its checkpointing

member _.SetCheckpoint(source, tranche, group: string, pos) =
let category = TrancheId.toString tranche + ":position"
let streamId = streamId (source, group)
let decider = resolve category streamId
decider.Transact (function
| None -> [ Events.Checkpoint {| pos = pos |} ]
| Some curr when curr < pos -> [ Events.Checkpoint {| pos = pos |} ]
| Some _ -> [])

member _.ReadCheckpoint(source, tranche, group) =
let category = TrancheId.toString tranche + ":position"
let streamId = StreamId.ofRaw $"{SourceId.toString source}_{group}"
let decider = resolve category streamId
decider.Query(id)

let create resolve = CheckpointService(resolve)

type CheckpointStore(service: CheckpointService, consumerGroup, defaultCheckpointFrequency) =
interface IFeedCheckpointStore with
member this.Commit(source, tranche, pos) =
service.SetCheckpoint(source, tranche, consumerGroup, Position.toInt64 pos)

member this.Start(source, tranche, establishOrigin) =
async {
let! maybePos = service.ReadCheckpoint(source, tranche, consumerGroup)

let! pos =
match maybePos, establishOrigin with
| Some pos, _ -> async { return Position.parse pos }
| None, Some f -> f
| None, None -> async { return Position.initial }

return struct (defaultCheckpointFrequency, pos)
}
45 changes: 45 additions & 0 deletions App/InvoiceNumberReactor.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
module InvoiceNumberReactor

open System
open Microsoft.Extensions.Hosting
open Propulsion.Internal
open Propulsion.MessageDb

type HostedService internal (connectionString, log, service: InvoiceNumberingReactor.Service, checkpointService: CheckpointStore.CheckpointService) =

inherit BackgroundService()

[<Literal>]
let GroupName = "InvoiceNumberingReactor"

let categories = [| Invoice.Category |]

let checkpoints = CheckpointStore.CheckpointStore(checkpointService, GroupName, TimeSpan.FromSeconds 10)

override _.ExecuteAsync(ct) =
let computation =
async {
use sink = service.Sink(log)

use src =
MessageDbSource(
log,
statsInterval = TimeSpan.FromMinutes 1,
connectionString = connectionString,
batchSize = 1000,
// Controls the time to wait once fully caught up
// before requesting a new batch of events
tailSleepInterval = TimeSpan.FromMilliseconds 100,
checkpoints = checkpoints,
sink = sink,
// An array of message-db categories to subscribe to
// Propulsion guarantees that events within streams are
// handled in order, it makes no guarantees across streams (Even within categories)
categories = categories
)
.Start()

do! src.AwaitWithStopOnCancellation()
}

Async.StartAsTask(computation, cancellationToken = ct)
87 changes: 57 additions & 30 deletions App/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module Program
open System
open Microsoft.AspNetCore.Builder
open Microsoft.Extensions.Hosting
open Microsoft.Extensions.DependencyInjection
open Equinox.MessageDb
open Serilog

Expand All @@ -22,46 +23,72 @@ let writeUrl =
let readUrl =
Environment.tryGetEnv "MESSAGE_DB_REPLICA_URL" |> Option.defaultValue writeUrl

let connection = MessageDbConnector(writeUrl, readUrl).Establish()
let connection = MessageDbClient(writeUrl, readUrl)
let context = MessageDbContext(connection)
let caching = CachingStrategy.SlidingWindow(cache, TimeSpan.FromMinutes(20))

let service =
MessageDbCategory(context, Invoice.Events.codec, Invoice.Fold.fold, Invoice.Fold.initial, caching)
|> Equinox.Decider.resolve log
|> Invoice.create
type Services() =
static member Create(context, codec, fold, initial, ?access) =
MessageDbCategory(context, codec, fold, initial, caching, ?access = access)

static member InvoiceService(context) =
let codec = Invoice.Events.codec
let fold, initial = Invoice.Fold.fold, Invoice.Fold.initial
Services.Create(context, codec, fold, initial)
|> Equinox.Decider.resolve log
|> Invoice.create

static member InvoiceNumberService(context) =
let codec = InvoiceNumbering.Events.codec
let fold, initial = InvoiceNumbering.Fold.fold, InvoiceNumbering.Fold.initial
let snapshot = InvoiceNumbering.Fold.snapshotEventType, InvoiceNumbering.Fold.toSnapshot
let access = AccessStrategy.AdjacentSnapshots snapshot
Services.Create(context, codec, fold, initial, access)
|> Equinox.Decider.resolve log
|> InvoiceNumbering.create

static member InvoiceNumberReactorService(context) =
let invoiceService = Services.InvoiceService(context)
let numberService = Services.InvoiceNumberService(context)
InvoiceNumberingReactor.Service(numberService, invoiceService)

static member CheckpointService(context) =
let codec = CheckpointStore.Events.codec
let fold, initial = CheckpointStore.Fold.fold, CheckpointStore.Fold.initial
let access = AccessStrategy.LatestKnownEvent
Services.Create(context, codec, fold, initial, access)
|> Equinox.Decider.resolve log
|> CheckpointStore.create

static member HostedReactor(context) =
let reactorService = Services.InvoiceNumberReactorService(context)
let checkpointService = Services.CheckpointService(context)
new InvoiceNumberReactor.HostedService(readUrl, log, reactorService, checkpointService)

let builder = WebApplication.CreateBuilder()
builder.Services.AddHostedService(fun _ -> Services.HostedReactor(context)) |> ignore
let app = builder.Build()

let raiseInvoice body =
task {
let id = Guid.NewGuid() |> Invoice.InvoiceId.ofGuid
do! service.Raise(id, body)
return id
}
let service = Services.InvoiceService(context)

app.MapPost("/", Func<_, _>(raiseInvoice)) |> ignore
app.MapPost("/", Func<_, _>(fun body -> task {
let id = Guid.NewGuid() |> Invoice.InvoiceId.ofGuid
do! service.Raise(id, body)
return id
})) |> ignore

let finalizeInvoice id =
task {
do! service.Finalize(id)
return "OK"
}
app.MapPost("/{id}/finalize", Func<_, _>(fun id -> task {
do! service.Finalize(id)
return "OK"
})) |> ignore

app.MapPost("/{id}/finalize", Func<_, _>(finalizeInvoice)) |> ignore
app.MapPost("/{id}/record-payment", Func<_, _, _>(fun id payment -> task {
do! service.RecordPayment(id, payment)
return "OK"
})) |> ignore

let recordPayment id payment =
task {
do! service.RecordPayment(id, payment)
return "OK"
}

app.MapPost("/{id}/record-payment", Func<_, _, _>(recordPayment)) |> ignore

let readInvoice id =
task { return! service.ReadInvoice(id) }

app.MapGet("/{id}", Func<_, _>(readInvoice)) |> ignore
app.MapGet("/{id}", Func<_, _>(fun id -> task {
return! service.ReadInvoice(id)
})) |> ignore

app.Run()
50 changes: 13 additions & 37 deletions App/Properties/launchSettings.json
Original file line number Diff line number Diff line change
@@ -1,37 +1,13 @@
{
"iisSettings": {
"windowsAuthentication": false,
"anonymousAuthentication": true,
"iisExpress": {
"applicationUrl": "http://localhost:52842",
"sslPort": 44350
}
},
"profiles": {
"http": {
"commandName": "Project",
"dotnetRunMessages": true,
"launchBrowser": true,
"applicationUrl": "http://localhost:5244",
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development"
}
},
"https": {
"commandName": "Project",
"dotnetRunMessages": true,
"launchBrowser": true,
"applicationUrl": "https://localhost:7026;http://localhost:5244",
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development"
}
},
"IIS Express": {
"commandName": "IISExpress",
"launchBrowser": true,
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development"
}
}
}
}
{
"profiles": {
"http": {
"commandName": "Project",
"dotnetRunMessages": true,
"launchBrowser": false,
"applicationUrl": "http://localhost:5244",
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development"
}
}
}
}
24 changes: 0 additions & 24 deletions App/eqx-blog.fsproj

This file was deleted.

Loading