Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# frozen_string_literal: true

class SequentOverwriteSnapshotOnConflict < ActiveRecord::Migration[7.2]
def up
Sequent::Support::Database.with_search_path(Sequent.configuration.event_store_schema_name) do
execute_sql_file 'store_snapshots', version: 3
end
end

def down
Sequent::Support::Database.with_search_path(Sequent.configuration.event_store_schema_name) do
execute_sql_file 'store_snapshots', version: 2
end
end

private

def execute_sql_file(filename, version:)
say "Applying '#{filename}' version #{version}", true
suppress_messages do
execute File.read(
File.join(
File.dirname(__FILE__),
format('sequent/%s_v%02d.sql', filename, version),
),
)
end
end
end
37 changes: 37 additions & 0 deletions db/migrate/sequent/store_snapshots_v03.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
CREATE OR REPLACE PROCEDURE store_snapshots(_snapshots jsonb)
LANGUAGE plpgsql SET search_path FROM CURRENT AS $$
DECLARE
_aggregate_id uuid;
_snapshot jsonb;
_snapshot_version integer;
_sequence_number snapshot_records.sequence_number%TYPE;
BEGIN
FOR _snapshot IN SELECT * FROM jsonb_array_elements(_snapshots) LOOP
_aggregate_id = _snapshot->>'aggregate_id';
_sequence_number = _snapshot->'sequence_number';
_snapshot_version = _snapshot->'snapshot_version';

INSERT INTO aggregates_that_need_snapshots AS row (aggregate_id, snapshot_version, snapshot_sequence_number_high_water_mark)
VALUES (_aggregate_id, _snapshot_version, _sequence_number)
ON CONFLICT (aggregate_id, snapshot_version) DO UPDATE
SET snapshot_sequence_number_high_water_mark =
GREATEST(row.snapshot_sequence_number_high_water_mark, EXCLUDED.snapshot_sequence_number_high_water_mark),
snapshot_outdated_at = NULL,
snapshot_scheduled_at = NULL;

INSERT INTO snapshot_records (aggregate_id, snapshot_version, sequence_number, created_at, snapshot_type, snapshot_json)
VALUES (
_aggregate_id,
_snapshot_version,
_sequence_number,
(_snapshot->>'created_at')::timestamptz,
_snapshot->>'snapshot_type',
_snapshot->'snapshot_json'
) ON CONFLICT (aggregate_id, snapshot_version, sequence_number) DO UPDATE
SET created_at = EXCLUDED.created_at,
snapshot_type = EXCLUDED.snapshot_type,
snapshot_json = EXCLUDED.snapshot_json
WHERE snapshot_records.created_at < EXCLUDED.created_at;
END LOOP;
END;
$$;
7 changes: 6 additions & 1 deletion db/structure.sql
Original file line number Diff line number Diff line change
Expand Up @@ -638,7 +638,11 @@ BEGIN
(_snapshot->>'created_at')::timestamptz,
_snapshot->>'snapshot_type',
_snapshot->'snapshot_json'
);
) ON CONFLICT (aggregate_id, snapshot_version, sequence_number) DO UPDATE
SET created_at = EXCLUDED.created_at,
snapshot_type = EXCLUDED.snapshot_type,
snapshot_json = EXCLUDED.snapshot_json
WHERE snapshot_records.created_at <= EXCLUDED.created_at;
END LOOP;
END;
$$;
Expand Down Expand Up @@ -1572,6 +1576,7 @@ ALTER TABLE ONLY sequent_schema.snapshot_records
SET search_path TO public,view_schema,sequent_schema;

INSERT INTO "schema_migrations" (version) VALUES
('20251211080900'),
('20250815103000'),
('20250630113000'),
('20250601120000'),
Expand Down
3 changes: 2 additions & 1 deletion lib/sequent/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ class Configuration
:aggregate_snapshot_versions,
:enable_projector_states,
:projectors_replayer_after_prepare_hook,
:projectors_replayer_after_activate_hook
:projectors_replayer_after_activate_hook,
:aggregates_that_need_snapshots_loaded_callback

attr_reader :migrations_class,
:versions_table_name
Expand Down
17 changes: 12 additions & 5 deletions lib/sequent/core/aggregate_repository.rb
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,17 @@ def load_aggregates(aggregate_ids, clazz = nil)
result = aggregates.values_at(*unique_ids).compact
query_ids = unique_ids - result.map(&:id)

result += Sequent.configuration.event_store.load_events_for_aggregates(query_ids).map do |stream, events|
loaded_aggregates = event_store.load_events_for_aggregates(query_ids).map do |stream, events|
aggregate_class = Class.const_get(stream.aggregate_type)
aggregate_class.load_from_history(stream, events)
end

aggregates_that_need_snapshots = loaded_aggregates.select(&:snapshot_outdated?)
if aggregates_that_need_snapshots.present?
Sequent.configuration.aggregates_that_need_snapshots_loaded_callback&.call(aggregates_that_need_snapshots)
end

result += loaded_aggregates
if result.count != unique_ids.count
missing_aggregate_ids = unique_ids - result.map(&:id)
fail AggregateNotFound, missing_aggregate_ids
Expand All @@ -131,15 +137,14 @@ def load_aggregates(aggregate_ids, clazz = nil)
end

def find_aggregate_by_unique_key(scope, key, clazz = nil)
aggregate_id = Sequent.configuration.event_store.find_aggregate_id_by_unique_key(scope, key)
aggregate_id = event_store.find_aggregate_id_by_unique_key(scope, key)
load_aggregate(aggregate_id, clazz) if aggregate_id
end

##
# Returns whether the event store has an aggregate with the given id
def contains_aggregate?(aggregate_id)
Sequent.configuration.event_store.stream_exists?(aggregate_id) &&
Sequent.configuration.event_store.events_exists?(aggregate_id)
event_store.stream_exists?(aggregate_id) && event_store.events_exists?(aggregate_id)
end

# Gets all uncommitted_events from the 'registered' aggregates
Expand Down Expand Up @@ -186,8 +191,10 @@ def aggregates
end

def store_events(command, streams_with_events)
Sequent.configuration.event_store.commit_events(command, streams_with_events)
event_store.commit_events(command, streams_with_events)
end

def event_store = Sequent.configuration.event_store
end
end
end
7 changes: 3 additions & 4 deletions lib/sequent/core/event_store.rb
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,7 @@ def commit_events(command, streams_with_events)
# @param &block Block that should be passed to handle the batches returned from this method
def stream_events_for_aggregate(aggregate_id, load_until: nil, &block)
stream = find_event_stream(aggregate_id)
fail ArgumentError, 'no stream found for this aggregate' if stream.blank?

has_events = false
fail ArgumentError, "no stream found for this aggregate #{aggregate_id}" if stream.blank?

# PostgreSQLCursor::Cursor does not support bind parameters, so bind parameters manually instead.
sql = ActiveRecord::Base.sanitize_sql_array(
Expand All @@ -118,12 +116,13 @@ def stream_events_for_aggregate(aggregate_id, load_until: nil, &block)
],
)

has_events = false
PostgreSQLCursor::Cursor.new(sql, {connection: connection}).each_row do |event_hash|
has_events = true
event = deserialize_event(event_hash)
block.call([stream, event])
end
fail ArgumentError, 'no events for this aggregate' unless has_events
fail ArgumentError, "no events for aggregate #{aggregate_id}" unless has_events
end

def load_event(aggregate_id, sequence_number)
Expand Down
45 changes: 43 additions & 2 deletions spec/lib/sequent/core/aggregate_repository_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,18 @@ def load_from_history(stream, events)
end

before do
Sequent.configuration.event_store = event_store
Sequent.configure do |c|
c.event_store = event_store
c.aggregates_that_need_snapshots_loaded_callback = aggregates_that_need_snapshots_loaded_callback
end
repository.clear
end
after do
repository.clear
end

let(:event_store) { double }
let(:aggregates_that_need_snapshots_loaded_callback) { instance_spy(Proc, 'snapshot notification callback') }
let(:event_store) { instance_double(Sequent::Core::EventStore, 'event store') }
let(:repository) { Sequent.configuration.aggregate_repository }
let(:aggregate) { DummyAggregate.new(Sequent.new_uuid) }
let(:events) do
Expand Down Expand Up @@ -104,6 +108,43 @@ def load_from_history(stream, events)
expect { repository.load_aggregate(:id, DummyAggregate2) }.to raise_error TypeError
end

context 'loading aggregates' do
class MyAggregateThatNeedsSnapshot < DummyAggregate
def snapshot_outdated? = true
end

before do
allow(event_store).to receive(:load_events_for_aggregates).with([:id]).and_return(
[
[
aggregate.event_stream,
events,
],
],
)
end

context 'aggregate that does not need a snapshot' do
it 'should not notify when the aggregate is loaded' do
loaded = repository.load_aggregate(:id)
expect(loaded.class).to eq(DummyAggregate)
expect(loaded.snapshot_outdated?).to be false
expect(aggregates_that_need_snapshots_loaded_callback).to_not have_received(:call)
end
end

context 'aggregate that needs a snapshot' do
let(:aggregate) { MyAggregateThatNeedsSnapshot.new(Sequent.new_uuid) }

it 'should notify when an aggregate is loaded that needs a snapshot' do
loaded = repository.load_aggregate(:id)
expect(aggregates_that_need_snapshots_loaded_callback).to have_received(:call).with([loaded])
expect(loaded.class).to eq(MyAggregateThatNeedsSnapshot)
expect(loaded.snapshot_outdated?).to be true
end
end
end

it 'should commit and clear events from aggregates in the identity map' do
repository.add_aggregate aggregate
aggregate.uncommitted_events = [:event]
Expand Down
23 changes: 22 additions & 1 deletion spec/lib/sequent/core/event_store_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,27 @@ class MyAggregate < Sequent::Core::AggregateRoot
)
end

it 'overwrites existing snapshot with newer version' do
event_store.store_snapshots([snapshot])

snapshot.created_at += 1

event_store.store_snapshots([snapshot])

expect(event_store.load_latest_snapshot(aggregate_id).created_at).to eq(snapshot.created_at)
end

it 'keeps existing snapshot when storing older snapshot' do
event_store.store_snapshots([snapshot])

original_created_at = snapshot.created_at
snapshot.created_at -= 1

event_store.store_snapshots([snapshot])

expect(event_store.load_latest_snapshot(aggregate_id).created_at).to eq(original_created_at)
end

it 'can delete all snapshots' do
event_store.store_snapshots([snapshot])

Expand Down Expand Up @@ -846,7 +867,7 @@ def stop
it 'argument error for no events' do
expect do |block|
event_store.stream_events_for_aggregate(aggregate_id_1, load_until: frozen_time - 1.year, &block)
end.to raise_error(ArgumentError, 'no events for this aggregate')
end.to raise_error(ArgumentError, "no events for aggregate #{aggregate_id_1}")
end
end

Expand Down