Skip to content

Commit a1b736f

Browse files
robacourtclaude
andauthored
fix: Correct dependency tracking for nested subqueries (#3757)
## Summary - Fixes dependency tracking for nested subqueries when intermediate rows change their linking column without changing the tracked column - Previously, such updates were incorrectly filtered out, causing stale tag tracking that led to incorrect row deletions ## Changes - **shape.ex**: Keep updates with tag changes (`removed_move_tags != []`) even if selected columns are unchanged - **materializer.ex**: Skip value count updates when value is unchanged to avoid spurious move_out/move_in events ## Test plan - [x] Added unit tests in `shape_test.exs` for `filtered_columns_changed?` behavior - [x] Added unit tests in `materializer_test.exs` for tag-only update handling - [x] Integration tests in `subquery_dependency_update_test.exs` pass - [x] Full test suite passes (1322 tests, 0 failures) 🤖 Generated with [Claude Code](https://claude.com/claude-code) <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit * **Bug Fixes** * Fixed dependency tracking for nested subqueries so intermediate row moves that don’t change tracked values no longer cause spurious deletions. * Preserved tag-only updates in change detection to prevent stale tag tracking when rows move between parents. * **Tests** * Added extensive tests covering tag-only updates and nested subquery dependency scenarios to validate correctness. <sub>✏️ Tip: You can customize this high-level summary in your review settings.</sub> <!-- end of auto-generated comment: release notes by coderabbit.ai --> --------- Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
1 parent dba090e commit a1b736f

File tree

6 files changed

+467
-11
lines changed

6 files changed

+467
-11
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@core/sync-service": patch
3+
---
4+
5+
Fix dependency tracking for nested subqueries when intermediate rows change their linking column without changing the tracked column. Previously, such updates were incorrectly filtered out, causing stale tag tracking that led to incorrect row deletions when the old parent lost its qualifying status.

packages/sync-service/lib/electric/shapes/consumer/materializer.ex

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -327,20 +327,33 @@ defmodule Electric.Shapes.Consumer.Materializer do
327327
},
328328
{{index, tag_indices}, counts_and_events} ->
329329
# TODO: this is written as if it supports multiple selected columns, but it doesn't for now
330-
if Enum.any?(state.columns, &is_map_key(record, &1)) do
331-
{value, original_string} = cast!(record, state)
332-
old_value = Map.fetch!(index, key)
333-
index = Map.put(index, key, value)
330+
columns_present = Enum.any?(state.columns, &is_map_key(record, &1))
331+
has_tag_updates = removed_move_tags != []
334332

333+
if columns_present or has_tag_updates do
335334
tag_indices =
336335
tag_indices
337336
|> remove_row_from_tag_indices(key, removed_move_tags)
338337
|> add_row_to_tag_indices(key, move_tags)
339338

340-
{{index, tag_indices},
341-
counts_and_events
342-
|> decrement_value(old_value, value_to_string(old_value, state))
343-
|> increment_value(value, original_string)}
339+
if columns_present do
340+
{value, original_string} = cast!(record, state)
341+
old_value = Map.fetch!(index, key)
342+
index = Map.put(index, key, value)
343+
344+
# Skip decrement/increment dance if value hasn't changed to avoid
345+
# spurious move_out/move_in events when only the tag changed
346+
if old_value == value do
347+
{{index, tag_indices}, counts_and_events}
348+
else
349+
{{index, tag_indices},
350+
counts_and_events
351+
|> decrement_value(old_value, value_to_string(old_value, state))
352+
|> increment_value(value, original_string)}
353+
end
354+
else
355+
{{index, tag_indices}, counts_and_events}
356+
end
344357
else
345358
# Nothing relevant to this materializer has been updated
346359
{{index, tag_indices}, counts_and_events}

packages/sync-service/lib/electric/shapes/shape.ex

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -613,7 +613,7 @@ defmodule Electric.Shapes.Shape do
613613
converted_changes
614614
|> Enum.map(&fill_move_tags(&1, shape, opts[:stack_id], opts[:shape_handle]))
615615
|> Enum.map(&filter_change_columns(&1, selected_columns))
616-
|> Enum.filter(&filtered_columns_changed?/1)
616+
|> Enum.filter(&should_keep_change?/1)
617617
end
618618

619619
defp filter_change_columns(change, nil), do: change
@@ -684,10 +684,13 @@ defmodule Electric.Shapes.Shape do
684684
end)
685685
end
686686

687-
defp filtered_columns_changed?(%Changes.UpdatedRecord{old_record: record, record: record}),
687+
defp should_keep_change?(%Changes.UpdatedRecord{removed_move_tags: removed_move_tags})
688+
when removed_move_tags != [], do: true
689+
690+
defp should_keep_change?(%Changes.UpdatedRecord{old_record: record, record: record}),
688691
do: false
689692

690-
defp filtered_columns_changed?(_), do: true
693+
defp should_keep_change?(_), do: true
691694

692695
# If neither oid nor schema/table name matches, then shape is not affected
693696
def is_affected_by_relation_change?(

packages/sync-service/test/electric/shapes/consumer/materializer_test.exs

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -370,6 +370,150 @@ defmodule Electric.Shapes.Consumer.MaterializerTest do
370370
end
371371
end
372372

373+
describe "tag-only updates (value unchanged)" do
374+
@tag snapshot_data: [%Changes.NewRecord{record: %{"id" => "1", "value" => "10"}}]
375+
test "update with tag change but unchanged value updates tags without events", ctx do
376+
ctx = with_materializer(ctx)
377+
378+
assert Materializer.get_link_values(ctx) == MapSet.new([10])
379+
380+
# Update where tags change but the tracked value stays the same
381+
Materializer.new_changes(
382+
ctx,
383+
[
384+
%Changes.UpdatedRecord{
385+
key: ~s("public"."test_table"/"1"),
386+
record: %{"id" => "1", "value" => "10"},
387+
old_record: %{"id" => "1", "value" => "10"},
388+
move_tags: ["new_tag"],
389+
removed_move_tags: ["old_tag"]
390+
}
391+
]
392+
)
393+
394+
# Value should still be present
395+
assert Materializer.get_link_values(ctx) == MapSet.new([10])
396+
397+
# No move events should be emitted since the value didn't change
398+
refute_received {:materializer_changes, _, _}
399+
end
400+
401+
@tag snapshot_data: {
402+
[%Changes.NewRecord{record: %{"id" => "1", "value" => "10"}, move_tags: ["old_tag"]}],
403+
[]
404+
}
405+
test "tag is updated so subsequent move_out for old tag finds nothing", ctx do
406+
ctx = with_materializer(ctx)
407+
408+
assert Materializer.get_link_values(ctx) == MapSet.new([10])
409+
410+
# Update that changes the tag from old_tag to new_tag but keeps value the same
411+
Materializer.new_changes(
412+
ctx,
413+
[
414+
%Changes.UpdatedRecord{
415+
key: ~s("public"."test_table"/"1"),
416+
record: %{"id" => "1", "value" => "10"},
417+
old_record: %{"id" => "1", "value" => "10"},
418+
move_tags: ["new_tag"],
419+
removed_move_tags: ["old_tag"]
420+
}
421+
]
422+
)
423+
424+
# No events from the tag-only update
425+
refute_received {:materializer_changes, _, _}
426+
427+
# Now send a move_out for the OLD tag - should find nothing since the row moved to new_tag
428+
Materializer.new_changes(ctx, [
429+
%{headers: %{event: "move-out", patterns: [%{pos: 0, value: "old_tag"}]}}
430+
])
431+
432+
# Value should still be present (row wasn't removed)
433+
assert Materializer.get_link_values(ctx) == MapSet.new([10])
434+
435+
# No move events since the row was already moved to new_tag
436+
refute_received {:materializer_changes, _, _}
437+
end
438+
439+
@tag snapshot_data: {
440+
[%Changes.NewRecord{record: %{"id" => "1", "value" => "10"}, move_tags: ["old_tag"]}],
441+
[]
442+
}
443+
test "move_out for new tag after tag update removes the row", ctx do
444+
ctx = with_materializer(ctx)
445+
446+
assert Materializer.get_link_values(ctx) == MapSet.new([10])
447+
448+
# Update that changes the tag from old_tag to new_tag
449+
Materializer.new_changes(
450+
ctx,
451+
[
452+
%Changes.UpdatedRecord{
453+
key: ~s("public"."test_table"/"1"),
454+
record: %{"id" => "1", "value" => "10"},
455+
old_record: %{"id" => "1", "value" => "10"},
456+
move_tags: ["new_tag"],
457+
removed_move_tags: ["old_tag"]
458+
}
459+
]
460+
)
461+
462+
refute_received {:materializer_changes, _, _}
463+
464+
# Now send a move_out for the NEW tag - should find and remove the row
465+
Materializer.new_changes(ctx, [
466+
%{headers: %{event: "move-out", patterns: [%{pos: 0, value: "new_tag"}]}}
467+
])
468+
469+
# Value should be gone
470+
assert Materializer.get_link_values(ctx) == MapSet.new([])
471+
472+
# Should emit move_out event
473+
assert_receive {:materializer_changes, _, %{move_out: [{10, "10"}]}}
474+
end
475+
476+
@tag snapshot_data: {
477+
[
478+
%Changes.NewRecord{record: %{"id" => "1", "value" => "10"}, move_tags: ["tag_a"]},
479+
%Changes.NewRecord{record: %{"id" => "2", "value" => "20"}, move_tags: ["tag_a"]}
480+
],
481+
[]
482+
}
483+
test "multiple rows with same tag, one updates tag, move_out only affects remaining", ctx do
484+
ctx = with_materializer(ctx)
485+
486+
assert Materializer.get_link_values(ctx) == MapSet.new([10, 20])
487+
488+
# Row 1 moves from tag_a to tag_b, row 2 stays in tag_a
489+
Materializer.new_changes(
490+
ctx,
491+
[
492+
%Changes.UpdatedRecord{
493+
key: ~s("public"."test_table"/"1"),
494+
record: %{"id" => "1", "value" => "10"},
495+
old_record: %{"id" => "1", "value" => "10"},
496+
move_tags: ["tag_b"],
497+
removed_move_tags: ["tag_a"]
498+
}
499+
]
500+
)
501+
502+
refute_received {:materializer_changes, _, _}
503+
504+
# move_out for tag_a should only affect row 2 (row 1 moved to tag_b)
505+
Materializer.new_changes(ctx, [
506+
%{headers: %{event: "move-out", patterns: [%{pos: 0, value: "tag_a"}]}}
507+
])
508+
509+
# Only value 10 should remain (row 1 is now under tag_b)
510+
assert Materializer.get_link_values(ctx) == MapSet.new([10])
511+
512+
# Should emit move_out only for row 2's value
513+
assert_receive {:materializer_changes, _, %{move_out: [{20, "20"}]}}
514+
end
515+
end
516+
373517
describe "move_out events" do
374518
test "runtime move_out event removes rows matching the pattern", ctx do
375519
ctx = with_materializer(ctx)

packages/sync-service/test/electric/shapes/shape_test.exs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,40 @@ defmodule Electric.Shapes.ShapeTest do
210210
assert Shape.convert_change(shape, non_matching_update) == []
211211
end
212212

213+
test "keeps update with tag changes even if filtered columns have not changed" do
214+
shape = %Shape{
215+
root_table: {"public", "table"},
216+
root_table_id: @relation_id,
217+
selected_columns: ["id"],
218+
# tag_structure means this shape tracks tags based on the "parent_id" column
219+
tag_structure: [["parent_id"]]
220+
}
221+
222+
# Update where only parent_id changed (tag change), but id (selected column) didn't
223+
update_with_tag_change = %UpdatedRecord{
224+
relation: {"public", "table"},
225+
old_record: %{"id" => 1, "parent_id" => "old_parent"},
226+
record: %{"id" => 1, "parent_id" => "new_parent"}
227+
}
228+
229+
result =
230+
Shape.convert_change(shape, update_with_tag_change,
231+
stack_id: "test_stack",
232+
shape_handle: "test_handle"
233+
)
234+
235+
# The change should be kept (not filtered out) because it has tag changes
236+
assert length(result) == 1
237+
[converted] = result
238+
239+
# The converted change should have removed_move_tags set (indicating the old tag)
240+
assert converted.removed_move_tags != []
241+
# And move_tags for the new tag
242+
assert converted.move_tags != []
243+
# Tags should be different since parent_id changed
244+
assert converted.move_tags != converted.removed_move_tags
245+
end
246+
213247
test "correctly keeps updates with subqueries if the referenced set has not changed" do
214248
shape = %Shape{
215249
root_table: {"public", "table"},

0 commit comments

Comments
 (0)