Skip to content

Commit 0ecbd92

Browse files
committed
Check projector active state using a specialized event publisher
This simplifies the projector logic as all projector state tracking is now only checked by the `ActiveProjectorsEventPublisher`. Replay now uses the normal `EventPublisher` so the logic to determine if an event is allowed to be processed by a projector can be simplified.
1 parent d650197 commit 0ecbd92

9 files changed

Lines changed: 197 additions & 191 deletions

File tree

CHANGELOG.md

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,25 @@
1111

1212
- Also in preperation of rolling upgrades the enabled projectors are
1313
now optionally tracked in the `projector_states` table. Use the
14-
`enable_projector_states` configuration flag to enable this feature.
15-
16-
Only projectors that are active or replaying will process
17-
events. When an unknown projector is active all event processing
18-
will be blocked. This allows deploying a new version while the old
19-
version of the code is still running and processing events. Only
20-
after executing `Sequent#activate_current_configuration!` will the
21-
new code start processing events, while the old code is blocked. At
22-
this moment traffic can be redirected from the old version to the
23-
new version.
14+
`enable_projector_states` configuration flag and
15+
`ActiveProjectorsEventPublisher` as the `event_publisher` enable
16+
this feature.
17+
18+
Only projectors that are active will process events during normal
19+
operation. Events that are published while a newer projector is
20+
activating or active will fail with a
21+
`DifferentProjectorVersionIsActiveError` error. Event replay uses
22+
the normal `EventPublisher` which does not check for projector
23+
state, since replaying will write to temporary tables until replay
24+
is finalized
25+
26+
When an unknown projector is active all event processing will be
27+
blocked. This allows deploying a new version while the old version
28+
of the code is still running and processing events. Only after
29+
executing `Sequent#activate_current_configuration!` will the new
30+
code start processing events, while the old code is blocked. At this
31+
moment traffic can be redirected from the old version to the new
32+
version.
2433

2534
Activating the current configuration is part of the
2635
`migrate:offline` rake task, so the current deployment process (run
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
# frozen_string_literal: true
2+
3+
module Sequent
4+
module Core
5+
class UnknownActiveProjectorError < ProjectorMigrationError; end
6+
class DifferentProjectorVersionIsActiveError < ProjectorMigrationError; end
7+
8+
#
9+
# Subtype of EventPublisher that only dispatches events to Projectors that are marked active in
10+
# the projector_states table. Also fails if there are unknown active projectors. This allows
11+
# upgrading to new code (with new projectors) without having to shutdown the old code first, as
12+
# the old code will start failing as soon as the new code's Sequent configuration is activated
13+
# (using `Sequent#activate_current_configuration!`).
14+
#
15+
class ActiveProjectorsEventPublisher < EventPublisher
16+
def publish_events(events)
17+
return if configuration.disable_event_handlers
18+
19+
ensure_no_unknown_active_projectors!
20+
21+
super
22+
end
23+
24+
private
25+
26+
def handle_message(handler, event)
27+
super if active?(handler)
28+
end
29+
30+
def ensure_no_unknown_active_projectors!
31+
expected_version = Sequent.migrations_class&.version
32+
return if expected_version.nil?
33+
34+
registered_projectors = Migratable.projectors.to_set(&:name)
35+
active_projectors = Projectors.projector_states
36+
.values
37+
.select { |s| s.active_version == expected_version }
38+
.to_set(&:name)
39+
unknown_active_projectors = active_projectors - registered_projectors
40+
if unknown_active_projectors.present?
41+
fail UnknownActiveProjectorError,
42+
"cannot publish event when unknown projectors are active #{unknown_active_projectors}"
43+
end
44+
end
45+
46+
def active?(handler)
47+
return true unless handler.is_a?(Projector)
48+
49+
version = Sequent.migrations_class&.version
50+
return true if version.nil?
51+
52+
# Projector states are not enable so all projectors are considered active
53+
return true unless Sequent.configuration.enable_projector_states
54+
55+
projector_state = Projectors.projector_states[handler.class.name]
56+
return false if projector_state.nil?
57+
58+
return true if projector_state.activating_version.nil? && projector_state.active_version == version
59+
60+
# A different projector version is active, so we cannot write
61+
# new events since they will not be properly propagated.
62+
fail DifferentProjectorVersionIsActiveError,
63+
"projector #{handler.class} version #{version} does not match state #{projector_state}"
64+
end
65+
end
66+
end
67+
end

lib/sequent/core/core.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,5 +19,6 @@
1919
require_relative 'workflow'
2020
require_relative 'random_uuid_generator'
2121
require_relative 'event_publisher'
22+
require_relative 'active_projectors_event_publisher'
2223
require_relative 'aggregate_snapshotter'
2324
require_relative 'projectors'

lib/sequent/core/event_publisher.rb

Lines changed: 5 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,6 @@
33
module Sequent
44
module Core
55
class ProjectorMigrationError < RuntimeError; end
6-
class UnknownActiveProjectorError < ProjectorMigrationError; end
7-
class ReplayingProjectorMismatchError < ProjectorMigrationError; end
8-
class DifferentProjectorVersionIsActiveError < ProjectorMigrationError; end
96

107
#
118
# EventPublisher ensures that, for every thread, events will be published
@@ -35,15 +32,6 @@ def message
3532
def publish_events(events)
3633
return if configuration.disable_event_handlers
3734

38-
ensure_no_unknown_active_projectors!
39-
40-
events.each { |event| events_queue.push(event) }
41-
process_events
42-
end
43-
44-
def replay_events(events)
45-
ensure_only_replaying_projectors_subscribed!
46-
4735
events.each { |event| events_queue.push(event) }
4836
process_events
4937
end
@@ -68,49 +56,20 @@ def process_event(event)
6856
Sequent.logger.debug("[EventPublisher] Publishing event #{event.class}") if Sequent.logger.debug?
6957

7058
configuration.event_handlers.each do |handler|
71-
handler.handle_message event
59+
handle_message(handler, event)
7260
rescue ProjectorMigrationError
7361
raise
7462
rescue StandardError
7563
raise PublishEventError.new(handler.class, event)
7664
end
7765
end
7866

79-
def configuration
80-
Sequent.configuration
67+
def handle_message(handler, event)
68+
handler.handle_message(event)
8169
end
8270

83-
def ensure_no_unknown_active_projectors!
84-
return unless Sequent.configuration.enable_projector_states
85-
86-
expected_version = Sequent.migrations_class&.version
87-
return if expected_version.nil?
88-
89-
registered_projectors = Migratable.projectors.to_set(&:name)
90-
active_projectors = Projectors.projector_states
91-
.values
92-
.select { |s| s.active_version == expected_version }
93-
.to_set(&:name)
94-
unknown_active_projectors = active_projectors - registered_projectors
95-
if unknown_active_projectors.present?
96-
fail UnknownActiveProjectorError,
97-
"cannot publish event when unknown projectors are active #{unknown_active_projectors}"
98-
end
99-
end
100-
101-
def ensure_only_replaying_projectors_subscribed!
102-
return unless Sequent.configuration.enable_projector_states
103-
return unless Sequent.migrations_class
104-
105-
registered_projectors = Migratable.projectors.to_set(&:name)
106-
replaying_projectors = Projectors.projector_states
107-
.values
108-
.select { |state| state.replaying? || state.activating? }
109-
.to_set(&:name)
110-
if registered_projectors != replaying_projectors
111-
fail ReplayingProjectorMismatchError,
112-
"cannot replay event when different projectors are replaying #{replaying_projectors}"
113-
end
71+
def configuration
72+
Sequent.configuration
11473
end
11574
end
11675
end

lib/sequent/core/event_store.rb

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -172,15 +172,19 @@ def events_exists?(aggregate_id)
172172
#
173173
# @param get_events lambda that returns the events cursor
174174
# @param on_progress lambda that gets called on substantial progress
175-
def replay_events_from_cursor(get_events:, block_size: 2000,
176-
on_progress: PRINT_PROGRESS)
175+
def replay_events_from_cursor(
176+
get_events:,
177+
event_publisher: Sequent.configuration.event_publisher,
178+
block_size: 2000,
179+
on_progress: PRINT_PROGRESS
180+
)
177181
progress = 0
178182
cursor = get_events.call
179183
ids_replayed = []
180184
cursor.each_row_batch(block_size:).each do |records|
181185
events = records.map(&method(:deserialize_event))
182186
Sequent.configuration.transaction_provider.transactional do
183-
Sequent.configuration.event_publisher.replay_events(events)
187+
event_publisher.publish_events(events)
184188
end
185189
records.each do |record|
186190
progress += 1

lib/sequent/core/projector.rb

Lines changed: 0 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -112,10 +112,6 @@ def self.replay_persistor
112112
nil
113113
end
114114

115-
def dispatch_message(...)
116-
super if active?
117-
end
118-
119115
def_delegators :@persistor, :execute_sql, :commit
120116

121117
def update_record(record_class, *rest, &block)
@@ -198,32 +194,6 @@ def ensure_valid!
198194
EOS
199195
end
200196
end
201-
202-
def active?
203-
version = Sequent.migrations_class&.version
204-
return true if version.nil?
205-
206-
# Projector states are not enable so all projectors are considered active
207-
return true unless Sequent.configuration.enable_projector_states
208-
209-
projector_state = Projectors.projector_states[self.class.name]
210-
return false if projector_state.nil?
211-
212-
if projector_state.activating_version.present?
213-
# Current projector version is activating, so we write to the temporary replay table.
214-
return true if projector_state.activating_version == version
215-
else
216-
return true if projector_state.active_version == version
217-
218-
# Replaying the current version, so run this projector (it will write to a temporary table).
219-
return true if projector_state.replaying_version == version
220-
end
221-
222-
# A different projector version is active or activating, so we cannot write new events since they will
223-
# not be properly propagated.
224-
fail DifferentProjectorVersionIsActiveError,
225-
"projector #{self.class} version #{version} does not match state #{projector_state}"
226-
end
227197
end
228198
end
229199
end

lib/sequent/migrations/view_schema.rb

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -395,8 +395,9 @@ def replay_events(
395395
&on_progress
396396
)
397397
Sequent.configuration.event_store.replay_events_from_cursor(
398-
block_size: 1000,
399398
get_events:,
399+
event_publisher: Sequent::Core::EventPublisher.new,
400+
block_size: 1000,
400401
on_progress:,
401402
)
402403

0 commit comments

Comments
 (0)