-
Notifications
You must be signed in to change notification settings - Fork 58
Specify aggregate and event database insertion order #469
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
erikrozendaal
merged 4 commits into
master
from
specify-aggregate-and-event-insertion-order
May 12, 2025
Merged
Changes from 1 commit
Commits
Show all changes
4 commits
Select commit
Hold shift + click to select a range
bd78656
Specify aggregate and event database insertion order
erikrozendaal 87c086b
Include sequence number in ordering
erikrozendaal 99541dd
Test and fix sequence number constraint checking
erikrozendaal c95651f
Use `ORDER BY ... DESC LIMIT 1` instead of MAX
erikrozendaal File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
33 changes: 33 additions & 0 deletions
33
db/migrate/20250512135500_sequent_specify_database_insertion_order.rb
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,33 @@ | ||
| # frozen_string_literal: true | ||
|
|
||
| class SequentSpecifyDatabaseInsertionOrder < ActiveRecord::Migration[7.2] | ||
| def up | ||
| Sequent::Support::Database.with_search_path(Sequent.configuration.event_store_schema_name) do | ||
| execute_sql_file 'store_aggregates', version: 3 | ||
| execute_sql_file 'store_events', version: 3 | ||
| execute_sql_file 'update_unique_keys', version: 2 | ||
| end | ||
| end | ||
|
|
||
| def down | ||
| Sequent::Support::Database.with_search_path(Sequent.configuration.event_store_schema_name) do | ||
| execute_sql_file 'store_aggregates', version: 2 | ||
| execute_sql_file 'store_events', version: 2 | ||
| execute_sql_file 'update_unique_keys', version: 1 | ||
| end | ||
| end | ||
|
|
||
| private | ||
|
|
||
| def execute_sql_file(filename, version:) | ||
| say "Applying '#{filename}' version #{version}", true | ||
| suppress_messages do | ||
| execute File.read( | ||
| File.join( | ||
| File.dirname(__FILE__), | ||
| format('sequent/%s_v%02d.sql', filename, version), | ||
| ), | ||
| ) | ||
| end | ||
| end | ||
| end |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,45 @@ | ||
| CREATE OR REPLACE PROCEDURE store_aggregates(_aggregates_with_events jsonb) | ||
| LANGUAGE plpgsql SET search_path FROM CURRENT AS $$ | ||
| DECLARE | ||
| _aggregate jsonb; | ||
| _events jsonb; | ||
| _aggregate_id aggregates.aggregate_id%TYPE; | ||
| _events_partition_key aggregates.events_partition_key%TYPE; | ||
| _current_partition_key aggregates.events_partition_key%TYPE; | ||
| _snapshot_outdated_at aggregates_that_need_snapshots.snapshot_outdated_at%TYPE; | ||
| BEGIN | ||
| FOR _aggregate, _events IN SELECT row->0, row->1 FROM jsonb_array_elements(_aggregates_with_events) AS row ORDER BY row->0->>'aggregate_id' LOOP | ||
| _aggregate_id = _aggregate->>'aggregate_id'; | ||
|
|
||
| _events_partition_key = COALESCE(_aggregate->>'events_partition_key', ''); | ||
| INSERT INTO aggregates (aggregate_id, created_at, aggregate_type_id, events_partition_key) | ||
| VALUES ( | ||
| _aggregate_id, | ||
| (_events->0->>'created_at')::timestamptz, | ||
| (SELECT id FROM aggregate_types WHERE type = _aggregate->>'aggregate_type'), | ||
| _events_partition_key | ||
| ) ON CONFLICT (aggregate_id) DO NOTHING; | ||
|
|
||
| _current_partition_key = (SELECT events_partition_key FROM aggregates WHERE aggregate_id = _aggregate_id); | ||
| IF _current_partition_key <> _events_partition_key THEN | ||
| INSERT INTO partition_key_changes AS row (aggregate_id, old_partition_key, new_partition_key) | ||
| VALUES (_aggregate_id, _current_partition_key, _events_partition_key) | ||
| ON CONFLICT (aggregate_id) | ||
| DO UPDATE SET new_partition_key = EXCLUDED.new_partition_key, | ||
| updated_at = NOW() | ||
| WHERE row.new_partition_key IS DISTINCT FROM EXCLUDED.new_partition_key; | ||
| ELSE | ||
| DELETE FROM partition_key_changes WHERE aggregate_id = _aggregate_id; | ||
| END IF; | ||
|
|
||
| _snapshot_outdated_at = _aggregate->>'snapshot_outdated_at'; | ||
| IF _snapshot_outdated_at IS NOT NULL THEN | ||
| INSERT INTO aggregates_that_need_snapshots AS row (aggregate_id, snapshot_outdated_at) | ||
| VALUES (_aggregate_id, _snapshot_outdated_at) | ||
| ON CONFLICT (aggregate_id) DO UPDATE | ||
| SET snapshot_outdated_at = LEAST(row.snapshot_outdated_at, EXCLUDED.snapshot_outdated_at) | ||
| WHERE row.snapshot_outdated_at IS DISTINCT FROM EXCLUDED.snapshot_outdated_at; | ||
| END IF; | ||
| END LOOP; | ||
| END; | ||
| $$; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,59 @@ | ||
| CREATE OR REPLACE PROCEDURE store_events(_command jsonb, _aggregates_with_events jsonb) | ||
| LANGUAGE plpgsql SET search_path FROM CURRENT AS $$ | ||
| DECLARE | ||
| _command_id commands.id%TYPE; | ||
| _aggregates jsonb; | ||
| _aggregate jsonb; | ||
| _events jsonb; | ||
| _aggregate_id aggregates.aggregate_id%TYPE; | ||
| _events_partition_key aggregates.events_partition_key%TYPE; | ||
| _last_sequence_number events.sequence_number%TYPE; | ||
| _next_sequence_number events.sequence_number%TYPE; | ||
| BEGIN | ||
| CALL update_types(_command, _aggregates_with_events); | ||
|
|
||
| _command_id = store_command(_command); | ||
|
|
||
| CALL store_aggregates(_aggregates_with_events); | ||
|
|
||
| FOR _aggregate, _events IN SELECT row->0, row->1 FROM jsonb_array_elements(_aggregates_with_events) AS row | ||
| ORDER BY row->0->'aggregate_id', row->1->0->'event_json'->'sequence_number' | ||
| LOOP | ||
| _aggregate_id = _aggregate->>'aggregate_id'; | ||
| SELECT events_partition_key INTO STRICT _events_partition_key FROM aggregates WHERE aggregate_id = _aggregate_id; | ||
|
|
||
| SELECT MAX(sequence_number) | ||
| INTO _last_sequence_number | ||
| FROM events | ||
| WHERE partition_key = _events_partition_key | ||
| AND aggregate_id = _aggregate_id; | ||
|
|
||
| SELECT MIN(event->>'sequence_number') | ||
| INTO _next_sequence_number | ||
| FROM jsonb_array_elements(_events) AS event; | ||
|
|
||
| -- Check sequence number of first new event to ensure optimistic locking works correctly | ||
| -- (otherwise two concurrent transactions could insert events with different first/next | ||
| -- sequence number and no constraint violation would be raised). | ||
| IF _last_sequence_number IS NULL AND _next_sequence_number <> 1 THEN | ||
| RAISE EXCEPTION 'sequence_number of first event must be 1'; | ||
| ELSIF _last_sequence_number IS NOT NULL AND _next_sequence_number <= _last_sequence_number + 1 THEN | ||
| RAISE EXCEPTION 'sequence_number must be consecutive'; | ||
| END IF; | ||
|
|
||
| INSERT INTO events (partition_key, aggregate_id, sequence_number, created_at, command_id, event_type_id, event_json) | ||
| SELECT _events_partition_key, | ||
| _aggregate_id, | ||
| (event->'event_json'->'sequence_number')::integer, | ||
| (event->>'created_at')::timestamptz, | ||
| _command_id, | ||
| (SELECT id FROM event_types WHERE type = event->>'event_type'), | ||
| (event->'event_json') - '{aggregate_id,created_at,event_type,sequence_number}'::text[] | ||
| FROM jsonb_array_elements(_events) AS event | ||
| ORDER BY 1, 2; | ||
| END LOOP; | ||
|
|
||
| _aggregates = (SELECT jsonb_agg(row->0) FROM jsonb_array_elements(_aggregates_with_events) AS row); | ||
| CALL update_unique_keys(_aggregates); | ||
| END; | ||
| $$; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,34 @@ | ||
| CREATE OR REPLACE PROCEDURE update_unique_keys(_stream_records jsonb) | ||
| LANGUAGE plpgsql SET search_path FROM CURRENT AS $$ | ||
| DECLARE | ||
| _aggregate jsonb; | ||
| _aggregate_id aggregates.aggregate_id%TYPE; | ||
| _unique_keys jsonb; | ||
| BEGIN | ||
| FOR _aggregate IN SELECT aggregate FROM jsonb_array_elements(_stream_records) AS aggregate ORDER BY aggregate->>'aggregate_id' LOOP | ||
| _aggregate_id = _aggregate->>'aggregate_id'; | ||
| _unique_keys = COALESCE(_aggregate->'unique_keys', '{}'::jsonb); | ||
|
|
||
| DELETE FROM aggregate_unique_keys AS target | ||
| WHERE target.aggregate_id = _aggregate_id | ||
| AND NOT (_unique_keys ? target.scope); | ||
| END LOOP; | ||
|
|
||
| FOR _aggregate IN SELECT aggregate FROM jsonb_array_elements(_stream_records) AS aggregate ORDER BY aggregate->>'aggregate_id' LOOP | ||
| _aggregate_id = _aggregate->>'aggregate_id'; | ||
| _unique_keys = COALESCE(_aggregate->'unique_keys', '{}'::jsonb); | ||
|
|
||
| INSERT INTO aggregate_unique_keys AS target (aggregate_id, scope, key) | ||
| SELECT _aggregate_id, key, value | ||
| FROM jsonb_each(_unique_keys) AS x | ||
| ORDER BY 1, 2 | ||
| ON CONFLICT (aggregate_id, scope) DO UPDATE | ||
| SET key = EXCLUDED.key | ||
| WHERE target.key <> EXCLUDED.key; | ||
| END LOOP; | ||
| EXCEPTION | ||
| WHEN unique_violation THEN | ||
| RAISE unique_violation | ||
| USING MESSAGE = 'duplicate unique key value for aggregate ' || (_aggregate->>'aggregate_type') || ' ' || _aggregate_id || ' (' || SQLERRM || ')'; | ||
| END; | ||
| $$; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.