Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# frozen_string_literal: true

# Migration that replaces the event-store procedures with versions that insert
# rows in a deterministic (aggregate_id, sequence_number) order, so concurrent
# transactions acquire locks in the same order and avoid deadlocks.
class SequentSpecifyDatabaseInsertionOrder < ActiveRecord::Migration[7.2]
  # Installs the new procedure versions inside the configured event-store schema.
  def up
    Sequent::Support::Database.with_search_path(Sequent.configuration.event_store_schema_name) do
      execute_sql_file 'store_aggregates', version: 3
      execute_sql_file 'store_events', version: 3
      execute_sql_file 'update_unique_keys', version: 2
    end
  end

  # Restores the previous procedure versions.
  def down
    Sequent::Support::Database.with_search_path(Sequent.configuration.event_store_schema_name) do
      execute_sql_file 'store_aggregates', version: 2
      execute_sql_file 'store_events', version: 2
      execute_sql_file 'update_unique_keys', version: 1
    end
  end

  private

  # Reads `sequent/<filename>_vNN.sql` next to this migration and executes it.
  # The `say` line announces which file/version is applied; the actual SQL
  # output is suppressed to keep migration logs readable.
  def execute_sql_file(filename, version:)
    # Fixed: the message previously contained a garbled literal instead of
    # interpolating the filename parameter.
    say "Applying '#{filename}' version #{version}", true
    suppress_messages do
      execute File.read(
        File.join(
          File.dirname(__FILE__),
          format('sequent/%s_v%02d.sql', filename, version),
        ),
      )
    end
  end
end
45 changes: 45 additions & 0 deletions db/migrate/sequent/store_aggregates_v03.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
-- Upserts the aggregate rows for each (aggregate, events) pair in the supplied
-- jsonb array, and maintains the partition-key-change and snapshot bookkeeping
-- tables. Pairs are processed in aggregate_id order so that concurrent
-- transactions take row locks in a consistent order (deadlock avoidance).
CREATE OR REPLACE PROCEDURE store_aggregates(_aggregates_with_events jsonb)
LANGUAGE plpgsql SET search_path FROM CURRENT AS $$
DECLARE
_aggregate jsonb;
_events jsonb;
_aggregate_id aggregates.aggregate_id%TYPE;
_events_partition_key aggregates.events_partition_key%TYPE;
_current_partition_key aggregates.events_partition_key%TYPE;
_snapshot_outdated_at aggregates_that_need_snapshots.snapshot_outdated_at%TYPE;
BEGIN
-- ORDER BY aggregate_id fixes the insertion order across transactions.
FOR _aggregate, _events IN SELECT row->0, row->1 FROM jsonb_array_elements(_aggregates_with_events) AS row ORDER BY row->0->>'aggregate_id' LOOP
_aggregate_id = _aggregate->>'aggregate_id';

-- Missing partition key defaults to the empty string.
_events_partition_key = COALESCE(_aggregate->>'events_partition_key', '');
-- Create the aggregate row if it does not exist yet; created_at is taken
-- from the first event of this batch. DO NOTHING keeps an existing row.
INSERT INTO aggregates (aggregate_id, created_at, aggregate_type_id, events_partition_key)
VALUES (
_aggregate_id,
(_events->0->>'created_at')::timestamptz,
(SELECT id FROM aggregate_types WHERE type = _aggregate->>'aggregate_type'),
_events_partition_key
) ON CONFLICT (aggregate_id) DO NOTHING;

-- Detect a requested partition-key change against the stored value; the
-- actual move is presumably performed elsewhere (background job) — this
-- only records the pending change. NOTE(review): confirm against caller.
_current_partition_key = (SELECT events_partition_key FROM aggregates WHERE aggregate_id = _aggregate_id);
IF _current_partition_key <> _events_partition_key THEN
INSERT INTO partition_key_changes AS row (aggregate_id, old_partition_key, new_partition_key)
VALUES (_aggregate_id, _current_partition_key, _events_partition_key)
ON CONFLICT (aggregate_id)
DO UPDATE SET new_partition_key = EXCLUDED.new_partition_key,
updated_at = NOW()
-- Skip the update when the pending target is already the requested key.
WHERE row.new_partition_key IS DISTINCT FROM EXCLUDED.new_partition_key;
ELSE
-- Keys match again: any previously recorded pending change is obsolete.
DELETE FROM partition_key_changes WHERE aggregate_id = _aggregate_id;
END IF;

-- Track the earliest moment this aggregate's snapshot became outdated;
-- LEAST keeps the oldest timestamp across repeated notifications.
_snapshot_outdated_at = _aggregate->>'snapshot_outdated_at';
IF _snapshot_outdated_at IS NOT NULL THEN
INSERT INTO aggregates_that_need_snapshots AS row (aggregate_id, snapshot_outdated_at)
VALUES (_aggregate_id, _snapshot_outdated_at)
ON CONFLICT (aggregate_id) DO UPDATE
SET snapshot_outdated_at = LEAST(row.snapshot_outdated_at, EXCLUDED.snapshot_outdated_at)
WHERE row.snapshot_outdated_at IS DISTINCT FROM EXCLUDED.snapshot_outdated_at;
END IF;
END LOOP;
END;
$$;
64 changes: 64 additions & 0 deletions db/migrate/sequent/store_events_v03.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
-- Stores a command plus the events of every aggregate it touched, enforcing
-- consecutive sequence numbers per aggregate. Aggregates and events are
-- processed in (aggregate_id, sequence_number) order so that concurrent
-- transactions insert rows in a deterministic order (deadlock avoidance).
CREATE OR REPLACE PROCEDURE store_events(_command jsonb, _aggregates_with_events jsonb)
LANGUAGE plpgsql SET search_path FROM CURRENT AS $$
DECLARE
  _command_id commands.id%TYPE;
  _aggregates jsonb;
  _aggregate jsonb;
  _events jsonb;
  _aggregate_id aggregates.aggregate_id%TYPE;
  _events_partition_key aggregates.events_partition_key%TYPE;
  _last_sequence_number events.sequence_number%TYPE;
  _next_sequence_number events.sequence_number%TYPE;
BEGIN
  -- Register any new command/event/aggregate type names first.
  CALL update_types(_command, _aggregates_with_events);

  _command_id = store_command(_command);

  CALL store_aggregates(_aggregates_with_events);

  FOR _aggregate, _events IN SELECT row->0, row->1 FROM jsonb_array_elements(_aggregates_with_events) AS row
                              ORDER BY row->0->'aggregate_id', row->1->0->'event_json'->'sequence_number'
  LOOP
    _aggregate_id = _aggregate->>'aggregate_id';
    -- STRICT: the aggregate row must exist (store_aggregates just upserted it).
    SELECT events_partition_key INTO STRICT _events_partition_key FROM aggregates WHERE aggregate_id = _aggregate_id;

    -- Highest sequence number already stored for this aggregate (NULL if none).
    SELECT sequence_number
      INTO _last_sequence_number
      FROM events
     WHERE partition_key = _events_partition_key
       AND aggregate_id = _aggregate_id
     ORDER BY 1 DESC
     LIMIT 1;

    -- Lowest sequence number among the new events. Cast to integer BEFORE
    -- taking MIN: MIN over the text form ('->>') compares lexicographically
    -- ('10' < '9'), which picks the wrong minimum once sequence numbers pass
    -- 9 within one commit and makes the gap check below raise spuriously.
    SELECT MIN((event->'event_json'->'sequence_number')::integer)
      INTO _next_sequence_number
      FROM jsonb_array_elements(_events) AS event;

    -- Check sequence number of first new event to ensure optimistic locking works correctly
    -- (otherwise two concurrent transactions could insert events with different first/next
    -- sequence number and no constraint violation would be raised).
    IF _last_sequence_number IS NULL AND _next_sequence_number <> 1 THEN
      RAISE EXCEPTION 'sequence_number of first event must be 1, but was % (aggregate %)', _next_sequence_number, _aggregate_id
            USING ERRCODE = 'integrity_constraint_violation';
    ELSIF _last_sequence_number IS NOT NULL AND _next_sequence_number > _last_sequence_number + 1 THEN
      RAISE EXCEPTION 'sequence_number must be consecutive, but last sequence number was % and next is % (aggregate %)',
            _last_sequence_number, _next_sequence_number, _aggregate_id
            USING ERRCODE = 'integrity_constraint_violation';
    END IF;

    -- Insert events in key order; fields duplicated in table columns are
    -- stripped from the stored event_json payload.
    INSERT INTO events (partition_key, aggregate_id, sequence_number, created_at, command_id, event_type_id, event_json)
    SELECT _events_partition_key,
           _aggregate_id,
           (event->'event_json'->'sequence_number')::integer,
           (event->>'created_at')::timestamptz,
           _command_id,
           (SELECT id FROM event_types WHERE type = event->>'event_type'),
           (event->'event_json') - '{aggregate_id,created_at,event_type,sequence_number}'::text[]
      FROM jsonb_array_elements(_events) AS event
     ORDER BY 1, 2, 3;
  END LOOP;

  -- Finally reconcile the unique keys claimed by each aggregate.
  _aggregates = (SELECT jsonb_agg(row->0) FROM jsonb_array_elements(_aggregates_with_events) AS row);
  CALL update_unique_keys(_aggregates);
END;
$$;
34 changes: 34 additions & 0 deletions db/migrate/sequent/update_unique_keys_v02.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
-- Reconciles the aggregate_unique_keys table with the unique keys claimed by
-- each aggregate in _stream_records. Runs two passes — all deletes first, then
-- all inserts — so a key released by one aggregate can be claimed by another
-- within the same call. Both passes iterate in aggregate_id order (and the
-- INSERT orders by key columns) for deterministic lock acquisition.
CREATE OR REPLACE PROCEDURE update_unique_keys(_stream_records jsonb)
LANGUAGE plpgsql SET search_path FROM CURRENT AS $$
DECLARE
_aggregate jsonb;
_aggregate_id aggregates.aggregate_id%TYPE;
_unique_keys jsonb;
BEGIN
-- Pass 1: drop keys the aggregate no longer declares.
FOR _aggregate IN SELECT aggregate FROM jsonb_array_elements(_stream_records) AS aggregate ORDER BY aggregate->>'aggregate_id' LOOP
_aggregate_id = _aggregate->>'aggregate_id';
_unique_keys = COALESCE(_aggregate->'unique_keys', '{}'::jsonb);

-- `?` tests for scope presence among the declared unique keys.
DELETE FROM aggregate_unique_keys AS target
WHERE target.aggregate_id = _aggregate_id
AND NOT (_unique_keys ? target.scope);
END LOOP;

-- Pass 2: insert/refresh the declared keys.
FOR _aggregate IN SELECT aggregate FROM jsonb_array_elements(_stream_records) AS aggregate ORDER BY aggregate->>'aggregate_id' LOOP
_aggregate_id = _aggregate->>'aggregate_id';
_unique_keys = COALESCE(_aggregate->'unique_keys', '{}'::jsonb);

INSERT INTO aggregate_unique_keys AS target (aggregate_id, scope, key)
SELECT _aggregate_id, key, value
FROM jsonb_each(_unique_keys) AS x
ORDER BY 1, 2
ON CONFLICT (aggregate_id, scope) DO UPDATE
SET key = EXCLUDED.key
-- No-op update when the stored key already matches.
WHERE target.key <> EXCLUDED.key;
END LOOP;
EXCEPTION
-- Another aggregate holds the key: re-raise with a message identifying the
-- offending aggregate (loop variables still hold the failing iteration).
WHEN unique_violation THEN
RAISE unique_violation
USING MESSAGE = 'duplicate unique key value for aggregate ' || (_aggregate->>'aggregate_type') || ' ' || _aggregate_id || ' (' || SQLERRM || ')';
END;
$$;
37 changes: 33 additions & 4 deletions db/structure.sql
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ DECLARE
_current_partition_key aggregates.events_partition_key%TYPE;
_snapshot_outdated_at aggregates_that_need_snapshots.snapshot_outdated_at%TYPE;
BEGIN
FOR _aggregate, _events IN SELECT row->0, row->1 FROM jsonb_array_elements(_aggregates_with_events) AS row LOOP
FOR _aggregate, _events IN SELECT row->0, row->1 FROM jsonb_array_elements(_aggregates_with_events) AS row ORDER BY row->0->>'aggregate_id' LOOP
_aggregate_id = _aggregate->>'aggregate_id';

_events_partition_key = COALESCE(_aggregate->>'events_partition_key', '');
Expand Down Expand Up @@ -520,6 +520,8 @@ DECLARE
_events jsonb;
_aggregate_id aggregates.aggregate_id%TYPE;
_events_partition_key aggregates.events_partition_key%TYPE;
_last_sequence_number events.sequence_number%TYPE;
_next_sequence_number events.sequence_number%TYPE;
BEGIN
CALL update_types(_command, _aggregates_with_events);

Expand All @@ -533,6 +535,30 @@ BEGIN
_aggregate_id = _aggregate->>'aggregate_id';
SELECT events_partition_key INTO STRICT _events_partition_key FROM aggregates WHERE aggregate_id = _aggregate_id;

SELECT sequence_number
INTO _last_sequence_number
FROM events
WHERE partition_key = _events_partition_key
AND aggregate_id = _aggregate_id
ORDER BY 1 DESC
LIMIT 1;

SELECT MIN(event->'event_json'->>'sequence_number')
INTO _next_sequence_number
FROM jsonb_array_elements(_events) AS event;

-- Check sequence number of first new event to ensure optimistic locking works correctly
-- (otherwise two concurrent transactions could insert events with different first/next
-- sequence number and no constraint violation would be raised).
IF _last_sequence_number IS NULL AND _next_sequence_number <> 1 THEN
RAISE EXCEPTION 'sequence_number of first event must be 1, but was % (aggregate %)', _next_sequence_number, _aggregate_id
USING ERRCODE = 'integrity_constraint_violation';
ELSIF _last_sequence_number IS NOT NULL AND _next_sequence_number > _last_sequence_number + 1 THEN
RAISE EXCEPTION 'sequence_number must be consecutive, but last sequence number was % and next is % (aggregate %)',
_last_sequence_number, _next_sequence_number, _aggregate_id
USING ERRCODE = 'integrity_constraint_violation';
END IF;

INSERT INTO events (partition_key, aggregate_id, sequence_number, created_at, command_id, event_type_id, event_json)
SELECT _events_partition_key,
_aggregate_id,
Expand All @@ -541,7 +567,8 @@ BEGIN
_command_id,
(SELECT id FROM event_types WHERE type = event->>'event_type'),
(event->'event_json') - '{aggregate_id,created_at,event_type,sequence_number}'::text[]
FROM jsonb_array_elements(_events) AS event;
FROM jsonb_array_elements(_events) AS event
ORDER BY 1, 2, 3;
END LOOP;

_aggregates = (SELECT jsonb_agg(row->0) FROM jsonb_array_elements(_aggregates_with_events) AS row);
Expand Down Expand Up @@ -641,7 +668,7 @@ DECLARE
_aggregate_id aggregates.aggregate_id%TYPE;
_unique_keys jsonb;
BEGIN
FOR _aggregate IN SELECT aggregate FROM jsonb_array_elements(_stream_records) AS aggregate LOOP
FOR _aggregate IN SELECT aggregate FROM jsonb_array_elements(_stream_records) AS aggregate ORDER BY aggregate->>'aggregate_id' LOOP
_aggregate_id = _aggregate->>'aggregate_id';
_unique_keys = COALESCE(_aggregate->'unique_keys', '{}'::jsonb);

Expand All @@ -650,13 +677,14 @@ BEGIN
AND NOT (_unique_keys ? target.scope);
END LOOP;

FOR _aggregate IN SELECT aggregate FROM jsonb_array_elements(_stream_records) AS aggregate LOOP
FOR _aggregate IN SELECT aggregate FROM jsonb_array_elements(_stream_records) AS aggregate ORDER BY aggregate->>'aggregate_id' LOOP
_aggregate_id = _aggregate->>'aggregate_id';
_unique_keys = COALESCE(_aggregate->'unique_keys', '{}'::jsonb);

INSERT INTO aggregate_unique_keys AS target (aggregate_id, scope, key)
SELECT _aggregate_id, key, value
FROM jsonb_each(_unique_keys) AS x
ORDER BY 1, 2
ON CONFLICT (aggregate_id, scope) DO UPDATE
SET key = EXCLUDED.key
WHERE target.key <> EXCLUDED.key;
Expand Down Expand Up @@ -1432,6 +1460,7 @@ ALTER TABLE ONLY sequent_schema.snapshot_records
SET search_path TO public,view_schema,sequent_schema;

INSERT INTO "schema_migrations" (version) VALUES
('20250512135500'),
('20250509120000'),
('20250501120000'),
('20250312105100'),
Expand Down
53 changes: 53 additions & 0 deletions spec/lib/sequent/core/event_store_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,59 @@ class MyAggregate < Sequent::Core::AggregateRoot

expect(event_store.load_events_since_marked_position(updated_mark)[0]).to be_empty
end

# Verifies the store_events procedure rejects a commit whose first event for a
# fresh aggregate has sequence_number != 1 (raised by the procedure with
# ERRCODE integrity_constraint_violation, surfaced by ActiveRecord as
# StatementInvalid).
it 'fails if the first event does not have sequence number 1' do
expect do
event_store.commit_events(
Sequent::Core::Command.new(aggregate_id:),
[
[
Sequent::Core::EventStream.new(
aggregate_type: 'MyAggregate',
# NOTE(review): the stream uses a fresh uuid while the event below uses
# the outer `aggregate_id` let — looks accidental; the check fires on
# sequence_number either way, but confirm the mismatch is intended.
aggregate_id: Sequent.new_uuid,
),
[
MyEvent.new(
aggregate_id:,
sequence_number: 2,
created_at: Time.parse('2024-02-29T02:10:12Z'),
data: "another event\n",
),
],
],
],
)
end.to raise_error(ActiveRecord::StatementInvalid)
end

# Verifies the store_events procedure rejects a commit whose events leave a gap
# after the aggregate's last stored sequence number (1 -> 3 here).
it 'fails if the next event has a sequence number gap with the last event' do
  event_stream = Sequent::Core::EventStream.new(
    aggregate_type: 'MyAggregate',
    aggregate_id: Sequent.new_uuid,
  )
  first_event = MyEvent.new(
    aggregate_id:,
    sequence_number: 1,
    created_at: Time.parse('2024-02-29T02:10:12Z'),
    data: "another event\n",
  )

  # Seed the aggregate with a valid first event (sequence number 1).
  event_store.commit_events(
    Sequent::Core::Command.new(aggregate_id:),
    [[event_stream, [first_event]]],
  )

  # A follow-up event jumping straight to sequence number 3 must be rejected.
  gapped_commit = [[event_stream, [first_event.copy(sequence_number: 3)]]]
  expect do
    event_store.commit_events(
      Sequent::Core::Command.new(aggregate_id:),
      gapped_commit,
    )
  end.to raise_error(ActiveRecord::StatementInvalid)
end
end

describe '#permanently_delete_events' do
Expand Down