diff --git a/db/migrate/20251211080900_sequent_overwrite_snapshot_on_conflict.rb b/db/migrate/20251211080900_sequent_overwrite_snapshot_on_conflict.rb new file mode 100644 index 00000000..7417d4b1 --- /dev/null +++ b/db/migrate/20251211080900_sequent_overwrite_snapshot_on_conflict.rb @@ -0,0 +1,29 @@ +# frozen_string_literal: true + +class SequentOverwriteSnapshotOnConflict < ActiveRecord::Migration[7.2] + def up + Sequent::Support::Database.with_search_path(Sequent.configuration.event_store_schema_name) do + execute_sql_file 'store_snapshots', version: 3 + end + end + + def down + Sequent::Support::Database.with_search_path(Sequent.configuration.event_store_schema_name) do + execute_sql_file 'store_snapshots', version: 2 + end + end + + private + + def execute_sql_file(filename, version:) + say "Applying '#{filename}' version #{version}", true + suppress_messages do + execute File.read( + File.join( + File.dirname(__FILE__), + format('sequent/%s_v%02d.sql', filename, version), + ), + ) + end + end +end diff --git a/db/migrate/sequent/store_snapshots_v03.sql b/db/migrate/sequent/store_snapshots_v03.sql new file mode 100644 index 00000000..ab915883 --- /dev/null +++ b/db/migrate/sequent/store_snapshots_v03.sql @@ -0,0 +1,37 @@ +CREATE OR REPLACE PROCEDURE store_snapshots(_snapshots jsonb) +LANGUAGE plpgsql SET search_path FROM CURRENT AS $$ +DECLARE + _aggregate_id uuid; + _snapshot jsonb; + _snapshot_version integer; + _sequence_number snapshot_records.sequence_number%TYPE; +BEGIN + FOR _snapshot IN SELECT * FROM jsonb_array_elements(_snapshots) LOOP + _aggregate_id = _snapshot->>'aggregate_id'; + _sequence_number = _snapshot->'sequence_number'; + _snapshot_version = _snapshot->'snapshot_version'; + + INSERT INTO aggregates_that_need_snapshots AS row (aggregate_id, snapshot_version, snapshot_sequence_number_high_water_mark) + VALUES (_aggregate_id, _snapshot_version, _sequence_number) + ON CONFLICT (aggregate_id, snapshot_version) DO UPDATE + SET snapshot_sequence_number_high_water_mark = + GREATEST(row.snapshot_sequence_number_high_water_mark, EXCLUDED.snapshot_sequence_number_high_water_mark), + snapshot_outdated_at = NULL, + snapshot_scheduled_at = NULL; + + INSERT INTO snapshot_records (aggregate_id, snapshot_version, sequence_number, created_at, snapshot_type, snapshot_json) + VALUES ( + _aggregate_id, + _snapshot_version, + _sequence_number, + (_snapshot->>'created_at')::timestamptz, + _snapshot->>'snapshot_type', + _snapshot->'snapshot_json' + ) ON CONFLICT (aggregate_id, snapshot_version, sequence_number) DO UPDATE + SET created_at = EXCLUDED.created_at, + snapshot_type = EXCLUDED.snapshot_type, + snapshot_json = EXCLUDED.snapshot_json + WHERE snapshot_records.created_at < EXCLUDED.created_at; + END LOOP; +END; +$$; diff --git a/db/structure.sql b/db/structure.sql index 5d29a6c2..5cc4b41d 100644 --- a/db/structure.sql +++ b/db/structure.sql @@ -638,7 +638,11 @@ BEGIN (_snapshot->>'created_at')::timestamptz, _snapshot->>'snapshot_type', _snapshot->'snapshot_json' - ); + ) ON CONFLICT (aggregate_id, snapshot_version, sequence_number) DO UPDATE + SET created_at = EXCLUDED.created_at, + snapshot_type = EXCLUDED.snapshot_type, + snapshot_json = EXCLUDED.snapshot_json + WHERE snapshot_records.created_at <= EXCLUDED.created_at; END LOOP; END; $$; @@ -1572,6 +1576,7 @@ ALTER TABLE ONLY sequent_schema.snapshot_records SET search_path TO public,view_schema,sequent_schema; INSERT INTO "schema_migrations" (version) VALUES +('20251211080900'), ('20250815103000'), ('20250630113000'), ('20250601120000'), diff --git a/lib/sequent/configuration.rb b/lib/sequent/configuration.rb index 44c335d0..cecfd283 100644 --- a/lib/sequent/configuration.rb +++ b/lib/sequent/configuration.rb @@ -77,7 +77,8 @@ class Configuration :aggregate_snapshot_versions, :enable_projector_states, :projectors_replayer_after_prepare_hook, - :projectors_replayer_after_activate_hook + :projectors_replayer_after_activate_hook, + :aggregates_that_need_snapshots_loaded_callback attr_reader :migrations_class, :versions_table_name diff --git a/lib/sequent/core/aggregate_repository.rb b/lib/sequent/core/aggregate_repository.rb index 73b19cab..3e6a1d20 100644 --- a/lib/sequent/core/aggregate_repository.rb +++ b/lib/sequent/core/aggregate_repository.rb @@ -109,11 +109,17 @@ def load_aggregates(aggregate_ids, clazz = nil) result = aggregates.values_at(*unique_ids).compact query_ids = unique_ids - result.map(&:id) - result += Sequent.configuration.event_store.load_events_for_aggregates(query_ids).map do |stream, events| + loaded_aggregates = event_store.load_events_for_aggregates(query_ids).map do |stream, events| aggregate_class = Class.const_get(stream.aggregate_type) aggregate_class.load_from_history(stream, events) end + aggregates_that_need_snapshots = loaded_aggregates.select(&:snapshot_outdated?) + if aggregates_that_need_snapshots.present? + Sequent.configuration.aggregates_that_need_snapshots_loaded_callback&.call(aggregates_that_need_snapshots) + end + + result += loaded_aggregates if result.count != unique_ids.count missing_aggregate_ids = unique_ids - result.map(&:id) fail AggregateNotFound, missing_aggregate_ids @@ -131,15 +137,14 @@ def load_aggregates(aggregate_ids, clazz = nil) end def find_aggregate_by_unique_key(scope, key, clazz = nil) - aggregate_id = Sequent.configuration.event_store.find_aggregate_id_by_unique_key(scope, key) + aggregate_id = event_store.find_aggregate_id_by_unique_key(scope, key) load_aggregate(aggregate_id, clazz) if aggregate_id end ## # Returns whether the event store has an aggregate with the given id def contains_aggregate?(aggregate_id) - Sequent.configuration.event_store.stream_exists?(aggregate_id) && - Sequent.configuration.event_store.events_exists?(aggregate_id) + event_store.stream_exists?(aggregate_id) && event_store.events_exists?(aggregate_id) end # Gets all uncommitted_events from the 'registered' aggregates @@ -186,8 +191,10 @@ def aggregates end def store_events(command, streams_with_events) - Sequent.configuration.event_store.commit_events(command, streams_with_events) + event_store.commit_events(command, streams_with_events) end + + def event_store = Sequent.configuration.event_store end end end diff --git a/lib/sequent/core/event_store.rb b/lib/sequent/core/event_store.rb index 705c967a..f8c2bcbb 100644 --- a/lib/sequent/core/event_store.rb +++ b/lib/sequent/core/event_store.rb @@ -102,9 +102,7 @@ def commit_events(command, streams_with_events) # @param &block Block that should be passed to handle the batches returned from this method def stream_events_for_aggregate(aggregate_id, load_until: nil, &block) stream = find_event_stream(aggregate_id) - fail ArgumentError, 'no stream found for this aggregate' if stream.blank? - - has_events = false + fail ArgumentError, "no stream found for this aggregate #{aggregate_id}" if stream.blank? # PostgreSQLCursor::Cursor does not support bind parameters, so bind parameters manually instead. sql = ActiveRecord::Base.sanitize_sql_array( @@ -118,12 +116,13 @@ def stream_events_for_aggregate(aggregate_id, load_until: nil, &block) ], ) + has_events = false PostgreSQLCursor::Cursor.new(sql, {connection: connection}).each_row do |event_hash| has_events = true event = deserialize_event(event_hash) block.call([stream, event]) end - fail ArgumentError, 'no events for this aggregate' unless has_events + fail ArgumentError, "no events for aggregate #{aggregate_id}" unless has_events end def load_event(aggregate_id, sequence_number) diff --git a/spec/lib/sequent/core/aggregate_repository_spec.rb b/spec/lib/sequent/core/aggregate_repository_spec.rb index bf244ab0..1421d94b 100644 --- a/spec/lib/sequent/core/aggregate_repository_spec.rb +++ b/spec/lib/sequent/core/aggregate_repository_spec.rb @@ -29,14 +29,18 @@ def load_from_history(stream, events) end before do - Sequent.configuration.event_store = event_store + Sequent.configure do |c| + c.event_store = event_store + c.aggregates_that_need_snapshots_loaded_callback = aggregates_that_need_snapshots_loaded_callback + end repository.clear end after do repository.clear end - let(:event_store) { double } + let(:aggregates_that_need_snapshots_loaded_callback) { instance_spy(Proc, 'snapshot notification callback') } + let(:event_store) { instance_double(Sequent::Core::EventStore, 'event store') } let(:repository) { Sequent.configuration.aggregate_repository } let(:aggregate) { DummyAggregate.new(Sequent.new_uuid) } let(:events) do @@ -104,6 +108,43 @@ def load_from_history(stream, events) expect { repository.load_aggregate(:id, DummyAggregate2) }.to raise_error TypeError end + context 'loading aggregates' do + class MyAggregateThatNeedsSnapshot < DummyAggregate + def snapshot_outdated? = true + end + + before do + allow(event_store).to receive(:load_events_for_aggregates).with([:id]).and_return( + [ + [ + aggregate.event_stream, + events, + ], + ], + ) + end + + context 'aggregate that does not need a snapshot' do + it 'should not notify when the aggregate is loaded' do + loaded = repository.load_aggregate(:id) + expect(loaded.class).to eq(DummyAggregate) + expect(loaded.snapshot_outdated?).to be false + expect(aggregates_that_need_snapshots_loaded_callback).to_not have_received(:call) + end + end + + context 'aggregate that needs a snapshot' do + let(:aggregate) { MyAggregateThatNeedsSnapshot.new(Sequent.new_uuid) } + + it 'should notify when an aggregate is loaded that needs a snapshot' do + loaded = repository.load_aggregate(:id) + expect(aggregates_that_need_snapshots_loaded_callback).to have_received(:call).with([loaded]) + expect(loaded.class).to eq(MyAggregateThatNeedsSnapshot) + expect(loaded.snapshot_outdated?).to be true + end + end + end + it 'should commit and clear events from aggregates in the identity map' do repository.add_aggregate aggregate aggregate.uncommitted_events = [:event] diff --git a/spec/lib/sequent/core/event_store_spec.rb b/spec/lib/sequent/core/event_store_spec.rb index 1b38d1b0..4be5f339 100644 --- a/spec/lib/sequent/core/event_store_spec.rb +++ b/spec/lib/sequent/core/event_store_spec.rb @@ -196,6 +196,27 @@ class MyAggregate < Sequent::Core::AggregateRoot ) end + it 'overwrites existing snapshot with newer version' do + event_store.store_snapshots([snapshot]) + + snapshot.created_at += 1 + + event_store.store_snapshots([snapshot]) + + expect(event_store.load_latest_snapshot(aggregate_id).created_at).to eq(snapshot.created_at) + end + + it 'keeps existing snapshot when storing older snapshot' do + event_store.store_snapshots([snapshot]) + + original_created_at = snapshot.created_at + snapshot.created_at -= 1 + + event_store.store_snapshots([snapshot]) + + expect(event_store.load_latest_snapshot(aggregate_id).created_at).to eq(original_created_at) + end + it 'can delete all snapshots' do event_store.store_snapshots([snapshot]) @@ -846,7 +867,7 @@ def stop it 'argument error for no events' do expect do |block| event_store.stream_events_for_aggregate(aggregate_id_1, load_until: frozen_time - 1.year, &block) - end.to raise_error(ArgumentError, 'no events for this aggregate') + end.to raise_error(ArgumentError, "no events for aggregate #{aggregate_id_1}") end end