Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions db/migrate/20250430125000_create_projector_states_table.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# frozen_string_literal: true

class CreateProjectorStatesTable < ActiveRecord::Migration[7.2]
def change
Sequent::Support::Database.with_search_path(Sequent.configuration.event_store_schema_name) do
create_table :projector_states, id: false, primary_key: :name do |t|
t.primary_key :name, :text, null: false
t.integer :active_version
t.integer :activating_version
t.integer :replaying_version
t.timestamptz :created_at, precision: 6, null: false, default: -> { 'NOW()' }
t.timestamptz :updated_at, precision: 6, null: false, default: -> { 'NOW()' }
end

add_check_constraint :projector_states,
'replaying_version IS NULL OR activating_version IS NULL',
name: 'replaying_conflicts_with_activating'
add_check_constraint :projector_states,
'replaying_version > active_version',
name: 'replaying_newer_then_active'
add_check_constraint :projector_states,
'activating_version > active_version',
name: 'activating_newer_than_active'
end
end
end
26 changes: 26 additions & 0 deletions db/structure.sql
Original file line number Diff line number Diff line change
Expand Up @@ -993,6 +993,23 @@ CREATE TABLE sequent_schema.partition_key_changes (
);


--
-- Name: projector_states; Type: TABLE; Schema: sequent_schema; Owner: -
--

CREATE TABLE sequent_schema.projector_states (
name text NOT NULL,
active_version integer,
activating_version integer,
replaying_version integer,
created_at timestamp with time zone DEFAULT now() NOT NULL,
updated_at timestamp with time zone DEFAULT now() NOT NULL,
CONSTRAINT activating_newer_than_active CHECK ((activating_version > active_version)),
CONSTRAINT replaying_conflicts_with_activating CHECK (((replaying_version IS NULL) OR (activating_version IS NULL))),
CONSTRAINT replaying_newer_then_active CHECK ((replaying_version > active_version))
);


--
-- Name: saved_event_records; Type: TABLE; Schema: sequent_schema; Owner: -
--
Expand Down Expand Up @@ -1221,6 +1238,14 @@ ALTER TABLE ONLY sequent_schema.partition_key_changes
ADD CONSTRAINT partition_key_changes_pkey PRIMARY KEY (aggregate_id);


--
-- Name: projector_states projector_states_pkey; Type: CONSTRAINT; Schema: sequent_schema; Owner: -
--

ALTER TABLE ONLY sequent_schema.projector_states
ADD CONSTRAINT projector_states_pkey PRIMARY KEY (name);


--
-- Name: saved_event_records saved_event_records_pkey; Type: CONSTRAINT; Schema: sequent_schema; Owner: -
--
Expand Down Expand Up @@ -1495,6 +1520,7 @@ INSERT INTO "schema_migrations" (version) VALUES
('20250509133000'),
('20250509120000'),
('20250501120000'),
('20250430125000'),
('20250312105100'),
('20250101000001'),
('20250101000000');
Expand Down
8 changes: 3 additions & 5 deletions docs/docs/concepts/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ As a bare minimum you need:
require './db/migrations'

Sequent.configure do |config|
config.migrations_class_name = 'Migrations'

# sequent >= 6.0.2
config.enable_autoregistration = true

Expand All @@ -40,8 +38,8 @@ Sequent.configure do |config|
end
```

The `migration_class_name` is the name of the class used to define your Migrations. See
the [Migrations](migrations.html) chapter for an in-depth explanation.
The `migrations_class` is the class used to define your Migrations. See the [Migrations](migrations.html) chapter for an
in-depth explanation.

### Autoregistration
Sequent 6.0.2 introduced autoregistration of `command_handlers` and `event_handlers` via
Expand Down Expand Up @@ -116,7 +114,7 @@ For the latest configuration possibilities please check the `Sequent::Configurat

| Option | Meaning | Default Value |
|-----------------------------------------|-------------------------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------|
| migrations_class_name | **Required**. The name of the [class](#minimum-configuration) containing the migrations. | `'Migrations'` |
| migrations_class | The [class](#minimum-configuration) containing the migrations | `'Migrations'` |
| command_handlers | The list of [CommandHandlers](command-handler.html) | `[]` |
| event_handlers | The list of [Projectors](projector.html) and [Workflows](workflow.html) | `[]` |
| aggregate_repository | The [AggregateRepository](aggregate-repository.html) | `Sequent::Core::AggregateRepository.new` |
Expand Down
5 changes: 3 additions & 2 deletions docs/docs/concepts/migrations.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ and add a column.

## Defining migrations

In Sequent, migrations are defined in your `Sequent.configuration.migrations_class_name`
In Sequent, migrations are defined in your `Sequent.configuration.migrations_class`, which must extend
`Sequent::Core::Projectors`.

### ReplayTable

Expand Down Expand Up @@ -130,7 +131,7 @@ As you can see there is no need to use the **%SUFFIX%** placeholder in these mig
since it is an in-place update.
{: .notice--info}

**Important**:
**Important**:
1. You must also incorporate your changes to the table-name.sql (`user_records.sql` in case of the example) file.
So the column `first_name` should be added as well in the table definition. Reason for this is that currently Sequent only
executes the "main" `sql` files when re-generating the schema from scratch (e.g. in tests).
Expand Down
2 changes: 1 addition & 1 deletion docs/docs/rails-sequent.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ require_relative '../../db/sequent_migrations'

Rails.application.reloader.to_prepare do
Sequent.configure do |config|
config.migrations_class_name = 'SequentMigrations'
config.migrations_class = SequentMigrations
config.enable_autoregistration = true

config.database_config_directory = 'config'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
Sequent.configure do |config|
config.enable_autoregistration = true

config.migrations_class_name = 'SequentMigrations'
config.migrations_class = SequentMigrations

config.database_config_directory = 'config'

Expand Down
20 changes: 13 additions & 7 deletions lib/sequent/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class Configuration
DEFAULT_VIEW_SCHEMA_NAME = 'view_schema'
DEFAULT_EVENT_STORE_SCHEMA_NAME = 'sequent_schema'

MIGRATIONS_CLASS_NAME = 'Sequent::Migrations::Projectors'
DEFAULT_MIGRATIONS_CLASS = nil

DEFAULT_NUMBER_OF_REPLAY_PROCESSES = 4
DEFAULT_REPLAY_GROUP_TARGET_SIZE = 250_000
Expand Down Expand Up @@ -79,7 +79,7 @@ class Configuration
:enable_autoregistration,
:aggregate_snapshot_versions

attr_reader :migrations_class_name,
attr_reader :migrations_class,
:versions_table_name

def self.instance
Expand Down Expand Up @@ -117,7 +117,7 @@ def initialize
self.migration_sql_files_directory = DEFAULT_MIGRATION_SQL_FILES_DIRECTORY
self.view_schema_name = DEFAULT_VIEW_SCHEMA_NAME
self.event_store_schema_name = DEFAULT_EVENT_STORE_SCHEMA_NAME
self.migrations_class_name = MIGRATIONS_CLASS_NAME
self.migrations_class = DEFAULT_MIGRATIONS_CLASS
self.number_of_replay_processes = DEFAULT_NUMBER_OF_REPLAY_PROCESSES
self.replay_group_target_size = DEFAULT_REPLAY_GROUP_TARGET_SIZE

Expand Down Expand Up @@ -153,13 +153,19 @@ def versions_table_name=(table_name)
Sequent::Migrations::Versions.table_name = table_name
end

def migrations_class_name = @migrations_class&.name

def migrations_class_name=(class_name)
migration_class = Class.const_get(class_name)
unless migration_class <= Sequent::Migrations::Projectors
fail ArgumentError, "#{migration_class} must extend Sequent::Migrations::Projectors"
warn '[DEPRECATED] use `migrations_class=` to set the migrations class directly'
self.migrations_class = class_name.nil? ? nil : Class.const_get(class_name)
end

def migrations_class=(migrations_class)
if migrations_class.present? && !(migrations_class < Sequent::Migrations::Projectors)
fail ArgumentError, "#{migrations_class} must extend Sequent::Migrations::Projectors"
end

@migrations_class_name = class_name
@migrations_class = migrations_class
end

# @!visibility private
Expand Down
1 change: 1 addition & 0 deletions lib/sequent/core/core.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,4 @@
require_relative 'random_uuid_generator'
require_relative 'event_publisher'
require_relative 'aggregate_snapshotter'
require_relative 'projectors'
48 changes: 48 additions & 0 deletions lib/sequent/core/event_publisher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@

module Sequent
module Core
class ProjectorMigrationError < RuntimeError; end
class UnknownActiveProjectorError < ProjectorMigrationError; end
class ReplayingProjectorMismatchError < ProjectorMigrationError; end
class NewerProjectorIsActiveError < ProjectorMigrationError; end

#
# EventPublisher ensures that, for every thread, events will be published
# in the order in which they are queued for publishing.
Expand Down Expand Up @@ -30,6 +35,15 @@ def message
def publish_events(events)
return if configuration.disable_event_handlers

ensure_no_unknown_active_projectors!

events.each { |event| events_queue.push(event) }
process_events
end

def replay_events(events)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this should be in an EventReplayer or so?

ensure_only_replaying_projectors_subscribed!

events.each { |event| events_queue.push(event) }
process_events
end
Expand All @@ -55,6 +69,8 @@ def process_event(event)

configuration.event_handlers.each do |handler|
handler.handle_message event
rescue ProjectorMigrationError
raise
rescue StandardError
raise PublishEventError.new(handler.class, event)
end
Expand All @@ -63,6 +79,38 @@ def process_event(event)
def configuration
Sequent.configuration
end

def ensure_no_unknown_active_projectors!
expected_version = Sequent.migrations_class&.version
return if expected_version.nil?

registered_projectors = Migratable.projectors.to_set(&:name)
active_projectors = Projectors
.projector_states
.values
.select { |s| s.active_version == expected_version }
.to_set(&:name)
unknown_active_projectors = active_projectors - registered_projectors
if unknown_active_projectors.present?
fail UnknownActiveProjectorError,
"cannot publish event when unknown projectors are active #{unknown_active_projectors}"
end
end

def ensure_only_replaying_projectors_subscribed!
return unless Sequent.migrations_class

registered_projectors = Migratable.projectors.to_set(&:name)
projector_states = Projectors.projector_states
replaying_projectors = projector_states
.values
.select { |state| state.replaying? || state.activating? }
.to_set(&:name)
if registered_projectors != replaying_projectors
fail ReplayingProjectorMismatchError,
"cannot replay event when different projectors are replaying #{replaying_projectors}"
end
end
end
end
end
24 changes: 15 additions & 9 deletions lib/sequent/core/event_store.rb
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,9 @@ def events_exists?(aggregate_id)
def replay_events
warn '[DEPRECATION] `replay_events` is deprecated in favor of `replay_events_from_cursor`'
events = yield.map { |event_hash| deserialize_event(event_hash) }
publish_events(events)
Sequent.configuration.transaction_provider.transactional do
Sequent.configuration.event_publisher.replay_events(events)
end
end

##
Expand All @@ -188,14 +190,18 @@ def replay_events_from_cursor(get_events:, block_size: 2000,
progress = 0
cursor = get_events.call
ids_replayed = []
cursor.each_row(block_size: block_size).each do |record|
event = deserialize_event(record)
publish_events([event])
progress += 1
ids_replayed << record['aggregate_id']
if progress % block_size == 0
on_progress[progress, false, ids_replayed]
ids_replayed.clear
cursor.each_row_batch(block_size:).each do |records|
events = records.map(&method(:deserialize_event))
Sequent.configuration.transaction_provider.transactional do
Sequent.configuration.event_publisher.replay_events(events)
end
records.each do |record|
progress += 1
ids_replayed << record['aggregate_id']
if progress % block_size == 0
on_progress[progress, false, ids_replayed]
ids_replayed.clear
end
end
end
on_progress[progress, true, ids_replayed]
Expand Down
32 changes: 17 additions & 15 deletions lib/sequent/core/projector.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

require_relative 'helpers/message_handler'
require_relative 'persistors/active_record_persistor'
require_relative 'projectors'

module Sequent
module Core
Expand Down Expand Up @@ -111,6 +112,10 @@ def self.replay_persistor
nil
end

def dispatch_message(...)
super if is_active?
end

def_delegators :@persistor, :execute_sql, :commit

def update_record(record_class, *rest, &block)
Expand Down Expand Up @@ -193,24 +198,21 @@ def ensure_valid!
EOS
end
end
end

#
# Utility class containing all subclasses of Projector.
#
class Projectors
class << self
def projectors
Sequent::Projector.descendants
end
def is_active?
version = Sequent.migrations_class&.version
return true if version.nil?

def all
projectors
end
projector_state = Projectors.projector_states[self.class.name]
return false if projector_state.nil?

def find(projector_name)
projectors.find { |c| c.name == projector_name }
end
return true if projector_state.active_version == version

# Replaying or activating the current version, so run this projector (it will write to a temporary table).
return true if projector_state.replaying_version == version || projector_state.activating_version == version

fail NewerProjectorIsActiveError,
"projector #{self.class} version #{version} does not match state #{projector_state}"
end
end
end
Expand Down
Loading