Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,24 @@
old version of the code will continue to use snapshot version 1 (the
default version) while new code will only use snapshot version 2.

- Also in preperation of rolling upgrades the enabled projectors are
now optionally tracked in the `projector_states` table. Use the
`enable_projector_states` configuration flag to enable this feature.

Only projectors that are active or replaying will process
events. When an unknown projector is active all event processing
will be blocked. This allows deploying a new version while the old
version of the code is still running and processing events. Only
after executing `Sequent#activate_current_configuration!` will the
new code start processing events, while the old code is blocked. At
this moment traffic can be redirected from the old version to the
new version.

Activating the current configuration is part of the
`migrate:offline` rake task, so the current deployment process (run
online migrations, take old code offline, run offline migrations,
start running the new code) will keep working as is.

# Changelog 8.2.1

- Bug: Fix resetting column information and table_name when using Single Table Inheritance.
Expand Down
26 changes: 26 additions & 0 deletions db/migrate/20250601120000_sequent_track_projector_states.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# frozen_string_literal: true

class SequentTrackProjectorStates < ActiveRecord::Migration[7.2]
def change
Sequent::Support::Database.with_search_path(Sequent.configuration.event_store_schema_name) do
create_table :projector_states, id: false, primary_key: :name do |t|
t.primary_key :name, :text, null: false
t.integer :active_version
t.integer :activating_version
t.integer :replaying_version
t.timestamptz :created_at, precision: 6, null: false, default: -> { 'NOW()' }
t.timestamptz :updated_at, precision: 6, null: false, default: -> { 'NOW()' }
end

add_check_constraint :projector_states,
'replaying_version IS NULL OR activating_version IS NULL',
name: 'replaying_conflicts_with_activating'
add_check_constraint :projector_states,
'replaying_version > active_version',
name: 'replaying_newer_then_active'
add_check_constraint :projector_states,
'activating_version > active_version',
name: 'activating_newer_than_active'
end
end
end
26 changes: 26 additions & 0 deletions db/structure.sql
Original file line number Diff line number Diff line change
Expand Up @@ -993,6 +993,23 @@ CREATE TABLE sequent_schema.partition_key_changes (
);


--
-- Name: projector_states; Type: TABLE; Schema: sequent_schema; Owner: -
--

CREATE TABLE sequent_schema.projector_states (
name text NOT NULL,
active_version integer,
activating_version integer,
replaying_version integer,
created_at timestamp with time zone DEFAULT now() NOT NULL,
updated_at timestamp with time zone DEFAULT now() NOT NULL,
CONSTRAINT activating_newer_than_active CHECK ((activating_version > active_version)),
CONSTRAINT replaying_conflicts_with_activating CHECK (((replaying_version IS NULL) OR (activating_version IS NULL))),
CONSTRAINT replaying_newer_then_active CHECK ((replaying_version > active_version))
);


--
-- Name: saved_event_records; Type: TABLE; Schema: sequent_schema; Owner: -
--
Expand Down Expand Up @@ -1221,6 +1238,14 @@ ALTER TABLE ONLY sequent_schema.partition_key_changes
ADD CONSTRAINT partition_key_changes_pkey PRIMARY KEY (aggregate_id);


--
-- Name: projector_states projector_states_pkey; Type: CONSTRAINT; Schema: sequent_schema; Owner: -
--

ALTER TABLE ONLY sequent_schema.projector_states
ADD CONSTRAINT projector_states_pkey PRIMARY KEY (name);


--
-- Name: saved_event_records saved_event_records_pkey; Type: CONSTRAINT; Schema: sequent_schema; Owner: -
--
Expand Down Expand Up @@ -1491,6 +1516,7 @@ ALTER TABLE ONLY sequent_schema.snapshot_records
SET search_path TO public,view_schema,sequent_schema;

INSERT INTO "schema_migrations" (version) VALUES
('20250601120000'),
('20250512135500'),
('20250509133000'),
('20250509120000'),
Expand Down
8 changes: 3 additions & 5 deletions docs/docs/concepts/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ As a bare minimum you need:
require './db/migrations'

Sequent.configure do |config|
config.migrations_class_name = 'Migrations'

# sequent >= 6.0.2
config.enable_autoregistration = true

Expand All @@ -40,8 +38,8 @@ Sequent.configure do |config|
end
```

The `migration_class_name` is the name of the class used to define your Migrations. See
the [Migrations](migrations.html) chapter for an in-depth explanation.
The `migrations_class` is the class used to define your Migrations. See the [Migrations](migrations.html) chapter for an
in-depth explanation.

### Autoregistration
Sequent 6.0.2 introduced autoregistration of `command_handlers` and `event_handlers` via
Expand Down Expand Up @@ -116,7 +114,7 @@ For the latest configuration possibilities please check the `Sequent::Configurat

| Option | Meaning | Default Value |
|-----------------------------------------|-------------------------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------|
| migrations_class_name | **Required**. The name of the [class](#minimum-configuration) containing the migrations. | `'Migrations'` |
| migrations_class | The [class](#minimum-configuration) containing the migrations | `'Migrations'` |
| command_handlers | The list of [CommandHandlers](command-handler.html) | `[]` |
| event_handlers | The list of [Projectors](projector.html) and [Workflows](workflow.html) | `[]` |
| aggregate_repository | The [AggregateRepository](aggregate-repository.html) | `Sequent::Core::AggregateRepository.new` |
Expand Down
5 changes: 3 additions & 2 deletions docs/docs/concepts/migrations.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ and add a column.

## Defining migrations

In Sequent, migrations are defined in your `Sequent.configuration.migrations_class_name`
In Sequent, migrations are defined in your `Sequent.configuration.migrations_class`, which must extend
`Sequent::Core::Projectors`.

### ReplayTable

Expand Down Expand Up @@ -130,7 +131,7 @@ As you can see there is no need to use the **%SUFFIX%** placeholder in these mig
since it is an in-place update.
{: .notice--info}

**Important**:
**Important**:
1. You must also incorporate your changes to the table-name.sql (`user_records.sql` in case of the example) file.
So the column `first_name` should be added as well in the table definition. Reason for this is that currently Sequent only
executes the "main" `sql` files when re-generating the schema from scratch (e.g. in tests).
Expand Down
2 changes: 1 addition & 1 deletion docs/docs/rails-sequent.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ require_relative '../../db/sequent_migrations'

Rails.application.reloader.to_prepare do
Sequent.configure do |config|
config.migrations_class_name = 'SequentMigrations'
config.migrations_class = SequentMigrations
config.enable_autoregistration = true

config.database_config_directory = 'config'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
Sequent.configure do |config|
config.enable_autoregistration = true

config.migrations_class_name = 'SequentMigrations'
config.migrations_class = SequentMigrations

config.database_config_directory = 'config'

Expand Down
24 changes: 16 additions & 8 deletions lib/sequent/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class Configuration
DEFAULT_VIEW_SCHEMA_NAME = 'view_schema'
DEFAULT_EVENT_STORE_SCHEMA_NAME = 'sequent_schema'

MIGRATIONS_CLASS_NAME = 'Sequent::Migrations::Projectors'
DEFAULT_MIGRATIONS_CLASS = nil

DEFAULT_NUMBER_OF_REPLAY_PROCESSES = 4
DEFAULT_REPLAY_GROUP_TARGET_SIZE = 250_000
Expand Down Expand Up @@ -77,9 +77,10 @@ class Configuration
:primary_database_key,
:time_precision,
:enable_autoregistration,
:aggregate_snapshot_versions
:aggregate_snapshot_versions,
:enable_projector_states

attr_reader :migrations_class_name,
attr_reader :migrations_class,
:versions_table_name

def self.instance
Expand Down Expand Up @@ -117,7 +118,7 @@ def initialize
self.migration_sql_files_directory = DEFAULT_MIGRATION_SQL_FILES_DIRECTORY
self.view_schema_name = DEFAULT_VIEW_SCHEMA_NAME
self.event_store_schema_name = DEFAULT_EVENT_STORE_SCHEMA_NAME
self.migrations_class_name = MIGRATIONS_CLASS_NAME
self.migrations_class = DEFAULT_MIGRATIONS_CLASS
self.number_of_replay_processes = DEFAULT_NUMBER_OF_REPLAY_PROCESSES
self.replay_group_target_size = DEFAULT_REPLAY_GROUP_TARGET_SIZE

Expand All @@ -140,6 +141,7 @@ def initialize

self.enable_autoregistration = false
self.aggregate_snapshot_versions = DEFAULT_AGGREGATE_SNAPSHOT_VERSIONS
self.enable_projector_states = false
end

def can_use_multiple_databases?
Expand All @@ -153,13 +155,19 @@ def versions_table_name=(table_name)
Sequent::Migrations::Versions.table_name = table_name
end

def migrations_class_name = @migrations_class&.name

def migrations_class_name=(class_name)
migration_class = Class.const_get(class_name)
unless migration_class <= Sequent::Migrations::Projectors
fail ArgumentError, "#{migration_class} must extend Sequent::Migrations::Projectors"
warn '[DEPRECATED] use `migrations_class=` to set the migrations class directly'
self.migrations_class = class_name.nil? ? nil : Class.const_get(class_name)
end

def migrations_class=(migrations_class)
if migrations_class.present? && !(migrations_class < Sequent::Migrations::Projectors)
fail ArgumentError, "#{migrations_class} must extend Sequent::Migrations::Projectors"
end

@migrations_class_name = class_name
@migrations_class = migrations_class
end

# @!visibility private
Expand Down
1 change: 1 addition & 0 deletions lib/sequent/core/core.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,4 @@
require_relative 'random_uuid_generator'
require_relative 'event_publisher'
require_relative 'aggregate_snapshotter'
require_relative 'projectors'
49 changes: 49 additions & 0 deletions lib/sequent/core/event_publisher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@

module Sequent
module Core
class ProjectorMigrationError < RuntimeError; end
class UnknownActiveProjectorError < ProjectorMigrationError; end
class ReplayingProjectorMismatchError < ProjectorMigrationError; end
class DifferentProjectorVersionIsActiveError < ProjectorMigrationError; end

#
# EventPublisher ensures that, for every thread, events will be published
# in the order in which they are queued for publishing.
Expand Down Expand Up @@ -30,6 +35,15 @@ def message
def publish_events(events)
return if configuration.disable_event_handlers

ensure_no_unknown_active_projectors!

events.each { |event| events_queue.push(event) }
process_events
end

def replay_events(events)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this should be in an EventReplayer or so?

ensure_only_replaying_projectors_subscribed!

events.each { |event| events_queue.push(event) }
process_events
end
Expand All @@ -55,6 +69,8 @@ def process_event(event)

configuration.event_handlers.each do |handler|
handler.handle_message event
rescue ProjectorMigrationError
raise
rescue StandardError
raise PublishEventError.new(handler.class, event)
end
Expand All @@ -63,6 +79,39 @@ def process_event(event)
def configuration
Sequent.configuration
end

def ensure_no_unknown_active_projectors!
return unless Sequent.configuration.enable_projector_states

expected_version = Sequent.migrations_class&.version
return if expected_version.nil?

registered_projectors = Migratable.projectors.to_set(&:name)
active_projectors = Projectors.projector_states
.values
.select { |s| s.active_version == expected_version }
.to_set(&:name)
unknown_active_projectors = active_projectors - registered_projectors
if unknown_active_projectors.present?
fail UnknownActiveProjectorError,
"cannot publish event when unknown projectors are active #{unknown_active_projectors}"
end
end

def ensure_only_replaying_projectors_subscribed!
return unless Sequent.configuration.enable_projector_states
return unless Sequent.migrations_class

registered_projectors = Migratable.projectors.to_set(&:name)
replaying_projectors = Projectors.projector_states
.values
.select { |state| state.replaying? || state.activating? }
.to_set(&:name)
if registered_projectors != replaying_projectors
fail ReplayingProjectorMismatchError,
"cannot replay event when different projectors are replaying #{replaying_projectors}"
end
end
end
end
end
31 changes: 12 additions & 19 deletions lib/sequent/core/event_store.rb
Original file line number Diff line number Diff line change
Expand Up @@ -165,17 +165,6 @@ def events_exists?(aggregate_id)
Sequent.configuration.event_record_class.exists?(aggregate_id: aggregate_id)
end

##
# Replays all events in the event store to the registered event_handlers.
#
# @param block that returns the events.
# <b>DEPRECATED:</b> use <tt>replay_events_from_cursor</tt> instead.
def replay_events
warn '[DEPRECATION] `replay_events` is deprecated in favor of `replay_events_from_cursor`'
events = yield.map { |event_hash| deserialize_event(event_hash) }
publish_events(events)
end

##
# Replays all events on an `EventRecord` cursor from the given block.
#
Expand All @@ -188,14 +177,18 @@ def replay_events_from_cursor(get_events:, block_size: 2000,
progress = 0
cursor = get_events.call
ids_replayed = []
cursor.each_row(block_size: block_size).each do |record|
event = deserialize_event(record)
publish_events([event])
progress += 1
ids_replayed << record['aggregate_id']
if progress % block_size == 0
on_progress[progress, false, ids_replayed]
ids_replayed.clear
cursor.each_row_batch(block_size:).each do |records|
events = records.map(&method(:deserialize_event))
Sequent.configuration.transaction_provider.transactional do
Sequent.configuration.event_publisher.replay_events(events)
end
records.each do |record|
progress += 1
ids_replayed << record['aggregate_id']
if progress % block_size == 0
on_progress[progress, false, ids_replayed]
ids_replayed.clear
end
end
end
on_progress[progress, true, ids_replayed]
Expand Down
Loading